use std::time::Duration;
use aa_core::identity::AgentId;
use async_trait::async_trait;
use sqlx::postgres::PgPoolOptions;
use sqlx::PgPool;
use super::agent::{AgentFilter, AgentRecord};
use super::audit::{AuditEvent, AuditFilter};
use super::backend::StorageBackend;
use super::error::{StorageError, StorageResult};
use super::health::{HealthStatus, RowCounts, StorageHealth};
use super::metric::{Metric, MetricPoint, MetricQuery};
use super::policy::{PolicyDocument, PolicyMeta, PolicyVersion};
use super::postgres_config::PostgresConfig;
use super::retention::{ColdAction, RetentionPolicy, RetentionStats};
use super::timescale::{has_timescaledb_extension, query_timescale_stats};
use aa_core::config::TimescaleConfig;
fn agent_id_to_text(id: &AgentId) -> String {
uuid::Uuid::from_bytes(*id.as_bytes()).to_string()
}
fn agent_id_from_text(s: &str) -> StorageResult<AgentId> {
let uuid = uuid::Uuid::parse_str(s).map_err(|e| StorageError::QueryFailed(format!("invalid agent_id {s}: {e}")))?;
Ok(AgentId::from_bytes(*uuid.as_bytes()))
}
fn row_to_audit_event(row: &sqlx::postgres::PgRow) -> StorageResult<AuditEvent> {
use sqlx::Row;
let agent_id_text: String = row
.try_get("agent_id")
.map_err(|e| StorageError::QueryFailed(format!("agent_id column: {e}")))?;
let agent_id = agent_id_from_text(&agent_id_text)?;
Ok(AuditEvent {
ts: row
.try_get("ts")
.map_err(|e| StorageError::QueryFailed(format!("ts column: {e}")))?,
event_id: row
.try_get("event_id")
.map_err(|e| StorageError::QueryFailed(format!("event_id column: {e}")))?,
agent_id,
team_id: row
.try_get("team_id")
.map_err(|e| StorageError::QueryFailed(format!("team_id column: {e}")))?,
action: row
.try_get("action")
.map_err(|e| StorageError::QueryFailed(format!("action column: {e}")))?,
decision: row
.try_get("decision")
.map_err(|e| StorageError::QueryFailed(format!("decision column: {e}")))?,
dry_run: row
.try_get("dry_run")
.map_err(|e| StorageError::QueryFailed(format!("dry_run column: {e}")))?,
shadow_decision: row
.try_get("shadow_decision")
.map_err(|e| StorageError::QueryFailed(format!("shadow_decision column: {e}")))?,
matched_rule_id: row
.try_get("matched_rule_id")
.map_err(|e| StorageError::QueryFailed(format!("matched_rule_id column: {e}")))?,
payload: row
.try_get("payload")
.map_err(|e| StorageError::QueryFailed(format!("payload column: {e}")))?,
})
}
fn row_to_agent_record(row: &sqlx::postgres::PgRow) -> StorageResult<AgentRecord> {
use sqlx::Row;
let agent_id_text: String = row
.try_get("agent_id")
.map_err(|e| StorageError::QueryFailed(format!("agent_id column: {e}")))?;
let agent_id = agent_id_from_text(&agent_id_text)?;
let metadata_json: serde_json::Value = row
.try_get("metadata")
.map_err(|e| StorageError::QueryFailed(format!("metadata column: {e}")))?;
let metadata: std::collections::BTreeMap<String, String> =
serde_json::from_value(metadata_json).map_err(|e| StorageError::QueryFailed(format!("metadata parse: {e}")))?;
Ok(AgentRecord {
agent_id,
team_id: row
.try_get("team_id")
.map_err(|e| StorageError::QueryFailed(format!("team_id column: {e}")))?,
org_id: row
.try_get("org_id")
.map_err(|e| StorageError::QueryFailed(format!("org_id column: {e}")))?,
metadata,
registered_at: row
.try_get("registered_at")
.map_err(|e| StorageError::QueryFailed(format!("registered_at column: {e}")))?,
last_seen_at: row
.try_get("last_seen_at")
.map_err(|e| StorageError::QueryFailed(format!("last_seen_at column: {e}")))?,
enforcement_mode: row
.try_get("enforcement_mode")
.map_err(|e| StorageError::QueryFailed(format!("enforcement_mode column: {e}")))?,
})
}
fn push_audit_where<'q>(qb: &mut sqlx::QueryBuilder<'q, sqlx::Postgres>, filter: &'q AuditFilter) {
let mut started = false;
let mut connective = move |qb: &mut sqlx::QueryBuilder<'q, sqlx::Postgres>| {
qb.push(if started { " AND " } else { " WHERE " });
started = true;
};
if let Some(agent_id) = filter.agent_id.as_ref() {
connective(qb);
qb.push("agent_id = ").push_bind(agent_id_to_text(agent_id));
}
if let Some(team_id) = filter.team_id.as_ref() {
connective(qb);
qb.push("team_id = ").push_bind(team_id.clone());
}
if let Some(from) = filter.from {
connective(qb);
qb.push("ts >= ").push_bind(from);
}
if let Some(to) = filter.to {
connective(qb);
qb.push("ts < ").push_bind(to);
}
if filter.dry_run_only {
connective(qb);
qb.push("dry_run = TRUE");
}
}
fn policy_document_bytes(value: serde_json::Value) -> Vec<u8> {
if let serde_json::Value::Object(ref obj) = value {
if obj.len() == 1 {
if let Some(serde_json::Value::String(raw)) = obj.get("raw_yaml") {
return raw.clone().into_bytes();
}
}
}
serde_json::to_vec(&value).expect("serialising a parsed JSON value never fails")
}
fn metric_bucket_unit(raw: &str) -> StorageResult<&'static str> {
match raw.trim() {
"1 second" => Ok("second"),
"1 minute" => Ok("minute"),
"1 hour" => Ok("hour"),
"1 day" => Ok("day"),
other => Err(StorageError::QueryFailed(format!(
"unsupported metric bucket interval: {other:?} (supported: \"1 second\", \"1 minute\", \"1 hour\", \"1 day\")"
))),
}
}
fn push_metric_where<'q>(qb: &mut sqlx::QueryBuilder<'q, sqlx::Postgres>, query: &'q MetricQuery) {
let mut started = false;
let mut connective = move |qb: &mut sqlx::QueryBuilder<'q, sqlx::Postgres>| {
qb.push(if started { " AND " } else { " WHERE " });
started = true;
};
if let Some(agent_id) = query.agent_id.as_ref() {
connective(qb);
qb.push("agent_id = ").push_bind(agent_id_to_text(agent_id));
}
if let Some(metric) = query.metric.as_ref() {
connective(qb);
qb.push("metric = ").push_bind(metric.clone());
}
if let Some(from) = query.from {
connective(qb);
qb.push("ts >= ").push_bind(from);
}
if let Some(to) = query.to {
connective(qb);
qb.push("ts < ").push_bind(to);
}
}
fn push_agent_where<'q>(qb: &mut sqlx::QueryBuilder<'q, sqlx::Postgres>, filter: &'q AgentFilter) {
let mut started = false;
let mut connective = move |qb: &mut sqlx::QueryBuilder<'q, sqlx::Postgres>| {
qb.push(if started { " AND " } else { " WHERE " });
started = true;
};
if let Some(team_id) = filter.team_id.as_ref() {
connective(qb);
qb.push("team_id = ").push_bind(team_id.clone());
}
if let Some(org_id) = filter.org_id.as_ref() {
connective(qb);
qb.push("org_id = ").push_bind(org_id.clone());
}
if let Some(name_contains) = filter.name_contains.as_ref() {
connective(qb);
qb.push("metadata->>'name' LIKE ")
.push_bind(format!("%{name_contains}%"));
}
}
pub struct PostgresBackend {
pool: PgPool,
timescale_config: TimescaleConfig,
}
impl PostgresBackend {
pub async fn connect(config: &PostgresConfig) -> StorageResult<Self> {
let database_url = config.database_url.as_deref().ok_or_else(|| {
StorageError::ConnectionFailed(
"AAASM_DATABASE_URL is not set and storage.postgres.database_url is not configured".into(),
)
})?;
let pool = PgPoolOptions::new()
.max_connections(config.max_connections)
.min_connections(config.min_connections)
.acquire_timeout(Duration::from_secs(config.connect_timeout_secs))
.connect(database_url)
.await
.map_err(|e| StorageError::ConnectionFailed(e.to_string()))?;
Ok(Self {
pool,
timescale_config: config.timescaledb.clone(),
})
}
pub(crate) async fn apply_timescaledb_setup(&self, config: &TimescaleConfig) -> StorageResult<()> {
if !config.enabled {
tracing::info!("storage.postgres.timescaledb.enabled = false; skipping hypertable setup");
return Ok(());
}
if !has_timescaledb_extension(&self.pool).await? {
tracing::warn!(
"TimescaleDB extension not found — using standard PostgreSQL tables. \
Install TimescaleDB for time-series query acceleration and auto-compression."
);
return Ok(());
}
tracing::info!("TimescaleDB extension active; hypertables governed by 0002_timescaledb_hypertables.sql");
Ok(())
}
}
#[async_trait]
impl StorageBackend for PostgresBackend {
async fn migrate(&self) -> StorageResult<()> {
sqlx::migrate!("./migrations/postgres")
.run(&self.pool)
.await
.map_err(|e| StorageError::MigrationFailed(e.to_string()))?;
self.apply_timescaledb_setup(&self.timescale_config).await
}
async fn append_audit_event(&self, event: &AuditEvent) -> StorageResult<()> {
sqlx::query(
"INSERT INTO audit_events \
(ts, event_id, agent_id, team_id, action, decision, \
dry_run, shadow_decision, matched_rule_id, payload) \
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)",
)
.bind(event.ts)
.bind(event.event_id)
.bind(agent_id_to_text(&event.agent_id))
.bind(event.team_id.as_deref())
.bind(&event.action)
.bind(&event.decision)
.bind(event.dry_run)
.bind(event.shadow_decision.as_deref())
.bind(event.matched_rule_id.as_deref())
.bind(event.payload.as_ref())
.execute(&self.pool)
.await
.map(|_| ())
.map_err(|e| StorageError::QueryFailed(e.to_string()))
}
async fn query_audit_events(&self, filter: AuditFilter) -> StorageResult<Vec<AuditEvent>> {
let mut qb = sqlx::QueryBuilder::<sqlx::Postgres>::new(
"SELECT ts, event_id, agent_id, team_id, action, decision, \
dry_run, shadow_decision, matched_rule_id, payload FROM audit_events",
);
push_audit_where(&mut qb, &filter);
qb.push(" ORDER BY ts DESC");
if let Some(limit) = filter.limit {
qb.push(" LIMIT ").push_bind(i64::from(limit));
}
if let Some(offset) = filter.offset {
qb.push(" OFFSET ").push_bind(i64::from(offset));
}
let rows = qb
.build()
.fetch_all(&self.pool)
.await
.map_err(|e| StorageError::QueryFailed(e.to_string()))?;
rows.iter().map(row_to_audit_event).collect()
}
async fn count_audit_events(&self, filter: AuditFilter) -> StorageResult<u64> {
let mut qb = sqlx::QueryBuilder::<sqlx::Postgres>::new("SELECT count(*) FROM audit_events");
push_audit_where(&mut qb, &filter);
let count: i64 = qb
.build_query_scalar()
.fetch_one(&self.pool)
.await
.map_err(|e| StorageError::QueryFailed(e.to_string()))?;
Ok(count as u64)
}
async fn upsert_agent(&self, record: AgentRecord) -> StorageResult<()> {
let metadata = serde_json::to_value(&record.metadata)
.map_err(|e| StorageError::QueryFailed(format!("metadata serialize: {e}")))?;
sqlx::query(
"INSERT INTO agent_registry \
(agent_id, team_id, org_id, metadata, registered_at, last_seen_at, enforcement_mode) \
VALUES ($1, $2, $3, $4, $5, $6, $7) \
ON CONFLICT (agent_id) DO UPDATE SET \
team_id = EXCLUDED.team_id, \
org_id = EXCLUDED.org_id, \
metadata = EXCLUDED.metadata, \
last_seen_at = EXCLUDED.last_seen_at, \
enforcement_mode = EXCLUDED.enforcement_mode",
)
.bind(agent_id_to_text(&record.agent_id))
.bind(record.team_id.as_deref())
.bind(record.org_id.as_deref())
.bind(metadata)
.bind(record.registered_at)
.bind(record.last_seen_at)
.bind(&record.enforcement_mode)
.execute(&self.pool)
.await
.map(|_| ())
.map_err(|e| StorageError::QueryFailed(e.to_string()))
}
async fn get_agent(&self, id: &AgentId) -> StorageResult<Option<AgentRecord>> {
let row = sqlx::query(
"SELECT agent_id, team_id, org_id, metadata, registered_at, last_seen_at, \
enforcement_mode FROM agent_registry WHERE agent_id = $1",
)
.bind(agent_id_to_text(id))
.fetch_optional(&self.pool)
.await
.map_err(|e| StorageError::QueryFailed(e.to_string()))?;
row.as_ref().map(row_to_agent_record).transpose()
}
async fn list_agents(&self, filter: AgentFilter) -> StorageResult<Vec<AgentRecord>> {
let mut qb = sqlx::QueryBuilder::<sqlx::Postgres>::new(
"SELECT agent_id, team_id, org_id, metadata, registered_at, last_seen_at, \
enforcement_mode FROM agent_registry",
);
push_agent_where(&mut qb, &filter);
qb.push(" ORDER BY agent_id");
if let Some(limit) = filter.limit {
qb.push(" LIMIT ").push_bind(i64::from(limit));
if let Some(offset) = filter.offset {
qb.push(" OFFSET ").push_bind(i64::from(offset));
}
}
let rows = qb
.build()
.fetch_all(&self.pool)
.await
.map_err(|e| StorageError::QueryFailed(e.to_string()))?;
rows.iter().map(row_to_agent_record).collect()
}
async fn delete_agent(&self, id: &AgentId) -> StorageResult<()> {
let result = sqlx::query("DELETE FROM agent_registry WHERE agent_id = $1")
.bind(agent_id_to_text(id))
.execute(&self.pool)
.await
.map_err(|e| StorageError::QueryFailed(e.to_string()))?;
if result.rows_affected() == 0 {
return Err(StorageError::NotFound(agent_id_to_text(id)));
}
Ok(())
}
async fn save_policy(&self, doc: PolicyDocument) -> StorageResult<PolicyVersion> {
let document_json = match serde_json::from_slice::<serde_json::Value>(&doc.bytes) {
Ok(value) => value,
Err(_) => {
let text = std::str::from_utf8(&doc.bytes)
.map_err(|e| StorageError::QueryFailed(format!("document bytes not UTF-8: {e}")))?;
serde_json::json!({ "raw_yaml": text })
}
};
let mut tx = self
.pool
.begin()
.await
.map_err(|e| StorageError::QueryFailed(format!("begin tx: {e}")))?;
let next_version: i32 =
sqlx::query_scalar("SELECT COALESCE(MAX(version), 0) + 1 FROM policy_versions WHERE name = $1")
.bind(&doc.name)
.fetch_one(&mut *tx)
.await
.map_err(|e| StorageError::QueryFailed(format!("compute next version: {e}")))?;
let (_id, returned_version, created_at, is_active): (i64, i32, chrono::DateTime<chrono::Utc>, bool) =
sqlx::query_as(
"INSERT INTO policy_versions (name, version, document, is_active) \
VALUES ($1, $2, $3, FALSE) \
RETURNING id, version, created_at, is_active",
)
.bind(&doc.name)
.bind(next_version)
.bind(&document_json)
.fetch_one(&mut *tx)
.await
.map_err(|e| match e {
sqlx::Error::Database(db) if db.is_unique_violation() => {
StorageError::Conflict(format!("{}@{next_version}", doc.name))
}
other => StorageError::QueryFailed(other.to_string()),
})?;
tx.commit()
.await
.map_err(|e| StorageError::QueryFailed(format!("commit tx: {e}")))?;
let version =
u32::try_from(returned_version).map_err(|e| StorageError::QueryFailed(format!("version overflow: {e}")))?;
Ok(PolicyVersion {
meta: PolicyMeta {
name: doc.name.clone(),
version,
created_at,
is_active,
},
document: doc,
})
}
async fn get_active_policy(&self, name: &str) -> StorageResult<Option<PolicyDocument>> {
let row: Option<(serde_json::Value,)> = sqlx::query_as(
"SELECT document FROM policy_versions \
WHERE name = $1 AND is_active = TRUE LIMIT 1",
)
.bind(name)
.fetch_optional(&self.pool)
.await
.map_err(|e| StorageError::QueryFailed(e.to_string()))?;
Ok(row.map(|(document,)| PolicyDocument {
name: name.to_owned(),
bytes: policy_document_bytes(document),
}))
}
async fn list_policy_versions(&self, name: &str) -> StorageResult<Vec<PolicyMeta>> {
let rows: Vec<(i32, chrono::DateTime<chrono::Utc>, bool)> = sqlx::query_as(
"SELECT version, created_at, is_active FROM policy_versions \
WHERE name = $1 ORDER BY version DESC",
)
.bind(name)
.fetch_all(&self.pool)
.await
.map_err(|e| StorageError::QueryFailed(e.to_string()))?;
rows.into_iter()
.map(|(version, created_at, is_active)| {
let version =
u32::try_from(version).map_err(|e| StorageError::QueryFailed(format!("version overflow: {e}")))?;
Ok(PolicyMeta {
name: name.to_owned(),
version,
created_at,
is_active,
})
})
.collect()
}
async fn rollback_policy(&self, name: &str, version: u32) -> StorageResult<()> {
let version_i =
i32::try_from(version).map_err(|e| StorageError::QueryFailed(format!("version overflow: {e}")))?;
let mut tx = self
.pool
.begin()
.await
.map_err(|e| StorageError::QueryFailed(format!("begin tx: {e}")))?;
let exists: Option<(i32,)> = sqlx::query_as("SELECT 1 FROM policy_versions WHERE name = $1 AND version = $2")
.bind(name)
.bind(version_i)
.fetch_optional(&mut *tx)
.await
.map_err(|e| StorageError::QueryFailed(e.to_string()))?;
if exists.is_none() {
return Err(StorageError::NotFound(format!("{name}@{version}")));
}
sqlx::query(
"UPDATE policy_versions SET is_active = FALSE \
WHERE name = $1 AND is_active = TRUE",
)
.bind(name)
.execute(&mut *tx)
.await
.map_err(|e| StorageError::QueryFailed(e.to_string()))?;
sqlx::query(
"UPDATE policy_versions SET is_active = TRUE \
WHERE name = $1 AND version = $2",
)
.bind(name)
.bind(version_i)
.execute(&mut *tx)
.await
.map_err(|e| StorageError::QueryFailed(e.to_string()))?;
tx.commit()
.await
.map_err(|e| StorageError::QueryFailed(format!("commit tx: {e}")))?;
Ok(())
}
async fn record_metric(&self, m: Metric) -> StorageResult<()> {
let labels =
serde_json::to_value(&m.labels).map_err(|e| StorageError::QueryFailed(format!("labels serialize: {e}")))?;
sqlx::query(
"INSERT INTO metrics (ts, agent_id, metric, value, labels) \
VALUES ($1, $2, $3, $4, $5)",
)
.bind(m.ts)
.bind(agent_id_to_text(&m.agent_id))
.bind(&m.metric)
.bind(m.value)
.bind(labels)
.execute(&self.pool)
.await
.map(|_| ())
.map_err(|e| StorageError::QueryFailed(e.to_string()))
}
async fn query_metrics(&self, query: MetricQuery) -> StorageResult<Vec<MetricPoint>> {
let bucket_unit = query.bucket.as_deref().map(metric_bucket_unit).transpose()?;
let mut qb = sqlx::QueryBuilder::<sqlx::Postgres>::new("SELECT ");
if let Some(unit) = bucket_unit {
qb.push("date_trunc('");
qb.push(unit);
qb.push("', ts) AS bucket_ts, AVG(value) AS value FROM metrics");
} else {
qb.push("ts, value FROM metrics");
}
push_metric_where(&mut qb, &query);
if let Some(unit) = bucket_unit {
qb.push(" GROUP BY date_trunc('");
qb.push(unit);
qb.push("', ts) ORDER BY date_trunc('");
qb.push(unit);
qb.push("', ts) ASC");
} else {
qb.push(" ORDER BY ts ASC");
}
if let Some(limit) = query.limit {
qb.push(" LIMIT ").push_bind(i64::from(limit));
}
let rows: Vec<(chrono::DateTime<chrono::Utc>, f64)> = qb
.build_query_as()
.fetch_all(&self.pool)
.await
.map_err(|e| StorageError::QueryFailed(e.to_string()))?;
Ok(rows.into_iter().map(|(ts, value)| MetricPoint { ts, value }).collect())
}
async fn apply_retention(&self, policy: &RetentionPolicy) -> StorageResult<RetentionStats> {
if matches!(policy.cold_action, ColdAction::Archive) {
return Err(StorageError::RetentionError(format!(
"archive cold_action not supported by PostgresBackend yet (S-D will add drop_chunks); \
archive_url = {:?}",
policy.archive_url
)));
}
let now = chrono::Utc::now();
let cold_threshold = now - chrono::Duration::days(i64::from(policy.hot_days + policy.warm_days));
let hot_threshold = now - chrono::Duration::days(i64::from(policy.hot_days));
let dropped_rows: u64 = if policy.dry_run {
let count: i64 = sqlx::query_scalar("SELECT count(*) FROM audit_events WHERE ts < $1")
.bind(cold_threshold)
.fetch_one(&self.pool)
.await
.map_err(|e| StorageError::QueryFailed(e.to_string()))?;
count as u64
} else {
let result = sqlx::query("DELETE FROM audit_events WHERE ts < $1")
.bind(cold_threshold)
.execute(&self.pool)
.await
.map_err(|e| StorageError::RetentionError(e.to_string()))?;
result.rows_affected()
};
let hot_count: i64 = sqlx::query_scalar("SELECT count(*) FROM audit_events WHERE ts >= $1")
.bind(hot_threshold)
.fetch_one(&self.pool)
.await
.map_err(|e| StorageError::QueryFailed(e.to_string()))?;
Ok(RetentionStats {
hot_rows: hot_count as u64,
compressed_rows: 0,
archived_rows: 0,
dropped_rows,
freed_bytes: 0,
ran_at: chrono::Utc::now(),
})
}
async fn healthcheck(&self) -> StorageResult<StorageHealth> {
let start = std::time::Instant::now();
sqlx::query("SELECT 1")
.execute(&self.pool)
.await
.map_err(|e| StorageError::ConnectionFailed(e.to_string()))?;
let (audit_events, agents, policy_versions): (i64, i64, i64) = sqlx::query_as(
"SELECT \
(SELECT count(*) FROM audit_events), \
(SELECT count(*) FROM agent_registry), \
(SELECT count(*) FROM policy_versions)",
)
.fetch_one(&self.pool)
.await
.map_err(|e| StorageError::QueryFailed(e.to_string()))?;
let timescale = match has_timescaledb_extension(&self.pool).await {
Ok(true) => query_timescale_stats(&self.pool).await.ok(),
Ok(false) | Err(_) => None,
};
let latency_ms = u32::try_from(start.elapsed().as_millis()).unwrap_or(u32::MAX);
let status = if latency_ms < 200 {
HealthStatus::Ok
} else {
HealthStatus::Degraded
};
Ok(StorageHealth {
status,
backend: "postgres",
latency_ms,
row_counts: RowCounts {
audit_events: audit_events as u64,
agents: agents as u64,
policy_versions: policy_versions as u64,
},
timescale,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
async fn pg_backend_or_skip() -> Option<PostgresBackend> {
let url = match std::env::var("AAASM_DATABASE_URL") {
Ok(v) => v,
Err(_) => {
eprintln!(
"skipping postgres test: AAASM_DATABASE_URL not set (CI provides this via services: postgres)"
);
return None;
}
};
let config = PostgresConfig {
database_url: Some(url),
..PostgresConfig::default()
};
Some(
PostgresBackend::connect(&config)
.await
.expect("connect to AAASM_DATABASE_URL"),
)
}
#[tokio::test]
async fn connect_rejects_missing_database_url() {
let config = PostgresConfig::default();
let result = PostgresBackend::connect(&config).await;
match result {
Err(StorageError::ConnectionFailed(msg)) => {
assert!(
msg.contains("AAASM_DATABASE_URL"),
"missing-URL error must mention AAASM_DATABASE_URL, got: {msg}"
);
}
Err(other) => panic!("expected ConnectionFailed, got {other:?}"),
Ok(_) => panic!("expected error when database_url is None"),
}
}
#[tokio::test]
async fn apply_timescaledb_setup_skips_when_disabled() {
let Some(backend) = pg_backend_or_skip().await else {
return;
};
let disabled = TimescaleConfig {
enabled: false,
..TimescaleConfig::default()
};
backend
.apply_timescaledb_setup(&disabled)
.await
.expect("apply_timescaledb_setup must return Ok when enabled = false");
}
#[tokio::test]
async fn apply_timescaledb_setup_warns_when_extension_absent() {
if std::env::var("TIMESCALEDB_AVAILABLE").as_deref() == Ok("1") {
eprintln!(
"skipping extension-absent test: TIMESCALEDB_AVAILABLE=1 (see apply_timescaledb_setup_succeeds_when_extension_active)"
);
return;
}
let Some(backend) = pg_backend_or_skip().await else {
return;
};
let enabled = TimescaleConfig::default();
assert!(enabled.enabled, "default must have enabled = true");
backend
.apply_timescaledb_setup(&enabled)
.await
.expect("apply_timescaledb_setup must return Ok via graceful fallback on plain PostgreSQL");
let present = super::super::timescale::has_timescaledb_extension(&backend.pool)
.await
.expect("probe");
assert!(
!present,
"this test asserts the extension-absent path; if your CI installed TimescaleDB, set TIMESCALEDB_AVAILABLE=1"
);
}
#[tokio::test]
async fn apply_timescaledb_setup_succeeds_when_extension_active() {
if std::env::var("TIMESCALEDB_AVAILABLE").as_deref() != Ok("1") {
eprintln!("skipping extension-present test: TIMESCALEDB_AVAILABLE != 1");
return;
}
let Some(backend) = pg_backend_or_skip().await else {
return;
};
let present = super::super::timescale::has_timescaledb_extension(&backend.pool)
.await
.expect("probe");
assert!(
present,
"TIMESCALEDB_AVAILABLE=1 was set but the extension is not installed; \
check the docker image is timescale/timescaledb:latest-pg17"
);
backend
.apply_timescaledb_setup(&TimescaleConfig::default())
.await
.expect("apply_timescaledb_setup must return Ok when extension is active");
}
#[tokio::test]
async fn migrate_creates_expected_tables() {
let Some(backend) = pg_backend_or_skip().await else {
return;
};
backend.migrate().await.expect("migrate");
for table in ["agent_registry", "policy_versions", "audit_events", "metrics"] {
let exists: bool =
sqlx::query_scalar("SELECT EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE tablename = $1)")
.bind(table)
.fetch_one(&backend.pool)
.await
.expect("query pg_tables");
assert!(exists, "table {table} should exist after migrate()");
}
}
#[tokio::test]
async fn migrate_is_idempotent() {
let Some(backend) = pg_backend_or_skip().await else {
return;
};
backend.migrate().await.expect("first migrate");
backend.migrate().await.expect("second migrate should be a no-op");
}
#[tokio::test]
async fn migrate_0002_succeeds_when_timescaledb_extension_absent() {
if std::env::var("TIMESCALEDB_AVAILABLE").as_deref() == Ok("1") {
eprintln!(
"skipping plain-postgres test: TIMESCALEDB_AVAILABLE=1 (see migrate_0002_creates_hypertables_when_timescaledb_active for the present-path test)"
);
return;
}
let Some(backend) = pg_backend_or_skip().await else {
return;
};
backend
.migrate()
.await
.expect("migrate must not fail on plain PostgreSQL");
let has_extension: bool =
sqlx::query_scalar("SELECT EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'timescaledb')")
.fetch_one(&backend.pool)
.await
.expect("query pg_extension");
assert!(
!has_extension,
"this test asserts the extension-absent path; if your CI installed TimescaleDB, set TIMESCALEDB_AVAILABLE=1"
);
let hypertable_schema_present: bool = sqlx::query_scalar(
"SELECT EXISTS (SELECT 1 FROM information_schema.schemata WHERE schema_name = '_timescaledb_internal')",
)
.fetch_one(&backend.pool)
.await
.expect("query information_schema.schemata");
assert!(
!hypertable_schema_present,
"TimescaleDB internal schema must NOT exist when extension is absent"
);
}
#[tokio::test]
async fn migrate_0002_creates_hypertables_when_timescaledb_active() {
if std::env::var("TIMESCALEDB_AVAILABLE").as_deref() != Ok("1") {
eprintln!(
"skipping timescaledb present-path test: TIMESCALEDB_AVAILABLE != 1 (set to 1 when AAASM_DATABASE_URL points at a TimescaleDB-enabled instance)"
);
return;
}
let Some(backend) = pg_backend_or_skip().await else {
return;
};
backend
.migrate()
.await
.expect("migrate must not fail on a TimescaleDB-enabled PostgreSQL");
let has_extension: bool =
sqlx::query_scalar("SELECT EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'timescaledb')")
.fetch_one(&backend.pool)
.await
.expect("query pg_extension");
assert!(
has_extension,
"TIMESCALEDB_AVAILABLE=1 was set but the timescaledb extension is not installed; \
check the docker image is timescale/timescaledb:latest-pg17"
);
let hypertable_count: i64 = sqlx::query_scalar(
"SELECT COUNT(*) FROM timescaledb_information.hypertables \
WHERE hypertable_name IN ('audit_events', 'metrics')",
)
.fetch_one(&backend.pool)
.await
.expect("query timescaledb_information.hypertables");
assert_eq!(
hypertable_count, 2,
"expected both audit_events and metrics to be promoted to hypertables, found {hypertable_count}"
);
}
fn fresh_agent_id() -> AgentId {
AgentId::from_bytes(*uuid::Uuid::new_v4().as_bytes())
}
fn now_micros() -> chrono::DateTime<chrono::Utc> {
chrono::DateTime::from_timestamp_micros(chrono::Utc::now().timestamp_micros())
.expect("now fits in micros range")
}
fn sample_event(agent_id: AgentId, ts: chrono::DateTime<chrono::Utc>) -> AuditEvent {
AuditEvent {
ts,
event_id: uuid::Uuid::new_v4(),
agent_id,
team_id: Some("test-team".to_string()),
action: "tool_call".to_string(),
decision: "allow".to_string(),
dry_run: false,
shadow_decision: None,
matched_rule_id: Some("rule-42".to_string()),
payload: Some(serde_json::json!({"tool": "shell", "args": ["ls", "-la"]})),
}
}
#[tokio::test]
async fn append_then_query_round_trip() {
let Some(backend) = pg_backend_or_skip().await else {
return;
};
backend.migrate().await.expect("migrate");
let agent_id = fresh_agent_id();
let event = sample_event(agent_id, now_micros());
backend.append_audit_event(&event).await.expect("append");
let rows = backend
.query_audit_events(AuditFilter {
agent_id: Some(agent_id),
..AuditFilter::default()
})
.await
.expect("query");
assert_eq!(rows.len(), 1, "expected exactly one row for fresh agent");
assert_eq!(rows[0], event, "round-trip event must match insert exactly");
}
#[tokio::test]
async fn query_filters_by_time_range() {
let Some(backend) = pg_backend_or_skip().await else {
return;
};
backend.migrate().await.expect("migrate");
let agent_id = fresh_agent_id();
let base = now_micros();
let t0 = base - chrono::Duration::minutes(20);
let t1 = base - chrono::Duration::minutes(10);
let t2 = base;
for ts in [t0, t1, t2] {
backend
.append_audit_event(&sample_event(agent_id, ts))
.await
.expect("append");
}
let recent = backend
.query_audit_events(AuditFilter {
agent_id: Some(agent_id),
from: Some(base - chrono::Duration::minutes(15)),
..AuditFilter::default()
})
.await
.expect("query");
assert_eq!(recent.len(), 2, "from-filter should drop the oldest event");
assert_eq!(recent[0].ts, t2);
assert_eq!(recent[1].ts, t1);
}
#[tokio::test]
async fn count_matches_query_length() {
let Some(backend) = pg_backend_or_skip().await else {
return;
};
backend.migrate().await.expect("migrate");
let agent_id = fresh_agent_id();
let base = now_micros();
for offset in 0..5 {
backend
.append_audit_event(&sample_event(agent_id, base - chrono::Duration::seconds(offset)))
.await
.expect("append");
}
let filter = AuditFilter {
agent_id: Some(agent_id),
..AuditFilter::default()
};
let rows = backend.query_audit_events(filter.clone()).await.expect("query");
let count = backend.count_audit_events(filter).await.expect("count");
assert_eq!(rows.len(), 5);
assert_eq!(count, 5);
assert_eq!(count as usize, rows.len(), "count must equal query length");
}
#[tokio::test]
async fn dry_run_only_filter_excludes_non_dry_events() {
let Some(backend) = pg_backend_or_skip().await else {
return;
};
backend.migrate().await.expect("migrate");
let agent_id = fresh_agent_id();
let base = now_micros();
let dry = AuditEvent {
dry_run: true,
..sample_event(agent_id, base)
};
let live = AuditEvent {
dry_run: false,
..sample_event(agent_id, base - chrono::Duration::seconds(1))
};
backend.append_audit_event(&dry).await.expect("append dry");
backend.append_audit_event(&live).await.expect("append live");
let dry_only = backend
.query_audit_events(AuditFilter {
agent_id: Some(agent_id),
dry_run_only: true,
..AuditFilter::default()
})
.await
.expect("query dry-only");
assert_eq!(dry_only.len(), 1, "expected only the dry-run event");
assert!(dry_only[0].dry_run, "returned event must be dry_run = true");
assert_eq!(dry_only[0].event_id, dry.event_id);
}
fn sample_agent_record(
agent_id: AgentId,
registered_at: chrono::DateTime<chrono::Utc>,
last_seen_at: chrono::DateTime<chrono::Utc>,
) -> AgentRecord {
let mut metadata = std::collections::BTreeMap::new();
metadata.insert("name".to_string(), "alpha-agent".to_string());
metadata.insert("env".to_string(), "test".to_string());
AgentRecord {
agent_id,
team_id: Some("team-rust".to_string()),
org_id: Some("acme".to_string()),
metadata,
registered_at,
last_seen_at,
enforcement_mode: "enforce".to_string(),
}
}
#[tokio::test]
async fn upsert_then_get_round_trip() {
let Some(backend) = pg_backend_or_skip().await else {
return;
};
backend.migrate().await.expect("migrate");
let agent_id = fresh_agent_id();
let ts = now_micros();
let record = sample_agent_record(agent_id, ts, ts);
backend.upsert_agent(record.clone()).await.expect("upsert");
let fetched = backend
.get_agent(&agent_id)
.await
.expect("get_agent")
.expect("agent should exist");
assert_eq!(fetched, record, "round-trip record must match insert exactly");
}
#[tokio::test]
async fn upsert_updates_last_seen_at() {
let Some(backend) = pg_backend_or_skip().await else {
return;
};
backend.migrate().await.expect("migrate");
let agent_id = fresh_agent_id();
let t1 = now_micros();
let t2 = t1 + chrono::Duration::seconds(60);
backend
.upsert_agent(sample_agent_record(agent_id, t1, t1))
.await
.expect("first upsert");
backend
.upsert_agent(sample_agent_record(agent_id, t1, t2))
.await
.expect("second upsert");
let fetched = backend
.get_agent(&agent_id)
.await
.expect("get_agent")
.expect("agent should exist");
assert_eq!(fetched.last_seen_at, t2, "second upsert must move last_seen_at forward");
assert_eq!(
fetched.registered_at, t1,
"registered_at must be preserved across re-registration"
);
}
#[tokio::test]
async fn list_filters_by_team() {
let Some(backend) = pg_backend_or_skip().await else {
return;
};
backend.migrate().await.expect("migrate");
let team = format!("team-{}", uuid::Uuid::new_v4());
let other_team = format!("team-{}", uuid::Uuid::new_v4());
let ts = now_micros();
let mut in_team_a = sample_agent_record(fresh_agent_id(), ts, ts);
in_team_a.team_id = Some(team.clone());
let mut in_team_a_2 = sample_agent_record(fresh_agent_id(), ts, ts);
in_team_a_2.team_id = Some(team.clone());
let mut in_other = sample_agent_record(fresh_agent_id(), ts, ts);
in_other.team_id = Some(other_team.clone());
backend.upsert_agent(in_team_a.clone()).await.expect("upsert a1");
backend.upsert_agent(in_team_a_2.clone()).await.expect("upsert a2");
backend.upsert_agent(in_other.clone()).await.expect("upsert other");
let listed = backend
.list_agents(AgentFilter {
team_id: Some(team.clone()),
..AgentFilter::default()
})
.await
.expect("list");
assert_eq!(listed.len(), 2, "filter should return both team-a agents");
assert!(
listed.iter().all(|r| r.team_id.as_deref() == Some(team.as_str())),
"every returned row must belong to {team}",
);
}
#[tokio::test]
async fn delete_unknown_returns_not_found() {
let Some(backend) = pg_backend_or_skip().await else {
return;
};
backend.migrate().await.expect("migrate");
let missing = fresh_agent_id();
let err = backend
.delete_agent(&missing)
.await
.expect_err("delete of unknown id must error");
match err {
StorageError::NotFound(payload) => {
assert_eq!(
payload,
agent_id_to_text(&missing),
"NotFound payload should carry the offending TEXT id",
);
}
other => panic!("expected NotFound, got {other:?}"),
}
}
fn fresh_policy_name() -> String {
format!("policy-{}", uuid::Uuid::new_v4())
}
fn json_policy(name: &str, version_marker: u32) -> PolicyDocument {
PolicyDocument {
name: name.to_owned(),
bytes: format!(r#"{{"marker":{version_marker},"rule":"allow"}}"#).into_bytes(),
}
}
#[tokio::test]
async fn save_policy_assigns_monotonic_versions() {
let Some(backend) = pg_backend_or_skip().await else {
return;
};
backend.migrate().await.expect("migrate");
let name = fresh_policy_name();
let v1 = backend.save_policy(json_policy(&name, 1)).await.expect("save 1");
let v2 = backend.save_policy(json_policy(&name, 2)).await.expect("save 2");
let v3 = backend.save_policy(json_policy(&name, 3)).await.expect("save 3");
assert_eq!(v1.meta.version, 1);
assert_eq!(v2.meta.version, 2);
assert_eq!(v3.meta.version, 3);
}
#[tokio::test]
async fn save_policy_does_not_activate_by_default() {
let Some(backend) = pg_backend_or_skip().await else {
return;
};
backend.migrate().await.expect("migrate");
let name = fresh_policy_name();
let saved = backend.save_policy(json_policy(&name, 1)).await.expect("save");
assert!(
!saved.meta.is_active,
"freshly saved policy must land with is_active = false"
);
let active = backend.get_active_policy(&name).await.expect("get_active");
assert!(
active.is_none(),
"no version should be active until rollback_policy is called"
);
}
#[tokio::test]
async fn rollback_then_get_active_returns_chosen_version() {
let Some(backend) = pg_backend_or_skip().await else {
return;
};
backend.migrate().await.expect("migrate");
let name = fresh_policy_name();
let v1 = json_policy(&name, 1);
let v2 = json_policy(&name, 2);
let v3 = json_policy(&name, 3);
backend.save_policy(v1).await.expect("save v1");
backend.save_policy(v2.clone()).await.expect("save v2");
backend.save_policy(v3).await.expect("save v3");
backend.rollback_policy(&name, 2).await.expect("rollback to v2");
let active = backend
.get_active_policy(&name)
.await
.expect("get_active")
.expect("a version must be active after rollback");
assert_eq!(active.bytes, v2.bytes, "active document must be the v2 we wrote");
let metas = backend.list_policy_versions(&name).await.expect("list_policy_versions");
let active_metas: Vec<&PolicyMeta> = metas.iter().filter(|m| m.is_active).collect();
assert_eq!(active_metas.len(), 1, "exactly one version must be active");
assert_eq!(active_metas[0].version, 2);
}
#[tokio::test]
async fn rollback_unknown_version_returns_not_found() {
let Some(backend) = pg_backend_or_skip().await else {
return;
};
backend.migrate().await.expect("migrate");
let name = fresh_policy_name();
backend.save_policy(json_policy(&name, 1)).await.expect("save v1");
let err = backend
.rollback_policy(&name, 999)
.await
.expect_err("rollback of missing version must error");
match err {
StorageError::NotFound(payload) => {
assert_eq!(
payload,
format!("{name}@999"),
"NotFound payload should carry <name>@<version>",
);
}
other => panic!("expected NotFound, got {other:?}"),
}
}
fn fresh_metric_name() -> String {
format!("metric-{}", uuid::Uuid::new_v4())
}
fn sample_metric(agent_id: AgentId, metric: &str, ts: chrono::DateTime<chrono::Utc>, value: f64) -> Metric {
let mut labels = std::collections::BTreeMap::new();
labels.insert("region".to_string(), "us-west".to_string());
Metric {
ts,
agent_id,
metric: metric.to_owned(),
value,
labels,
}
}
#[tokio::test]
async fn record_metric_then_query_round_trip() {
let Some(backend) = pg_backend_or_skip().await else {
return;
};
backend.migrate().await.expect("migrate");
let agent_id = fresh_agent_id();
let metric_name = fresh_metric_name();
let ts = now_micros();
backend
.record_metric(sample_metric(agent_id, &metric_name, ts, 42.5))
.await
.expect("record_metric");
let points = backend
.query_metrics(MetricQuery {
agent_id: Some(agent_id),
metric: Some(metric_name.clone()),
..MetricQuery::default()
})
.await
.expect("query_metrics");
assert_eq!(points.len(), 1, "expected the single sample we just inserted");
assert_eq!(points[0].ts, ts);
assert_eq!(points[0].value, 42.5);
}
#[tokio::test]
async fn query_metrics_with_bucket_aggregates() {
let Some(backend) = pg_backend_or_skip().await else {
return;
};
backend.migrate().await.expect("migrate");
let agent_id = fresh_agent_id();
let metric_name = fresh_metric_name();
let now = chrono::Utc::now();
let base = chrono::DateTime::from_timestamp(now.timestamp() / 60 * 60, 0)
.expect("minute-aligned timestamp fits in chrono range");
for (offset_secs, value) in [(0i64, 10.0_f64), (10, 20.0), (20, 30.0)] {
let ts = base + chrono::Duration::seconds(offset_secs);
backend
.record_metric(sample_metric(agent_id, &metric_name, ts, value))
.await
.expect("record");
}
let points = backend
.query_metrics(MetricQuery {
agent_id: Some(agent_id),
metric: Some(metric_name.clone()),
bucket: Some("1 minute".to_string()),
..MetricQuery::default()
})
.await
.expect("query_metrics");
assert_eq!(
points.len(),
1,
"three samples in the same minute must collapse to one bucket"
);
assert!(
(points[0].value - 20.0).abs() < 1e-9,
"averaged value should be (10 + 20 + 30) / 3 = 20.0, got {}",
points[0].value,
);
}
#[tokio::test]
async fn query_metrics_unsupported_bucket_unit_returns_query_failed() {
let Some(backend) = pg_backend_or_skip().await else {
return;
};
backend.migrate().await.expect("migrate");
let err = backend
.query_metrics(MetricQuery {
bucket: Some("5 microseconds".to_string()),
..MetricQuery::default()
})
.await
.expect_err("unsupported bucket must error");
match err {
StorageError::QueryFailed(msg) => {
assert!(
msg.contains("unsupported metric bucket"),
"error must explain the rejection, got: {msg}",
);
}
other => panic!("expected QueryFailed, got {other:?}"),
}
}
fn ancient_event(agent_id: AgentId, ts: chrono::DateTime<chrono::Utc>) -> AuditEvent {
AuditEvent {
ts,
..sample_event(agent_id, ts)
}
}
#[tokio::test]
async fn apply_retention_dry_run_does_not_delete() {
let Some(backend) = pg_backend_or_skip().await else {
return;
};
backend.migrate().await.expect("migrate");
let agent_id = fresh_agent_id();
let recent_past = chrono::Utc::now() - chrono::Duration::hours(1);
backend
.append_audit_event(&ancient_event(agent_id, recent_past))
.await
.expect("seed recent-past event");
let stats = backend
.apply_retention(&RetentionPolicy {
hot_days: 0,
warm_days: 0,
cold_action: ColdAction::Drop,
archive_url: None,
dry_run: true,
})
.await
.expect("apply_retention dry_run");
assert!(
stats.dropped_rows >= 1,
"dry_run should project at least our recent-past event as droppable, got {}",
stats.dropped_rows,
);
let remaining = backend
.query_audit_events(AuditFilter {
agent_id: Some(agent_id),
..AuditFilter::default()
})
.await
.expect("query");
assert_eq!(
remaining.len(),
1,
"dry_run must not delete any rows; recent-past event must still be present",
);
}
#[tokio::test]
async fn apply_retention_drop_removes_old_rows() {
let Some(backend) = pg_backend_or_skip().await else {
return;
};
backend.migrate().await.expect("migrate");
let agent_id = fresh_agent_id();
let y2k = chrono::DateTime::from_timestamp(946_684_800, 0).expect("Y2K fits in chrono range");
backend
.append_audit_event(&ancient_event(agent_id, y2k))
.await
.expect("seed Y2K event");
let stats = backend
.apply_retention(&RetentionPolicy {
hot_days: 1825,
warm_days: 1825,
cold_action: ColdAction::Drop,
archive_url: None,
dry_run: false,
})
.await
.expect("apply_retention drop");
assert!(
stats.dropped_rows >= 1,
"Drop should report at least our Y2K event as dropped, got {}",
stats.dropped_rows,
);
let remaining = backend
.query_audit_events(AuditFilter {
agent_id: Some(agent_id),
..AuditFilter::default()
})
.await
.expect("query");
assert!(
remaining.is_empty(),
"Y2K event must be gone after Drop, got {} row(s)",
remaining.len(),
);
}
#[tokio::test]
async fn apply_retention_archive_returns_error_until_s_d() {
let Some(backend) = pg_backend_or_skip().await else {
return;
};
backend.migrate().await.expect("migrate");
let err = backend
.apply_retention(&RetentionPolicy {
hot_days: 30,
warm_days: 90,
cold_action: ColdAction::Archive,
archive_url: Some("s3://aasm-archive/test".to_string()),
dry_run: false,
})
.await
.expect_err("Archive must error until E18 S-D wires drop_chunks");
match err {
StorageError::RetentionError(msg) => {
assert!(
msg.contains("archive cold_action not supported"),
"RetentionError must explain the unsupported action; got: {msg}",
);
assert!(
msg.contains("s3://aasm-archive/test"),
"RetentionError should echo the configured archive_url; got: {msg}",
);
}
other => panic!("expected RetentionError, got {other:?}"),
}
}
#[tokio::test]
async fn healthcheck_returns_ok_with_row_counts() {
let Some(backend) = pg_backend_or_skip().await else {
return;
};
backend.migrate().await.expect("migrate");
let agent_id = fresh_agent_id();
backend
.append_audit_event(&sample_event(agent_id, now_micros()))
.await
.expect("append");
let health = backend.healthcheck().await.expect("healthcheck");
assert_eq!(health.status, HealthStatus::Ok);
assert_eq!(health.backend, "postgres");
assert!(
health.row_counts.audit_events >= 1,
"audit_events count must include our fresh insert, got {}",
health.row_counts.audit_events,
);
}
#[tokio::test]
async fn healthcheck_reports_timescale_none_on_plain_postgres() {
if std::env::var("TIMESCALEDB_AVAILABLE").as_deref() == Ok("1") {
eprintln!(
"skipping plain-postgres healthcheck test: TIMESCALEDB_AVAILABLE=1 (see healthcheck_reports_timescale_stats_when_extension_active)"
);
return;
}
let Some(backend) = pg_backend_or_skip().await else {
return;
};
backend.migrate().await.expect("migrate");
let health = backend.healthcheck().await.expect("healthcheck");
assert!(
health.timescale.is_none(),
"expected timescale = None on plain PostgreSQL; got {:?}",
health.timescale,
);
}
#[tokio::test]
async fn healthcheck_reports_timescale_stats_when_extension_active() {
if std::env::var("TIMESCALEDB_AVAILABLE").as_deref() != Ok("1") {
eprintln!("skipping extension-present healthcheck test: TIMESCALEDB_AVAILABLE != 1");
return;
}
let Some(backend) = pg_backend_or_skip().await else {
return;
};
backend.migrate().await.expect("migrate");
let agent_id = fresh_agent_id();
backend
.append_audit_event(&sample_event(agent_id, now_micros()))
.await
.expect("append");
let health = backend.healthcheck().await.expect("healthcheck");
let stats = health
.timescale
.expect("expected Some(TimescaleStats) when TimescaleDB extension is active");
assert!(
stats.total_chunks >= 1,
"expected at least one chunk after inserting an audit event, got {}",
stats.total_chunks,
);
}
}