Skip to main content

meerkat_mobkit/console_aggregator/
store.rs

1use std::collections::{BTreeMap, HashMap};
2use std::path::Path;
3use std::sync::{Arc, Mutex};
4
5use rusqlite::{Connection, OptionalExtension, params};
6use sha2::{Digest, Sha256};
7
8use super::types::{
9    AppendDisposition, AppendOutcome, ConsoleCursor, ConsoleFrame, ConsoleFrameSource,
10    ConsoleFrameSourceKind, ConsoleFrameStatus, ConsoleTimelinePage, ConsoleTimelineQuery,
11    NewConsoleFrame,
12};
13
14pub type ConsoleLogResult<T> = Result<T, ConsoleLogError>;
15
16pub type ConsoleLogError = Box<dyn std::error::Error + Send + Sync>;
17
18#[async_trait::async_trait]
19pub trait ConsoleLogStore: Send + Sync {
20    async fn append_if_absent(&self, frame: NewConsoleFrame) -> ConsoleLogResult<AppendOutcome>;
21
22    async fn update_frame_status(
23        &self,
24        frame_id: &str,
25        status: ConsoleFrameStatus,
26    ) -> ConsoleLogResult<Option<ConsoleFrame>>;
27
28    async fn query_frames(
29        &self,
30        query: ConsoleTimelineQuery,
31    ) -> ConsoleLogResult<ConsoleTimelinePage>;
32
33    async fn frame_by_dedupe_key(&self, dedupe_key: &str)
34    -> ConsoleLogResult<Option<ConsoleFrame>>;
35
36    async fn latest_cursor(&self) -> ConsoleLogResult<Option<ConsoleCursor>>;
37
38    async fn clear_frames(&self) -> ConsoleLogResult<()>;
39
40    async fn record_source_watermark(
41        &self,
42        runtime_key: &str,
43        source_kind: ConsoleFrameSourceKind,
44        source_cursor: &str,
45    ) -> ConsoleLogResult<()>;
46
47    async fn source_watermark(
48        &self,
49        runtime_key: &str,
50        source_kind: ConsoleFrameSourceKind,
51    ) -> ConsoleLogResult<Option<String>>;
52}
53
54#[derive(Default)]
55pub struct InMemoryConsoleLogStore {
56    state: Mutex<InMemoryState>,
57}
58
59#[derive(Default)]
60struct InMemoryState {
61    next_seq: u64,
62    frames: BTreeMap<u64, ConsoleFrame>,
63    dedupe_to_seq: HashMap<String, u64>,
64    id_to_seq: HashMap<String, u64>,
65    watermarks: HashMap<(String, String), String>,
66}
67
68impl InMemoryConsoleLogStore {
69    pub fn new() -> Self {
70        Self {
71            state: Mutex::new(InMemoryState {
72                next_seq: 1,
73                frames: BTreeMap::new(),
74                dedupe_to_seq: HashMap::new(),
75                id_to_seq: HashMap::new(),
76                watermarks: HashMap::new(),
77            }),
78        }
79    }
80}
81
82#[async_trait::async_trait]
83impl ConsoleLogStore for InMemoryConsoleLogStore {
84    async fn append_if_absent(&self, frame: NewConsoleFrame) -> ConsoleLogResult<AppendOutcome> {
85        let mut state = self
86            .state
87            .lock()
88            .map_err(|_| boxed_error("console log lock poisoned"))?;
89        if let Some(seq) = state.dedupe_to_seq.get(&frame.dedupe_key).copied()
90            && let Some(existing) = state.frames.get(&seq)
91        {
92            return Ok(AppendOutcome {
93                disposition: AppendDisposition::Existing,
94                frame: existing.clone(),
95            });
96        }
97
98        let seq = state.next_seq;
99        state.next_seq = state.next_seq.saturating_add(1);
100        let id = frame
101            .id
102            .unwrap_or_else(|| stable_frame_id(&frame.dedupe_key));
103        let frame = ConsoleFrame {
104            id: id.clone(),
105            cursor: ConsoleCursor::from_seq(seq),
106            dedupe_key: frame.dedupe_key,
107            timestamp_ms: frame.timestamp_ms,
108            runtime_key: frame.runtime_key,
109            identity: frame.identity,
110            conversation_id: frame.conversation_id,
111            session_id: frame.session_id,
112            kind: frame.kind,
113            status: frame.status,
114            frame_version: 1,
115            updated_at_ms: None,
116            payload: frame.payload,
117            source: frame.source,
118            source_event_id: frame.source_event_id,
119            interaction_id: frame.interaction_id,
120            turn_id: frame.turn_id,
121            run_id: frame.run_id,
122            parent_frame_id: frame.parent_frame_id,
123            caused_by_frame_id: frame.caused_by_frame_id,
124        };
125        state.dedupe_to_seq.insert(frame.dedupe_key.clone(), seq);
126        state.id_to_seq.insert(id, seq);
127        state.frames.insert(seq, frame.clone());
128        Ok(AppendOutcome {
129            disposition: AppendDisposition::Inserted,
130            frame,
131        })
132    }
133
134    async fn update_frame_status(
135        &self,
136        frame_id: &str,
137        status: ConsoleFrameStatus,
138    ) -> ConsoleLogResult<Option<ConsoleFrame>> {
139        let mut state = self
140            .state
141            .lock()
142            .map_err(|_| boxed_error("console log lock poisoned"))?;
143        let Some(seq) = state.id_to_seq.get(frame_id).copied() else {
144            return Ok(None);
145        };
146        let Some(frame) = state.frames.get_mut(&seq) else {
147            return Ok(None);
148        };
149        frame.status = status;
150        frame.frame_version = frame.frame_version.saturating_add(1);
151        frame.updated_at_ms = Some(current_time_ms());
152        Ok(Some(frame.clone()))
153    }
154
155    async fn query_frames(
156        &self,
157        query: ConsoleTimelineQuery,
158    ) -> ConsoleLogResult<ConsoleTimelinePage> {
159        let after_seq = query.after.as_ref().map(cursor_seq).transpose()?;
160        let limit = normalize_limit(query.limit);
161        let state = self
162            .state
163            .lock()
164            .map_err(|_| boxed_error("console log lock poisoned"))?;
165        let mut frames = Vec::new();
166        for (seq, frame) in &state.frames {
167            if after_seq.is_some_and(|after| *seq <= after) {
168                continue;
169            }
170            if let Some(identity) = query.identity.as_deref()
171                && frame.identity != identity
172            {
173                continue;
174            }
175            if let Some(conversation_id) = query.conversation_id.as_deref()
176                && frame.conversation_id.as_deref() != Some(conversation_id)
177            {
178                continue;
179            }
180            frames.push(frame.clone());
181            if frames.len() >= limit {
182                break;
183            }
184        }
185        let next_cursor = frames.last().map(|frame| frame.cursor.clone());
186        Ok(ConsoleTimelinePage {
187            frames,
188            next_cursor,
189        })
190    }
191
192    async fn frame_by_dedupe_key(
193        &self,
194        dedupe_key: &str,
195    ) -> ConsoleLogResult<Option<ConsoleFrame>> {
196        let state = self
197            .state
198            .lock()
199            .map_err(|_| boxed_error("console log lock poisoned"))?;
200        let Some(seq) = state.dedupe_to_seq.get(dedupe_key).copied() else {
201            return Ok(None);
202        };
203        Ok(state.frames.get(&seq).cloned())
204    }
205
206    async fn latest_cursor(&self) -> ConsoleLogResult<Option<ConsoleCursor>> {
207        let state = self
208            .state
209            .lock()
210            .map_err(|_| boxed_error("console log lock poisoned"))?;
211        Ok(state
212            .frames
213            .keys()
214            .next_back()
215            .copied()
216            .map(ConsoleCursor::from_seq))
217    }
218
219    async fn clear_frames(&self) -> ConsoleLogResult<()> {
220        let mut state = self
221            .state
222            .lock()
223            .map_err(|_| boxed_error("console log lock poisoned"))?;
224        state.frames.clear();
225        state.dedupe_to_seq.clear();
226        state.id_to_seq.clear();
227        state.next_seq = 1;
228        Ok(())
229    }
230
231    async fn record_source_watermark(
232        &self,
233        runtime_key: &str,
234        source_kind: ConsoleFrameSourceKind,
235        source_cursor: &str,
236    ) -> ConsoleLogResult<()> {
237        let mut state = self
238            .state
239            .lock()
240            .map_err(|_| boxed_error("console log lock poisoned"))?;
241        state.watermarks.insert(
242            (runtime_key.to_string(), source_kind.as_str().to_string()),
243            source_cursor.to_string(),
244        );
245        Ok(())
246    }
247
248    async fn source_watermark(
249        &self,
250        runtime_key: &str,
251        source_kind: ConsoleFrameSourceKind,
252    ) -> ConsoleLogResult<Option<String>> {
253        let state = self
254            .state
255            .lock()
256            .map_err(|_| boxed_error("console log lock poisoned"))?;
257        Ok(state
258            .watermarks
259            .get(&(runtime_key.to_string(), source_kind.as_str().to_string()))
260            .cloned())
261    }
262}
263
264pub struct SqliteConsoleLogStore {
265    conn: Arc<Mutex<Connection>>,
266    watermarks: Arc<Mutex<HashMap<(String, String), String>>>,
267}
268
269impl SqliteConsoleLogStore {
270    pub fn open(path: impl AsRef<Path>) -> ConsoleLogResult<Self> {
271        let conn = Connection::open(path).map_err(into_boxed)?;
272        Self::from_connection(conn)
273    }
274
275    pub fn in_memory() -> ConsoleLogResult<Self> {
276        let conn = Connection::open_in_memory().map_err(into_boxed)?;
277        Self::from_connection(conn)
278    }
279
280    fn from_connection(conn: Connection) -> ConsoleLogResult<Self> {
281        initialize_schema(&conn)?;
282        let watermarks = load_source_watermarks(&conn)?;
283        Ok(Self {
284            conn: Arc::new(Mutex::new(conn)),
285            watermarks: Arc::new(Mutex::new(watermarks)),
286        })
287    }
288}
289
290#[async_trait::async_trait]
291impl ConsoleLogStore for SqliteConsoleLogStore {
292    async fn append_if_absent(&self, frame: NewConsoleFrame) -> ConsoleLogResult<AppendOutcome> {
293        let conn = self
294            .conn
295            .lock()
296            .map_err(|_| boxed_error("console log lock poisoned"))?;
297        if let Some(existing) = select_frame_by_dedupe(&conn, &frame.dedupe_key)? {
298            return Ok(AppendOutcome {
299                disposition: AppendDisposition::Existing,
300                frame: existing,
301            });
302        }
303
304        let id = frame
305            .id
306            .clone()
307            .unwrap_or_else(|| stable_frame_id(&frame.dedupe_key));
308        let payload_json = serde_json::to_string(&frame.payload).map_err(into_boxed)?;
309        conn.execute(
310            "INSERT INTO console_frames (
311                id, dedupe_key, timestamp_ms, runtime_key, identity,
312                conversation_id, session_id, kind, status, frame_version, updated_at_ms, payload_json,
313                source_kind, source_cursor, source_event_id, interaction_id,
314                parent_frame_id, caused_by_frame_id, turn_id, run_id
315            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, 1, NULL, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18)",
316            params![
317                id,
318                frame.dedupe_key,
319                frame.timestamp_ms as i64,
320                frame.runtime_key,
321                frame.identity,
322                frame.conversation_id,
323                frame.session_id,
324                frame.kind,
325                frame.status.as_str(),
326                payload_json,
327                frame.source.kind.as_str(),
328                frame.source.source_cursor,
329                frame.source_event_id,
330                frame.interaction_id,
331                frame.parent_frame_id,
332                frame.caused_by_frame_id,
333                frame.turn_id,
334                frame.run_id,
335            ],
336        )
337        .map_err(into_boxed)?;
338        let inserted = select_frame_by_dedupe(&conn, &frame.dedupe_key)?
339            .ok_or_else(|| boxed_error("inserted console frame was not readable"))?;
340        Ok(AppendOutcome {
341            disposition: AppendDisposition::Inserted,
342            frame: inserted,
343        })
344    }
345
346    async fn update_frame_status(
347        &self,
348        frame_id: &str,
349        status: ConsoleFrameStatus,
350    ) -> ConsoleLogResult<Option<ConsoleFrame>> {
351        let conn = self
352            .conn
353            .lock()
354            .map_err(|_| boxed_error("console log lock poisoned"))?;
355        conn.execute(
356            "UPDATE console_frames SET status = ?1, frame_version = frame_version + 1, updated_at_ms = ?2 WHERE id = ?3",
357            params![status.as_str(), current_time_ms() as i64, frame_id],
358        )
359        .map_err(into_boxed)?;
360        select_frame_by_id(&conn, frame_id)
361    }
362
363    async fn query_frames(
364        &self,
365        query: ConsoleTimelineQuery,
366    ) -> ConsoleLogResult<ConsoleTimelinePage> {
367        let after_seq = query.after.as_ref().map(cursor_seq).transpose()?;
368        let limit = normalize_limit(query.limit);
369        let conn = self
370            .conn
371            .lock()
372            .map_err(|_| boxed_error("console log lock poisoned"))?;
373        let mut sql = String::from(
374            "SELECT cursor_seq, id, dedupe_key, timestamp_ms, runtime_key, identity,
375                    conversation_id, session_id, kind, status, frame_version, updated_at_ms, payload_json,
376                    source_kind, source_cursor, source_event_id, interaction_id,
377                    parent_frame_id, caused_by_frame_id, turn_id, run_id
378             FROM console_frames WHERE cursor_seq > ?1",
379        );
380        if query.identity.is_some() {
381            sql.push_str(" AND identity = ?2");
382        }
383        if query.conversation_id.is_some() {
384            sql.push_str(if query.identity.is_some() {
385                " AND conversation_id = ?3"
386            } else {
387                " AND conversation_id = ?2"
388            });
389        }
390        sql.push_str(" ORDER BY cursor_seq ASC LIMIT ?");
391        let limit_param_index = 2
392            + usize::from(query.identity.is_some())
393            + usize::from(query.conversation_id.is_some());
394        sql.push_str(&limit_param_index.to_string());
395
396        let after = after_seq.unwrap_or(0) as i64;
397        let frames = match (query.identity.as_deref(), query.conversation_id.as_deref()) {
398            (Some(identity), Some(conversation_id)) => query_sql_frames(
399                &conn,
400                &sql,
401                params![after, identity, conversation_id, limit as i64],
402            )?,
403            (Some(identity), None) => {
404                query_sql_frames(&conn, &sql, params![after, identity, limit as i64])?
405            }
406            (None, Some(conversation_id)) => {
407                query_sql_frames(&conn, &sql, params![after, conversation_id, limit as i64])?
408            }
409            (None, None) => query_sql_frames(&conn, &sql, params![after, limit as i64])?,
410        };
411        let next_cursor = frames.last().map(|frame| frame.cursor.clone());
412        Ok(ConsoleTimelinePage {
413            frames,
414            next_cursor,
415        })
416    }
417
418    async fn frame_by_dedupe_key(
419        &self,
420        dedupe_key: &str,
421    ) -> ConsoleLogResult<Option<ConsoleFrame>> {
422        let conn = self
423            .conn
424            .lock()
425            .map_err(|_| boxed_error("console log lock poisoned"))?;
426        select_frame_by_dedupe(&conn, dedupe_key)
427    }
428
429    async fn latest_cursor(&self) -> ConsoleLogResult<Option<ConsoleCursor>> {
430        let conn = self
431            .conn
432            .lock()
433            .map_err(|_| boxed_error("console log lock poisoned"))?;
434        let seq: Option<i64> = conn
435            .query_row(
436                "SELECT cursor_seq FROM console_frames ORDER BY cursor_seq DESC LIMIT 1",
437                [],
438                |row| row.get(0),
439            )
440            .optional()
441            .map_err(into_boxed)?;
442        Ok(seq.map(|value| ConsoleCursor::from_seq(value as u64)))
443    }
444
445    async fn clear_frames(&self) -> ConsoleLogResult<()> {
446        let conn = self
447            .conn
448            .lock()
449            .map_err(|_| boxed_error("console log lock poisoned"))?;
450        conn.execute("DELETE FROM console_frames", [])
451            .map_err(into_boxed)?;
452        conn.execute(
453            "DELETE FROM sqlite_sequence WHERE name = 'console_frames'",
454            [],
455        )
456        .ok();
457        Ok(())
458    }
459
460    async fn record_source_watermark(
461        &self,
462        runtime_key: &str,
463        source_kind: ConsoleFrameSourceKind,
464        source_cursor: &str,
465    ) -> ConsoleLogResult<()> {
466        let conn = self
467            .conn
468            .lock()
469            .map_err(|_| boxed_error("console log lock poisoned"))?;
470        conn.execute(
471            "INSERT INTO console_source_watermarks (
472                runtime_key, source_kind, source_cursor, last_ingested_at_ms
473            ) VALUES (?1, ?2, ?3, ?4)
474            ON CONFLICT(runtime_key, source_kind) DO UPDATE SET
475                source_cursor = excluded.source_cursor,
476                last_ingested_at_ms = excluded.last_ingested_at_ms",
477            params![
478                runtime_key,
479                source_kind.as_str(),
480                source_cursor,
481                current_time_ms() as i64,
482            ],
483        )
484        .map_err(into_boxed)?;
485        self.watermarks
486            .lock()
487            .map_err(|_| boxed_error("console watermark lock poisoned"))?
488            .insert(
489                (runtime_key.to_string(), source_kind.as_str().to_string()),
490                source_cursor.to_string(),
491            );
492        Ok(())
493    }
494
495    async fn source_watermark(
496        &self,
497        runtime_key: &str,
498        source_kind: ConsoleFrameSourceKind,
499    ) -> ConsoleLogResult<Option<String>> {
500        let watermarks = self
501            .watermarks
502            .lock()
503            .map_err(|_| boxed_error("console watermark lock poisoned"))?;
504        Ok(watermarks
505            .get(&(runtime_key.to_string(), source_kind.as_str().to_string()))
506            .cloned())
507    }
508}
509
510fn load_source_watermarks(
511    conn: &Connection,
512) -> ConsoleLogResult<HashMap<(String, String), String>> {
513    let mut stmt = conn
514        .prepare(
515            "SELECT runtime_key, source_kind, source_cursor
516             FROM console_source_watermarks",
517        )
518        .map_err(into_boxed)?;
519    let rows = stmt
520        .query_map([], |row| {
521            Ok((
522                (row.get::<_, String>(0)?, row.get::<_, String>(1)?),
523                row.get::<_, String>(2)?,
524            ))
525        })
526        .map_err(into_boxed)?;
527    let mut watermarks = HashMap::new();
528    for row in rows {
529        let (key, cursor) = row.map_err(into_boxed)?;
530        watermarks.insert(key, cursor);
531    }
532    Ok(watermarks)
533}
534
535fn initialize_schema(conn: &Connection) -> ConsoleLogResult<()> {
536    conn.execute_batch(
537        "CREATE TABLE IF NOT EXISTS console_frames (
538            cursor_seq INTEGER PRIMARY KEY AUTOINCREMENT,
539            id TEXT NOT NULL UNIQUE,
540            dedupe_key TEXT NOT NULL UNIQUE,
541            timestamp_ms INTEGER NOT NULL,
542            runtime_key TEXT NOT NULL,
543            identity TEXT NOT NULL,
544            conversation_id TEXT,
545            session_id TEXT,
546            kind TEXT NOT NULL,
547            status TEXT NOT NULL,
548            frame_version INTEGER NOT NULL DEFAULT 1,
549            updated_at_ms INTEGER,
550            payload_json TEXT NOT NULL,
551            source_kind TEXT NOT NULL,
552            source_cursor TEXT,
553            source_event_id TEXT,
554            interaction_id TEXT,
555            parent_frame_id TEXT,
556            caused_by_frame_id TEXT,
557            turn_id TEXT,
558            run_id TEXT
559        );
560        CREATE TABLE IF NOT EXISTS console_source_watermarks (
561            runtime_key TEXT NOT NULL,
562            source_kind TEXT NOT NULL,
563            source_cursor TEXT NOT NULL,
564            last_ingested_at_ms INTEGER NOT NULL,
565            PRIMARY KEY(runtime_key, source_kind)
566        );
567        CREATE INDEX IF NOT EXISTS idx_console_frames_identity_cursor
568            ON console_frames(identity, cursor_seq);
569        CREATE INDEX IF NOT EXISTS idx_console_frames_conversation_cursor
570            ON console_frames(conversation_id, cursor_seq);",
571    )
572    .map_err(into_boxed)
573}
574
575fn query_sql_frames<P: rusqlite::Params>(
576    conn: &Connection,
577    sql: &str,
578    params: P,
579) -> ConsoleLogResult<Vec<ConsoleFrame>> {
580    let mut stmt = conn.prepare(sql).map_err(into_boxed)?;
581    let rows = stmt.query_map(params, row_to_frame).map_err(into_boxed)?;
582    let mut frames = Vec::new();
583    for row in rows {
584        frames.push(row.map_err(into_boxed)?);
585    }
586    Ok(frames)
587}
588
589fn select_frame_by_dedupe(
590    conn: &Connection,
591    dedupe_key: &str,
592) -> ConsoleLogResult<Option<ConsoleFrame>> {
593    conn.query_row(
594        "SELECT cursor_seq, id, dedupe_key, timestamp_ms, runtime_key, identity,
595                conversation_id, session_id, kind, status, frame_version, updated_at_ms, payload_json,
596                source_kind, source_cursor, source_event_id, interaction_id,
597                parent_frame_id, caused_by_frame_id, turn_id, run_id
598         FROM console_frames WHERE dedupe_key = ?1",
599        params![dedupe_key],
600        row_to_frame,
601    )
602    .optional()
603    .map_err(into_boxed)
604}
605
606fn select_frame_by_id(conn: &Connection, id: &str) -> ConsoleLogResult<Option<ConsoleFrame>> {
607    conn.query_row(
608        "SELECT cursor_seq, id, dedupe_key, timestamp_ms, runtime_key, identity,
609                conversation_id, session_id, kind, status, frame_version, updated_at_ms, payload_json,
610                source_kind, source_cursor, source_event_id, interaction_id,
611                parent_frame_id, caused_by_frame_id, turn_id, run_id
612         FROM console_frames WHERE id = ?1",
613        params![id],
614        row_to_frame,
615    )
616    .optional()
617    .map_err(into_boxed)
618}
619
620fn row_to_frame(row: &rusqlite::Row<'_>) -> rusqlite::Result<ConsoleFrame> {
621    let seq: i64 = row.get(0)?;
622    let payload_json: String = row.get(12)?;
623    let payload = serde_json::from_str(&payload_json).unwrap_or(serde_json::Value::Null);
624    let source_kind: String = row.get(13)?;
625    Ok(ConsoleFrame {
626        cursor: ConsoleCursor::from_seq(seq as u64),
627        id: row.get(1)?,
628        dedupe_key: row.get(2)?,
629        timestamp_ms: row.get::<_, i64>(3)? as u64,
630        runtime_key: row.get(4)?,
631        identity: row.get(5)?,
632        conversation_id: row.get(6)?,
633        session_id: row.get(7)?,
634        kind: row.get(8)?,
635        status: ConsoleFrameStatus::from_str(row.get::<_, String>(9)?.as_str()),
636        frame_version: row.get::<_, i64>(10)? as u64,
637        updated_at_ms: row.get::<_, Option<i64>>(11)?.map(|value| value as u64),
638        payload,
639        source: ConsoleFrameSource {
640            kind: ConsoleFrameSourceKind::from_str(&source_kind),
641            source_cursor: row.get(14)?,
642        },
643        source_event_id: row.get(15)?,
644        interaction_id: row.get(16)?,
645        parent_frame_id: row.get(17)?,
646        caused_by_frame_id: row.get(18)?,
647        turn_id: row.get(19)?,
648        run_id: row.get(20)?,
649    })
650}
651
652fn normalize_limit(limit: usize) -> usize {
653    limit.clamp(1, 1000)
654}
655
656fn cursor_seq(cursor: &ConsoleCursor) -> ConsoleLogResult<u64> {
657    cursor
658        .seq()
659        .ok_or_else(|| boxed_error(format!("invalid console cursor: {cursor}")))
660}
661
662pub(crate) fn stable_frame_id(dedupe_key: &str) -> String {
663    let mut hasher = Sha256::new();
664    hasher.update(dedupe_key.as_bytes());
665    format!("console-frame-{}", to_hex(&hasher.finalize()))
666}
667
668fn to_hex(bytes: &[u8]) -> String {
669    const HEX: &[u8; 16] = b"0123456789abcdef";
670    let mut out = String::with_capacity(bytes.len() * 2);
671    for byte in bytes {
672        out.push(HEX[(byte >> 4) as usize] as char);
673        out.push(HEX[(byte & 0x0f) as usize] as char);
674    }
675    out
676}
677
678fn boxed_error(message: impl Into<String>) -> ConsoleLogError {
679    Box::new(std::io::Error::other(message.into()))
680}
681
682fn into_boxed<E>(error: E) -> ConsoleLogError
683where
684    E: std::error::Error + Send + Sync + 'static,
685{
686    Box::new(error)
687}
688
689fn current_time_ms() -> u64 {
690    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
691        Ok(duration) => duration.as_millis() as u64,
692        Err(_) => 0,
693    }
694}
695
696#[cfg(test)]
697#[allow(clippy::expect_used)]
698mod tests {
699    use serde_json::json;
700
701    use super::*;
702
703    fn sample_frame(dedupe_key: &str, identity: &str) -> NewConsoleFrame {
704        NewConsoleFrame {
705            id: None,
706            dedupe_key: dedupe_key.to_string(),
707            timestamp_ms: 10,
708            runtime_key: "runtime-a".to_string(),
709            identity: identity.to_string(),
710            conversation_id: Some(identity.to_string()),
711            session_id: Some("session-1".to_string()),
712            kind: "text_delta".to_string(),
713            status: ConsoleFrameStatus::Delivered,
714            payload: json!({ "delta": "hello" }),
715            source: ConsoleFrameSource {
716                kind: ConsoleFrameSourceKind::ConsoleEvent,
717                source_cursor: None,
718            },
719            source_event_id: Some(dedupe_key.to_string()),
720            interaction_id: None,
721            turn_id: None,
722            run_id: None,
723            parent_frame_id: None,
724            caused_by_frame_id: None,
725        }
726    }
727
728    #[tokio::test]
729    async fn in_memory_log_assigns_monotonic_cursors_and_dedupes() {
730        let store = InMemoryConsoleLogStore::new();
731        let first = store
732            .append_if_absent(sample_frame("event-1", "agent-a"))
733            .await
734            .expect("append first");
735        let duplicate = store
736            .append_if_absent(sample_frame("event-1", "agent-a"))
737            .await
738            .expect("append duplicate");
739        let second = store
740            .append_if_absent(sample_frame("event-2", "agent-a"))
741            .await
742            .expect("append second");
743
744        assert_eq!(first.disposition, AppendDisposition::Inserted);
745        assert_eq!(duplicate.disposition, AppendDisposition::Existing);
746        assert_eq!(first.frame.cursor.seq(), Some(1));
747        assert_eq!(second.frame.cursor.seq(), Some(2));
748    }
749
750    #[tokio::test]
751    async fn sqlite_log_queries_by_identity_and_cursor() {
752        let store = SqliteConsoleLogStore::in_memory().expect("sqlite store");
753        let first = store
754            .append_if_absent(sample_frame("event-1", "agent-a"))
755            .await
756            .expect("append first");
757        store
758            .append_if_absent(sample_frame("event-2", "agent-b"))
759            .await
760            .expect("append second");
761        store
762            .append_if_absent(sample_frame("event-3", "agent-a"))
763            .await
764            .expect("append third");
765
766        let page = store
767            .query_frames(ConsoleTimelineQuery {
768                identity: Some("agent-a".to_string()),
769                after: Some(first.frame.cursor),
770                limit: 10,
771                ..ConsoleTimelineQuery::default()
772            })
773            .await
774            .expect("query");
775        assert_eq!(page.frames.len(), 1);
776        assert_eq!(page.frames[0].dedupe_key, "event-3");
777    }
778
779    #[tokio::test]
780    async fn sqlite_log_updates_status() {
781        let store = SqliteConsoleLogStore::in_memory().expect("sqlite store");
782        let first = store
783            .append_if_absent(sample_frame("event-1", "agent-a"))
784            .await
785            .expect("append first");
786        let updated = store
787            .update_frame_status(&first.frame.id, ConsoleFrameStatus::DeliveryFailed)
788            .await
789            .expect("update")
790            .expect("updated frame");
791        assert_eq!(updated.status, ConsoleFrameStatus::DeliveryFailed);
792        assert_eq!(updated.frame_version, 2);
793        assert!(updated.updated_at_ms.is_some());
794    }
795
796    #[tokio::test]
797    async fn sqlite_log_records_source_watermarks() {
798        let store = SqliteConsoleLogStore::in_memory().expect("sqlite store");
799        store
800            .record_source_watermark("runtime-a", ConsoleFrameSourceKind::ConsoleEvent, "evt-99")
801            .await
802            .expect("record watermark");
803        let watermark = store
804            .source_watermark("runtime-a", ConsoleFrameSourceKind::ConsoleEvent)
805            .await
806            .expect("read watermark");
807        assert_eq!(watermark.as_deref(), Some("evt-99"));
808    }
809
810    #[tokio::test]
811    async fn sqlite_log_persists_frames_across_handles() {
812        let temp_dir = tempfile::tempdir().expect("temp dir");
813        let path = temp_dir.path().join("console.sqlite");
814        let store = SqliteConsoleLogStore::open(&path).expect("open first handle");
815        store
816            .append_if_absent(sample_frame("event-1", "agent-a"))
817            .await
818            .expect("append frame");
819        drop(store);
820
821        let reopened = SqliteConsoleLogStore::open(&path).expect("open second handle");
822        let page = reopened
823            .query_frames(ConsoleTimelineQuery {
824                identity: Some("agent-a".to_string()),
825                limit: 10,
826                ..ConsoleTimelineQuery::default()
827            })
828            .await
829            .expect("query frames");
830        assert_eq!(page.frames.len(), 1);
831        assert_eq!(page.frames[0].dedupe_key, "event-1");
832    }
833}