use std::path::Path;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use chrono::DateTime;
use rusqlite::{params, Connection, OpenFlags};
use thiserror::Error;
use tokio::sync::Mutex;
use gradatum_dto::QaEventDto;
#[derive(Debug, Error)]
pub enum EventLogError {
#[error("event_log SQLite : {0}")]
Sqlite(#[from] rusqlite::Error),
#[error("event_log timestamp invalide : {0}")]
BadTimestamp(String),
#[error("event_log mutex poisonné")]
Poisoned,
}
#[derive(Clone)]
pub struct EventLogStore {
conn: Arc<Mutex<Connection>>,
}
impl EventLogStore {
pub async fn open(path: &Path) -> Result<Self, EventLogError> {
let path = path.to_path_buf();
let conn = tokio::task::spawn_blocking(move || {
let conn = Connection::open_with_flags(
&path,
OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_NO_MUTEX,
)?;
conn.pragma_update(None, "journal_mode", "WAL")?;
conn.pragma_update(None, "synchronous", "NORMAL")?;
conn.pragma_update(None, "busy_timeout", 5000i32)?;
conn.pragma_update(None, "foreign_keys", true)?;
Ok::<Connection, rusqlite::Error>(conn)
})
.await
.map_err(|_| EventLogError::Poisoned)??;
Ok(Self {
conn: Arc::new(Mutex::new(conn)),
})
}
#[cfg(test)]
pub async fn open_in_memory() -> Result<Self, EventLogError> {
let conn = tokio::task::spawn_blocking(|| {
let conn = Connection::open_in_memory()?;
conn.execute_batch(
"CREATE TABLE IF NOT EXISTS event_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts INTEGER NOT NULL,
tenant_id TEXT NOT NULL,
route TEXT NOT NULL,
model_alias TEXT NOT NULL,
model_used TEXT,
provider TEXT NOT NULL,
feature_id TEXT,
status_code INTEGER NOT NULL,
latency_ms INTEGER NOT NULL,
tokens_input INTEGER,
tokens_output INTEGER,
cost_usd REAL,
processed INTEGER NOT NULL DEFAULT 0,
created_at INTEGER NOT NULL,
agent_id TEXT
);
CREATE INDEX IF NOT EXISTS idx_event_log_created ON event_log(created_at);
CREATE INDEX IF NOT EXISTS idx_event_log_tenant ON event_log(tenant_id);
CREATE INDEX IF NOT EXISTS idx_event_log_feature ON event_log(feature_id);
CREATE INDEX IF NOT EXISTS idx_event_log_processed ON event_log(processed);
CREATE INDEX IF NOT EXISTS idx_event_log_agent ON event_log(agent_id);",
)?;
Ok::<Connection, rusqlite::Error>(conn)
})
.await
.map_err(|_| EventLogError::Poisoned)??;
Ok(Self {
conn: Arc::new(Mutex::new(conn)),
})
}
pub async fn insert_batch(
&self,
tenant_id: &str,
events: &[QaEventDto],
) -> Result<usize, EventLogError> {
if events.is_empty() {
return Ok(0);
}
let now_ms = system_now_ms();
let tenant_id = tenant_id.to_owned();
let ts_vec: Vec<i64> = events
.iter()
.map(|e| parse_rfc3339_ms(&e.timestamp))
.collect::<Result<Vec<_>, _>>()?;
let events: Vec<QaEventDto> = events.to_vec();
let conn = Arc::clone(&self.conn);
let inserted = tokio::task::spawn_blocking(move || {
let conn = conn.blocking_lock();
let tx = conn.unchecked_transaction()?;
let mut count = 0usize;
for (dto, ts) in events.iter().zip(ts_vec.iter()) {
tx.execute(
"INSERT INTO event_log
(ts, tenant_id, route, model_alias, model_used, provider,
feature_id, status_code, latency_ms,
tokens_input, tokens_output, cost_usd,
processed, created_at, agent_id)
VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,0,?13,?14)",
params![
ts,
tenant_id,
dto.route,
dto.model_alias,
dto.model_used,
dto.provider,
dto.feature_id,
dto.status_code as i64,
dto.latency_ms as i64,
dto.tokens_input.map(|v| v as i64),
dto.tokens_output.map(|v| v as i64),
dto.cost_usd,
now_ms,
dto.agent_id,
],
)?;
count += 1;
}
tx.commit()?;
Ok::<usize, rusqlite::Error>(count)
})
.await
.map_err(|_| EventLogError::Poisoned)??;
Ok(inserted)
}
pub async fn purge(
&self,
retention_cutoff_ms: i64,
max_rows: u64,
) -> Result<u64, EventLogError> {
let conn = Arc::clone(&self.conn);
let deleted = tokio::task::spawn_blocking(move || {
let conn = conn.blocking_lock();
let mut total_deleted = 0u64;
let deleted_age = conn.execute(
"DELETE FROM event_log WHERE created_at < ?1",
params![retention_cutoff_ms],
)?;
total_deleted += deleted_age as u64;
let current_count: i64 =
conn.query_row("SELECT COUNT(*) FROM event_log", [], |r| r.get(0))?;
if current_count > max_rows as i64 {
let excess = current_count - max_rows as i64;
let deleted_cap = conn.execute(
"DELETE FROM event_log WHERE id IN (
SELECT id FROM event_log ORDER BY created_at ASC LIMIT ?1
)",
params![excess],
)?;
total_deleted += deleted_cap as u64;
}
Ok::<u64, rusqlite::Error>(total_deleted)
})
.await
.map_err(|_| EventLogError::Poisoned)??;
Ok(deleted)
}
pub async fn count(&self) -> Result<u64, EventLogError> {
let conn = Arc::clone(&self.conn);
let count = tokio::task::spawn_blocking(move || {
let conn = conn.blocking_lock();
let count: i64 = conn.query_row("SELECT COUNT(*) FROM event_log", [], |r| r.get(0))?;
Ok::<u64, rusqlite::Error>(count as u64)
})
.await
.map_err(|_| EventLogError::Poisoned)??;
Ok(count)
}
}
fn system_now_ms() -> i64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("horloge système avant epoch UNIX — invariant système")
.as_millis() as i64
}
fn parse_rfc3339_ms(ts: &str) -> Result<i64, EventLogError> {
DateTime::parse_from_rfc3339(ts)
.map(|dt| dt.timestamp_millis())
.map_err(|_| {
let safe_ts: String = ts.chars().filter(|c| !c.is_control()).take(64).collect();
EventLogError::BadTimestamp(safe_ts)
})
}
#[cfg(test)]
mod tests {
use super::*;
fn make_dto(route: &str, has_tokens: bool) -> QaEventDto {
QaEventDto {
route: route.to_owned(),
model_alias: "alias-test".to_owned(),
provider: "test-provider".to_owned(),
status_code: 200,
latency_ms: 42,
timestamp: "2026-06-01T12:00:00Z".to_owned(),
feature_id: Some("feat-1".to_owned()),
model_used: None,
tokens_input: if has_tokens { Some(100) } else { None },
tokens_output: if has_tokens { Some(50) } else { None },
cost_usd: None,
agent_id: None,
}
}
#[tokio::test]
async fn insert_batch_append_two_batches() {
let store = EventLogStore::open_in_memory()
.await
.expect("open in-memory");
let batch1 = vec![make_dto("/v1/chat", true), make_dto("/v1/embed", false)];
let n1 = store
.insert_batch("main", &batch1)
.await
.expect("insert batch1");
assert_eq!(n1, 2, "batch1 doit insérer 2 lignes");
let batch2 = vec![make_dto("/v1/chat", true)];
let n2 = store
.insert_batch("main", &batch2)
.await
.expect("insert batch2");
assert_eq!(n2, 1, "batch2 doit insérer 1 ligne");
let total = store.count().await.expect("count");
assert_eq!(total, 3, "total doit être 3 (2 + 1 lignes)");
}
#[tokio::test]
async fn insert_batch_empty_returns_zero() {
let store = EventLogStore::open_in_memory()
.await
.expect("open in-memory");
let n = store.insert_batch("main", &[]).await.expect("insert empty");
assert_eq!(n, 0);
assert_eq!(store.count().await.expect("count"), 0);
}
#[tokio::test]
async fn insert_batch_accepts_none_tokens() {
let store = EventLogStore::open_in_memory()
.await
.expect("open in-memory");
let batch = vec![make_dto("/v1/embed", false)];
let n = store
.insert_batch("main", &batch)
.await
.expect("insert with None tokens");
assert_eq!(n, 1);
assert_eq!(store.count().await.expect("count"), 1);
}
#[tokio::test]
async fn purge_by_age_removes_old_rows_and_keeps_recent() {
let store = EventLogStore::open_in_memory()
.await
.expect("open in-memory");
let batch = vec![
make_dto("/v1/chat", false),
make_dto("/v1/chat", false),
make_dto("/v1/chat", false),
];
store.insert_batch("main", &batch).await.expect("insert");
assert_eq!(store.count().await.expect("count"), 3);
let cutoff_future = system_now_ms() + 60_000;
let deleted = store
.purge(cutoff_future, 1_000_000)
.await
.expect("purge age");
assert_eq!(
deleted, 3,
"doit supprimer les 3 lignes (toutes 'anciennes')"
);
assert_eq!(store.count().await.expect("count"), 0);
}
#[tokio::test]
async fn purge_by_age_keeps_recent_rows() {
let store = EventLogStore::open_in_memory()
.await
.expect("open in-memory");
let batch = vec![make_dto("/v1/chat", false), make_dto("/v1/embed", false)];
store.insert_batch("main", &batch).await.expect("insert");
let deleted = store.purge(0, 1_000_000).await.expect("purge no-op");
assert_eq!(deleted, 0, "cutoff=0 ne doit rien supprimer");
assert_eq!(store.count().await.expect("count"), 2);
}
#[tokio::test]
async fn purge_cap_max_rows() {
let store = EventLogStore::open_in_memory()
.await
.expect("open in-memory");
let batch: Vec<QaEventDto> = (0..5)
.map(|i| make_dto(&format!("/r/{i}"), false))
.collect();
store.insert_batch("main", &batch).await.expect("insert");
assert_eq!(store.count().await.expect("count"), 5);
let deleted = store.purge(0, 3).await.expect("purge cap");
assert_eq!(deleted, 2, "doit supprimer 2 lignes (5 - 3 = 2 excès)");
assert_eq!(
store.count().await.expect("count"),
3,
"il doit rester exactement 3 lignes"
);
}
#[tokio::test]
async fn purge_cap_combined_with_age() {
let store = EventLogStore::open_in_memory()
.await
.expect("open in-memory");
let batch: Vec<QaEventDto> = (0..4)
.map(|i| make_dto(&format!("/r/{i}"), false))
.collect();
store.insert_batch("main", &batch).await.expect("insert");
let cutoff = system_now_ms() + 60_000;
let deleted = store.purge(cutoff, 2).await.expect("purge combined");
assert_eq!(deleted, 4, "doit supprimer les 4 lignes par âge");
assert_eq!(store.count().await.expect("count"), 0);
}
#[tokio::test]
async fn count_returns_zero_on_empty() {
let store = EventLogStore::open_in_memory()
.await
.expect("open in-memory");
assert_eq!(store.count().await.expect("count"), 0);
}
#[tokio::test]
async fn bad_timestamp_returns_error() {
let store = EventLogStore::open_in_memory()
.await
.expect("open in-memory");
let bad = vec![QaEventDto {
timestamp: "NOT-A-DATE".to_owned(),
route: "/v1/chat".to_owned(),
model_alias: "a".to_owned(),
provider: "p".to_owned(),
status_code: 200,
latency_ms: 1,
feature_id: None,
model_used: None,
tokens_input: None,
tokens_output: None,
cost_usd: None,
agent_id: None,
}];
let result = store.insert_batch("main", &bad).await;
assert!(
matches!(result, Err(EventLogError::BadTimestamp(_))),
"timestamp invalide doit retourner BadTimestamp"
);
}
#[tokio::test]
async fn insert_batch_with_agent_id_present() {
let store = EventLogStore::open_in_memory()
.await
.expect("open in-memory");
let mut dto = make_dto("/v1/chat", false);
dto.agent_id = Some("example-agent".to_owned());
let n = store
.insert_batch("main", &[dto])
.await
.expect("insert avec agent_id");
assert_eq!(n, 1, "doit insérer 1 ligne avec agent_id");
assert_eq!(store.count().await.expect("count"), 1);
}
#[tokio::test]
async fn insert_batch_with_agent_id_none() {
let store = EventLogStore::open_in_memory()
.await
.expect("open in-memory");
let dto = make_dto("/v1/embed", false);
let n = store
.insert_batch("main", &[dto])
.await
.expect("insert avec agent_id None");
assert_eq!(n, 1, "doit insérer 1 ligne avec agent_id NULL");
assert_eq!(store.count().await.expect("count"), 1);
}
#[tokio::test]
async fn insert_batch_mixed_agent_id() {
let store = EventLogStore::open_in_memory()
.await
.expect("open in-memory");
let mut dto_with = make_dto("/v1/chat", true);
dto_with.agent_id = Some("some-agent".to_owned());
let dto_without = make_dto("/v1/embed", false);
let n = store
.insert_batch("main", &[dto_with, dto_without])
.await
.expect("insert batch mixte agent_id");
assert_eq!(n, 2, "doit insérer 2 lignes (agent_id présent + None)");
assert_eq!(store.count().await.expect("count"), 2);
}
}