use std::sync::Arc;
use arrow::array::{Int64Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};
use crate::catalog::backend::TxOptions;
use crate::session::JammiSession;
use crate::tenant::TenantId;
use crate::trigger::topic::TopicDefinition;
use crate::trigger::TopicId;
use super::error::AuditError;
use super::record::PerQueryAudit;
use super::signature;
use super::table::{self, AUDIT_TOPIC};
/// Name of the environment variable read by [`max_lineage_bytes`] to override
/// the per-record lineage size limit.
pub const MAX_LINEAGE_BYTES_ENV: &str = "JAMMI_AUDIT_MAX_LINEAGE_BYTES";

/// Lineage size limit, in bytes, used when [`MAX_LINEAGE_BYTES_ENV`] is unset
/// or does not hold a valid `usize`.
pub const DEFAULT_MAX_LINEAGE_BYTES: usize = 8 * 1024;

/// Returns the maximum allowed size, in bytes, of a record's serialized lineage.
///
/// The value is read from the [`MAX_LINEAGE_BYTES_ENV`] environment variable on
/// every call. Surrounding whitespace is ignored. If the variable is missing,
/// unreadable, or not parseable as a `usize`, [`DEFAULT_MAX_LINEAGE_BYTES`] is
/// returned instead of an error.
pub fn max_lineage_bytes() -> usize {
    match std::env::var(MAX_LINEAGE_BYTES_ENV) {
        Ok(raw) => raw
            .trim()
            .parse::<usize>()
            .unwrap_or(DEFAULT_MAX_LINEAGE_BYTES),
        // Unset or non-UTF-8 value: fall back silently.
        Err(_) => DEFAULT_MAX_LINEAGE_BYTES,
    }
}
/// Signs a set of per-query audit records, stores them in the audit table, and
/// publishes them to the audit topic.
///
/// Steps, in order:
/// 1. An empty `records` vector is a no-op and returns `Ok(())`.
/// 2. The session's tenant is resolved; every record is stamped with it.
/// 3. Each record's `query_lineage` is JSON-serialized and its size compared to
///    [`max_lineage_bytes`]. All records are checked before any is signed, so a
///    single oversized record rejects the whole call.
/// 4. Each record is signed with the session's signing key store.
/// 5. The audit table is ensured to exist, and the signed records are inserted
///    as one batch inside a backend transaction scoped to the tenant.
/// 6. The signed records are published to the audit topic.
///
/// # Errors
///
/// * [`AuditError::NoTenantBinding`] when the session has no tenant.
/// * [`AuditError::LineageTooLarge`] when a record's serialized lineage exceeds
///   the configured limit.
/// * [`AuditError::Storage`] when the insert transaction fails.
/// * Any error surfaced by lineage serialization, signing, table setup, batch
///   construction, or topic publication.
///
/// NOTE(review): publication runs after the insert transaction has returned
/// successfully, so a publish failure presumably leaves the rows stored while
/// this function still reports an error — confirm callers tolerate that.
pub async fn log_records(
    session: &JammiSession,
    records: Vec<PerQueryAudit>,
) -> Result<(), AuditError> {
    if records.is_empty() {
        return Ok(());
    }

    let tenant = session.tenant().ok_or(AuditError::NoTenantBinding)?;
    let tenant_label = tenant.to_string();

    // Validate every record up front so nothing is signed or written when any
    // single lineage payload is over the limit.
    let limit = max_lineage_bytes();
    for record in &records {
        let actual = serde_json::to_vec(&record.query_lineage)?.len();
        if actual > limit {
            return Err(AuditError::LineageTooLarge { actual, max: limit });
        }
    }

    // Stamp the tenant before signing, then sign each record in input order.
    let key_store = session.signing_key_store();
    let signed = records
        .into_iter()
        .map(|mut record| -> Result<PerQueryAudit, AuditError> {
            record.tenant_id = Some(tenant_label.clone());
            signature::sign_record(&mut record, key_store.as_ref())?;
            Ok(record)
        })
        .collect::<Result<Vec<_>, _>>()?;

    table::ensure_table_exists(session).await?;
    let batch = build_batch(&signed)?;
    let table_id = table::audit_table_id()?;
    let tables = session.mutable_tables_arc();
    let backend = session.catalog().backend_arc();

    // Separate binding for the transaction closure; `tenant` itself is still
    // needed for publication below.
    let tx_tenant = tenant;
    backend
        .transaction(TxOptions::default(), move |tx| {
            Box::pin(async move {
                tx.set_tenant(Some(tx_tenant));
                tables
                    .insert_batch(tx, &table_id, &batch)
                    .await
                    .map_err(|e| crate::catalog::backend::BackendError::Execution(e.to_string()))?;
                Ok(())
            })
        })
        .await
        .map_err(|e| AuditError::Storage(e.to_string()))?;

    publish_to_topic(session, &signed, tenant).await
}
/// Builds the Arrow schema used for audit-topic payloads: a single
/// non-nullable UTF-8 column named `record`.
fn topic_payload_schema() -> Arc<Schema> {
    let record_field = Field::new("record", DataType::Utf8, false);
    Arc::new(Schema::new(vec![record_field]))
}
/// Converts audit records into a single Arrow [`RecordBatch`] using the schema
/// returned by `table::audit_schema()`.
///
/// Columns are emitted in this order: query id, model id, model version,
/// lineage (JSON text), top-k result ids (JSON text), retrieval scores (JSON
/// text), execution time in microseconds (`i64`), and signature.
/// NOTE(review): this order presumably has to match the audit schema's field
/// order — verify against `table::audit_schema()` when changing either side.
///
/// # Errors
///
/// Returns the converted serialization error if any JSON column cannot be
/// serialized, or [`AuditError::Storage`] if the batch cannot be assembled.
fn build_batch(records: &[PerQueryAudit]) -> Result<RecordBatch, AuditError> {
    let schema = table::audit_schema();
    let row_count = records.len();

    // Plain identifier columns.
    let mut query_ids: Vec<String> = Vec::with_capacity(row_count);
    let mut model_ids: Vec<String> = Vec::with_capacity(row_count);
    let mut model_versions: Vec<String> = Vec::with_capacity(row_count);
    for rec in records {
        query_ids.push(rec.query_id.to_string());
        model_ids.push(rec.model_id.clone());
        model_versions.push(rec.model_version.clone());
    }

    // JSON-encoded columns are built one column at a time so that the first
    // serialization failure reported is by column, then by row.
    let mut lineage_json: Vec<String> = Vec::with_capacity(row_count);
    for rec in records {
        lineage_json.push(serde_json::to_string(&rec.query_lineage)?);
    }
    let mut top_k_json: Vec<String> = Vec::with_capacity(row_count);
    for rec in records {
        top_k_json.push(serde_json::to_string(&rec.top_k_result_ids)?);
    }
    let mut scores_json: Vec<String> = Vec::with_capacity(row_count);
    for rec in records {
        scores_json.push(serde_json::to_string(&rec.retrieval_scores)?);
    }

    let (executed_at, signatures): (Vec<i64>, Vec<String>) = records
        .iter()
        .map(|rec| (rec.executed_at_micros(), rec.signature.clone()))
        .unzip();

    RecordBatch::try_new(
        schema,
        vec![
            Arc::new(StringArray::from(query_ids)),
            Arc::new(StringArray::from(model_ids)),
            Arc::new(StringArray::from(model_versions)),
            Arc::new(StringArray::from(lineage_json)),
            Arc::new(StringArray::from(top_k_json)),
            Arc::new(StringArray::from(scores_json)),
            Arc::new(Int64Array::from(executed_at)),
            Arc::new(StringArray::from(signatures)),
        ],
    )
    .map_err(|e| AuditError::Storage(format!("build audit batch: {e}")))
}
/// Publishes the given audit records to the tenant's audit topic.
///
/// The topic named `AUDIT_TOPIC` is looked up for `tenant` in the session's
/// topic repository; if it is absent, a new definition with a fresh id and the
/// single-column payload schema is registered there. The topic is then
/// registered with the trigger broker on every call (NOTE(review): this
/// presumably relies on broker registration being idempotent — confirm).
/// Finally each record is JSON-serialized into one row of a single-column
/// batch and published scoped to `tenant`.
///
/// # Errors
///
/// Repository, broker, batch-construction, and publisher failures are reported
/// as [`AuditError::Broker`]; a record that fails to serialize yields the
/// converted serialization error.
async fn publish_to_topic(
    session: &JammiSession,
    records: &[PerQueryAudit],
    tenant: TenantId,
) -> Result<(), AuditError> {
    // Shared conversion for every broker-side failure in this function.
    fn broker_err<E: ToString>(err: E) -> AuditError {
        AuditError::Broker(err.to_string())
    }

    let repo = session.topic_repo();
    let known = repo
        .lookup_by_name(AUDIT_TOPIC, Some(tenant))
        .await
        .map_err(broker_err)?;
    let topic = if let Some(existing) = known {
        existing
    } else {
        // First publication for this tenant: create and persist the topic.
        let fresh = TopicDefinition {
            id: TopicId::new(),
            name: AUDIT_TOPIC.to_string(),
            schema: topic_payload_schema(),
            tenant: Some(tenant),
            broker_metadata: std::collections::BTreeMap::new(),
        };
        repo.register_topic(&fresh).await.map_err(broker_err)?;
        fresh
    };

    session
        .trigger_broker()
        .register_topic(&topic)
        .await
        .map_err(broker_err)?;

    // One JSON document per record, one record per row.
    let mut payloads: Vec<String> = Vec::with_capacity(records.len());
    for record in records {
        payloads.push(serde_json::to_string(record)?);
    }

    let batch = RecordBatch::try_new(
        Arc::clone(&topic.schema),
        vec![Arc::new(StringArray::from(payloads))],
    )
    .map_err(|e| AuditError::Broker(format!("build topic batch: {e}")))?;

    session
        .publisher()
        .publish_scoped(&topic, Some(tenant), batch)
        .await
        .map_err(broker_err)?;
    Ok(())
}