1use std::collections::{BTreeMap, HashMap};
2use std::path::Path;
3use std::sync::{Arc, Mutex};
4
5use rusqlite::{Connection, OptionalExtension, params};
6use sha2::{Digest, Sha256};
7
8use super::types::{
9 AppendDisposition, AppendOutcome, ConsoleCursor, ConsoleFrame, ConsoleFrameSource,
10 ConsoleFrameSourceKind, ConsoleFrameStatus, ConsoleTimelinePage, ConsoleTimelineQuery,
11 NewConsoleFrame,
12};
13
14pub type ConsoleLogResult<T> = Result<T, ConsoleLogError>;
15
16pub type ConsoleLogError = Box<dyn std::error::Error + Send + Sync>;
17
18#[async_trait::async_trait]
19pub trait ConsoleLogStore: Send + Sync {
20 async fn append_if_absent(&self, frame: NewConsoleFrame) -> ConsoleLogResult<AppendOutcome>;
21
22 async fn update_frame_status(
23 &self,
24 frame_id: &str,
25 status: ConsoleFrameStatus,
26 ) -> ConsoleLogResult<Option<ConsoleFrame>>;
27
28 async fn query_frames(
29 &self,
30 query: ConsoleTimelineQuery,
31 ) -> ConsoleLogResult<ConsoleTimelinePage>;
32
33 async fn frame_by_dedupe_key(&self, dedupe_key: &str)
34 -> ConsoleLogResult<Option<ConsoleFrame>>;
35
36 async fn latest_cursor(&self) -> ConsoleLogResult<Option<ConsoleCursor>>;
37
38 async fn clear_frames(&self) -> ConsoleLogResult<()>;
39
40 async fn record_source_watermark(
41 &self,
42 runtime_key: &str,
43 source_kind: ConsoleFrameSourceKind,
44 source_cursor: &str,
45 ) -> ConsoleLogResult<()>;
46
47 async fn source_watermark(
48 &self,
49 runtime_key: &str,
50 source_kind: ConsoleFrameSourceKind,
51 ) -> ConsoleLogResult<Option<String>>;
52}
53
54#[derive(Default)]
55pub struct InMemoryConsoleLogStore {
56 state: Mutex<InMemoryState>,
57}
58
59#[derive(Default)]
60struct InMemoryState {
61 next_seq: u64,
62 frames: BTreeMap<u64, ConsoleFrame>,
63 dedupe_to_seq: HashMap<String, u64>,
64 id_to_seq: HashMap<String, u64>,
65 watermarks: HashMap<(String, String), String>,
66}
67
68impl InMemoryConsoleLogStore {
69 pub fn new() -> Self {
70 Self {
71 state: Mutex::new(InMemoryState {
72 next_seq: 1,
73 frames: BTreeMap::new(),
74 dedupe_to_seq: HashMap::new(),
75 id_to_seq: HashMap::new(),
76 watermarks: HashMap::new(),
77 }),
78 }
79 }
80}
81
82#[async_trait::async_trait]
83impl ConsoleLogStore for InMemoryConsoleLogStore {
84 async fn append_if_absent(&self, frame: NewConsoleFrame) -> ConsoleLogResult<AppendOutcome> {
85 let mut state = self
86 .state
87 .lock()
88 .map_err(|_| boxed_error("console log lock poisoned"))?;
89 if let Some(seq) = state.dedupe_to_seq.get(&frame.dedupe_key).copied()
90 && let Some(existing) = state.frames.get(&seq)
91 {
92 return Ok(AppendOutcome {
93 disposition: AppendDisposition::Existing,
94 frame: existing.clone(),
95 });
96 }
97
98 let seq = state.next_seq;
99 state.next_seq = state.next_seq.saturating_add(1);
100 let id = frame
101 .id
102 .unwrap_or_else(|| stable_frame_id(&frame.dedupe_key));
103 let frame = ConsoleFrame {
104 id: id.clone(),
105 cursor: ConsoleCursor::from_seq(seq),
106 dedupe_key: frame.dedupe_key,
107 timestamp_ms: frame.timestamp_ms,
108 runtime_key: frame.runtime_key,
109 identity: frame.identity,
110 conversation_id: frame.conversation_id,
111 session_id: frame.session_id,
112 kind: frame.kind,
113 status: frame.status,
114 frame_version: 1,
115 updated_at_ms: None,
116 payload: frame.payload,
117 source: frame.source,
118 source_event_id: frame.source_event_id,
119 interaction_id: frame.interaction_id,
120 turn_id: frame.turn_id,
121 run_id: frame.run_id,
122 parent_frame_id: frame.parent_frame_id,
123 caused_by_frame_id: frame.caused_by_frame_id,
124 };
125 state.dedupe_to_seq.insert(frame.dedupe_key.clone(), seq);
126 state.id_to_seq.insert(id, seq);
127 state.frames.insert(seq, frame.clone());
128 Ok(AppendOutcome {
129 disposition: AppendDisposition::Inserted,
130 frame,
131 })
132 }
133
134 async fn update_frame_status(
135 &self,
136 frame_id: &str,
137 status: ConsoleFrameStatus,
138 ) -> ConsoleLogResult<Option<ConsoleFrame>> {
139 let mut state = self
140 .state
141 .lock()
142 .map_err(|_| boxed_error("console log lock poisoned"))?;
143 let Some(seq) = state.id_to_seq.get(frame_id).copied() else {
144 return Ok(None);
145 };
146 let Some(frame) = state.frames.get_mut(&seq) else {
147 return Ok(None);
148 };
149 frame.status = status;
150 frame.frame_version = frame.frame_version.saturating_add(1);
151 frame.updated_at_ms = Some(current_time_ms());
152 Ok(Some(frame.clone()))
153 }
154
155 async fn query_frames(
156 &self,
157 query: ConsoleTimelineQuery,
158 ) -> ConsoleLogResult<ConsoleTimelinePage> {
159 let after_seq = query.after.as_ref().map(cursor_seq).transpose()?;
160 let limit = normalize_limit(query.limit);
161 let state = self
162 .state
163 .lock()
164 .map_err(|_| boxed_error("console log lock poisoned"))?;
165 let mut frames = Vec::new();
166 for (seq, frame) in &state.frames {
167 if after_seq.is_some_and(|after| *seq <= after) {
168 continue;
169 }
170 if let Some(identity) = query.identity.as_deref()
171 && frame.identity != identity
172 {
173 continue;
174 }
175 if let Some(conversation_id) = query.conversation_id.as_deref()
176 && frame.conversation_id.as_deref() != Some(conversation_id)
177 {
178 continue;
179 }
180 frames.push(frame.clone());
181 if frames.len() >= limit {
182 break;
183 }
184 }
185 let next_cursor = frames.last().map(|frame| frame.cursor.clone());
186 Ok(ConsoleTimelinePage {
187 frames,
188 next_cursor,
189 })
190 }
191
192 async fn frame_by_dedupe_key(
193 &self,
194 dedupe_key: &str,
195 ) -> ConsoleLogResult<Option<ConsoleFrame>> {
196 let state = self
197 .state
198 .lock()
199 .map_err(|_| boxed_error("console log lock poisoned"))?;
200 let Some(seq) = state.dedupe_to_seq.get(dedupe_key).copied() else {
201 return Ok(None);
202 };
203 Ok(state.frames.get(&seq).cloned())
204 }
205
206 async fn latest_cursor(&self) -> ConsoleLogResult<Option<ConsoleCursor>> {
207 let state = self
208 .state
209 .lock()
210 .map_err(|_| boxed_error("console log lock poisoned"))?;
211 Ok(state
212 .frames
213 .keys()
214 .next_back()
215 .copied()
216 .map(ConsoleCursor::from_seq))
217 }
218
219 async fn clear_frames(&self) -> ConsoleLogResult<()> {
220 let mut state = self
221 .state
222 .lock()
223 .map_err(|_| boxed_error("console log lock poisoned"))?;
224 state.frames.clear();
225 state.dedupe_to_seq.clear();
226 state.id_to_seq.clear();
227 state.next_seq = 1;
228 Ok(())
229 }
230
231 async fn record_source_watermark(
232 &self,
233 runtime_key: &str,
234 source_kind: ConsoleFrameSourceKind,
235 source_cursor: &str,
236 ) -> ConsoleLogResult<()> {
237 let mut state = self
238 .state
239 .lock()
240 .map_err(|_| boxed_error("console log lock poisoned"))?;
241 state.watermarks.insert(
242 (runtime_key.to_string(), source_kind.as_str().to_string()),
243 source_cursor.to_string(),
244 );
245 Ok(())
246 }
247
248 async fn source_watermark(
249 &self,
250 runtime_key: &str,
251 source_kind: ConsoleFrameSourceKind,
252 ) -> ConsoleLogResult<Option<String>> {
253 let state = self
254 .state
255 .lock()
256 .map_err(|_| boxed_error("console log lock poisoned"))?;
257 Ok(state
258 .watermarks
259 .get(&(runtime_key.to_string(), source_kind.as_str().to_string()))
260 .cloned())
261 }
262}
263
264pub struct SqliteConsoleLogStore {
265 conn: Arc<Mutex<Connection>>,
266 watermarks: Arc<Mutex<HashMap<(String, String), String>>>,
267}
268
269impl SqliteConsoleLogStore {
270 pub fn open(path: impl AsRef<Path>) -> ConsoleLogResult<Self> {
271 let conn = Connection::open(path).map_err(into_boxed)?;
272 Self::from_connection(conn)
273 }
274
275 pub fn in_memory() -> ConsoleLogResult<Self> {
276 let conn = Connection::open_in_memory().map_err(into_boxed)?;
277 Self::from_connection(conn)
278 }
279
280 fn from_connection(conn: Connection) -> ConsoleLogResult<Self> {
281 initialize_schema(&conn)?;
282 let watermarks = load_source_watermarks(&conn)?;
283 Ok(Self {
284 conn: Arc::new(Mutex::new(conn)),
285 watermarks: Arc::new(Mutex::new(watermarks)),
286 })
287 }
288}
289
290#[async_trait::async_trait]
291impl ConsoleLogStore for SqliteConsoleLogStore {
292 async fn append_if_absent(&self, frame: NewConsoleFrame) -> ConsoleLogResult<AppendOutcome> {
293 let conn = self
294 .conn
295 .lock()
296 .map_err(|_| boxed_error("console log lock poisoned"))?;
297 if let Some(existing) = select_frame_by_dedupe(&conn, &frame.dedupe_key)? {
298 return Ok(AppendOutcome {
299 disposition: AppendDisposition::Existing,
300 frame: existing,
301 });
302 }
303
304 let id = frame
305 .id
306 .clone()
307 .unwrap_or_else(|| stable_frame_id(&frame.dedupe_key));
308 let payload_json = serde_json::to_string(&frame.payload).map_err(into_boxed)?;
309 conn.execute(
310 "INSERT INTO console_frames (
311 id, dedupe_key, timestamp_ms, runtime_key, identity,
312 conversation_id, session_id, kind, status, frame_version, updated_at_ms, payload_json,
313 source_kind, source_cursor, source_event_id, interaction_id,
314 parent_frame_id, caused_by_frame_id, turn_id, run_id
315 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, 1, NULL, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18)",
316 params![
317 id,
318 frame.dedupe_key,
319 frame.timestamp_ms as i64,
320 frame.runtime_key,
321 frame.identity,
322 frame.conversation_id,
323 frame.session_id,
324 frame.kind,
325 frame.status.as_str(),
326 payload_json,
327 frame.source.kind.as_str(),
328 frame.source.source_cursor,
329 frame.source_event_id,
330 frame.interaction_id,
331 frame.parent_frame_id,
332 frame.caused_by_frame_id,
333 frame.turn_id,
334 frame.run_id,
335 ],
336 )
337 .map_err(into_boxed)?;
338 let inserted = select_frame_by_dedupe(&conn, &frame.dedupe_key)?
339 .ok_or_else(|| boxed_error("inserted console frame was not readable"))?;
340 Ok(AppendOutcome {
341 disposition: AppendDisposition::Inserted,
342 frame: inserted,
343 })
344 }
345
346 async fn update_frame_status(
347 &self,
348 frame_id: &str,
349 status: ConsoleFrameStatus,
350 ) -> ConsoleLogResult<Option<ConsoleFrame>> {
351 let conn = self
352 .conn
353 .lock()
354 .map_err(|_| boxed_error("console log lock poisoned"))?;
355 conn.execute(
356 "UPDATE console_frames SET status = ?1, frame_version = frame_version + 1, updated_at_ms = ?2 WHERE id = ?3",
357 params![status.as_str(), current_time_ms() as i64, frame_id],
358 )
359 .map_err(into_boxed)?;
360 select_frame_by_id(&conn, frame_id)
361 }
362
363 async fn query_frames(
364 &self,
365 query: ConsoleTimelineQuery,
366 ) -> ConsoleLogResult<ConsoleTimelinePage> {
367 let after_seq = query.after.as_ref().map(cursor_seq).transpose()?;
368 let limit = normalize_limit(query.limit);
369 let conn = self
370 .conn
371 .lock()
372 .map_err(|_| boxed_error("console log lock poisoned"))?;
373 let mut sql = String::from(
374 "SELECT cursor_seq, id, dedupe_key, timestamp_ms, runtime_key, identity,
375 conversation_id, session_id, kind, status, frame_version, updated_at_ms, payload_json,
376 source_kind, source_cursor, source_event_id, interaction_id,
377 parent_frame_id, caused_by_frame_id, turn_id, run_id
378 FROM console_frames WHERE cursor_seq > ?1",
379 );
380 if query.identity.is_some() {
381 sql.push_str(" AND identity = ?2");
382 }
383 if query.conversation_id.is_some() {
384 sql.push_str(if query.identity.is_some() {
385 " AND conversation_id = ?3"
386 } else {
387 " AND conversation_id = ?2"
388 });
389 }
390 sql.push_str(" ORDER BY cursor_seq ASC LIMIT ?");
391 let limit_param_index = 2
392 + usize::from(query.identity.is_some())
393 + usize::from(query.conversation_id.is_some());
394 sql.push_str(&limit_param_index.to_string());
395
396 let after = after_seq.unwrap_or(0) as i64;
397 let frames = match (query.identity.as_deref(), query.conversation_id.as_deref()) {
398 (Some(identity), Some(conversation_id)) => query_sql_frames(
399 &conn,
400 &sql,
401 params![after, identity, conversation_id, limit as i64],
402 )?,
403 (Some(identity), None) => {
404 query_sql_frames(&conn, &sql, params![after, identity, limit as i64])?
405 }
406 (None, Some(conversation_id)) => {
407 query_sql_frames(&conn, &sql, params![after, conversation_id, limit as i64])?
408 }
409 (None, None) => query_sql_frames(&conn, &sql, params![after, limit as i64])?,
410 };
411 let next_cursor = frames.last().map(|frame| frame.cursor.clone());
412 Ok(ConsoleTimelinePage {
413 frames,
414 next_cursor,
415 })
416 }
417
418 async fn frame_by_dedupe_key(
419 &self,
420 dedupe_key: &str,
421 ) -> ConsoleLogResult<Option<ConsoleFrame>> {
422 let conn = self
423 .conn
424 .lock()
425 .map_err(|_| boxed_error("console log lock poisoned"))?;
426 select_frame_by_dedupe(&conn, dedupe_key)
427 }
428
429 async fn latest_cursor(&self) -> ConsoleLogResult<Option<ConsoleCursor>> {
430 let conn = self
431 .conn
432 .lock()
433 .map_err(|_| boxed_error("console log lock poisoned"))?;
434 let seq: Option<i64> = conn
435 .query_row(
436 "SELECT cursor_seq FROM console_frames ORDER BY cursor_seq DESC LIMIT 1",
437 [],
438 |row| row.get(0),
439 )
440 .optional()
441 .map_err(into_boxed)?;
442 Ok(seq.map(|value| ConsoleCursor::from_seq(value as u64)))
443 }
444
445 async fn clear_frames(&self) -> ConsoleLogResult<()> {
446 let conn = self
447 .conn
448 .lock()
449 .map_err(|_| boxed_error("console log lock poisoned"))?;
450 conn.execute("DELETE FROM console_frames", [])
451 .map_err(into_boxed)?;
452 conn.execute(
453 "DELETE FROM sqlite_sequence WHERE name = 'console_frames'",
454 [],
455 )
456 .ok();
457 Ok(())
458 }
459
460 async fn record_source_watermark(
461 &self,
462 runtime_key: &str,
463 source_kind: ConsoleFrameSourceKind,
464 source_cursor: &str,
465 ) -> ConsoleLogResult<()> {
466 let conn = self
467 .conn
468 .lock()
469 .map_err(|_| boxed_error("console log lock poisoned"))?;
470 conn.execute(
471 "INSERT INTO console_source_watermarks (
472 runtime_key, source_kind, source_cursor, last_ingested_at_ms
473 ) VALUES (?1, ?2, ?3, ?4)
474 ON CONFLICT(runtime_key, source_kind) DO UPDATE SET
475 source_cursor = excluded.source_cursor,
476 last_ingested_at_ms = excluded.last_ingested_at_ms",
477 params![
478 runtime_key,
479 source_kind.as_str(),
480 source_cursor,
481 current_time_ms() as i64,
482 ],
483 )
484 .map_err(into_boxed)?;
485 self.watermarks
486 .lock()
487 .map_err(|_| boxed_error("console watermark lock poisoned"))?
488 .insert(
489 (runtime_key.to_string(), source_kind.as_str().to_string()),
490 source_cursor.to_string(),
491 );
492 Ok(())
493 }
494
495 async fn source_watermark(
496 &self,
497 runtime_key: &str,
498 source_kind: ConsoleFrameSourceKind,
499 ) -> ConsoleLogResult<Option<String>> {
500 let watermarks = self
501 .watermarks
502 .lock()
503 .map_err(|_| boxed_error("console watermark lock poisoned"))?;
504 Ok(watermarks
505 .get(&(runtime_key.to_string(), source_kind.as_str().to_string()))
506 .cloned())
507 }
508}
509
510fn load_source_watermarks(
511 conn: &Connection,
512) -> ConsoleLogResult<HashMap<(String, String), String>> {
513 let mut stmt = conn
514 .prepare(
515 "SELECT runtime_key, source_kind, source_cursor
516 FROM console_source_watermarks",
517 )
518 .map_err(into_boxed)?;
519 let rows = stmt
520 .query_map([], |row| {
521 Ok((
522 (row.get::<_, String>(0)?, row.get::<_, String>(1)?),
523 row.get::<_, String>(2)?,
524 ))
525 })
526 .map_err(into_boxed)?;
527 let mut watermarks = HashMap::new();
528 for row in rows {
529 let (key, cursor) = row.map_err(into_boxed)?;
530 watermarks.insert(key, cursor);
531 }
532 Ok(watermarks)
533}
534
535fn initialize_schema(conn: &Connection) -> ConsoleLogResult<()> {
536 conn.execute_batch(
537 "CREATE TABLE IF NOT EXISTS console_frames (
538 cursor_seq INTEGER PRIMARY KEY AUTOINCREMENT,
539 id TEXT NOT NULL UNIQUE,
540 dedupe_key TEXT NOT NULL UNIQUE,
541 timestamp_ms INTEGER NOT NULL,
542 runtime_key TEXT NOT NULL,
543 identity TEXT NOT NULL,
544 conversation_id TEXT,
545 session_id TEXT,
546 kind TEXT NOT NULL,
547 status TEXT NOT NULL,
548 frame_version INTEGER NOT NULL DEFAULT 1,
549 updated_at_ms INTEGER,
550 payload_json TEXT NOT NULL,
551 source_kind TEXT NOT NULL,
552 source_cursor TEXT,
553 source_event_id TEXT,
554 interaction_id TEXT,
555 parent_frame_id TEXT,
556 caused_by_frame_id TEXT,
557 turn_id TEXT,
558 run_id TEXT
559 );
560 CREATE TABLE IF NOT EXISTS console_source_watermarks (
561 runtime_key TEXT NOT NULL,
562 source_kind TEXT NOT NULL,
563 source_cursor TEXT NOT NULL,
564 last_ingested_at_ms INTEGER NOT NULL,
565 PRIMARY KEY(runtime_key, source_kind)
566 );
567 CREATE INDEX IF NOT EXISTS idx_console_frames_identity_cursor
568 ON console_frames(identity, cursor_seq);
569 CREATE INDEX IF NOT EXISTS idx_console_frames_conversation_cursor
570 ON console_frames(conversation_id, cursor_seq);",
571 )
572 .map_err(into_boxed)
573}
574
575fn query_sql_frames<P: rusqlite::Params>(
576 conn: &Connection,
577 sql: &str,
578 params: P,
579) -> ConsoleLogResult<Vec<ConsoleFrame>> {
580 let mut stmt = conn.prepare(sql).map_err(into_boxed)?;
581 let rows = stmt.query_map(params, row_to_frame).map_err(into_boxed)?;
582 let mut frames = Vec::new();
583 for row in rows {
584 frames.push(row.map_err(into_boxed)?);
585 }
586 Ok(frames)
587}
588
589fn select_frame_by_dedupe(
590 conn: &Connection,
591 dedupe_key: &str,
592) -> ConsoleLogResult<Option<ConsoleFrame>> {
593 conn.query_row(
594 "SELECT cursor_seq, id, dedupe_key, timestamp_ms, runtime_key, identity,
595 conversation_id, session_id, kind, status, frame_version, updated_at_ms, payload_json,
596 source_kind, source_cursor, source_event_id, interaction_id,
597 parent_frame_id, caused_by_frame_id, turn_id, run_id
598 FROM console_frames WHERE dedupe_key = ?1",
599 params![dedupe_key],
600 row_to_frame,
601 )
602 .optional()
603 .map_err(into_boxed)
604}
605
606fn select_frame_by_id(conn: &Connection, id: &str) -> ConsoleLogResult<Option<ConsoleFrame>> {
607 conn.query_row(
608 "SELECT cursor_seq, id, dedupe_key, timestamp_ms, runtime_key, identity,
609 conversation_id, session_id, kind, status, frame_version, updated_at_ms, payload_json,
610 source_kind, source_cursor, source_event_id, interaction_id,
611 parent_frame_id, caused_by_frame_id, turn_id, run_id
612 FROM console_frames WHERE id = ?1",
613 params![id],
614 row_to_frame,
615 )
616 .optional()
617 .map_err(into_boxed)
618}
619
620fn row_to_frame(row: &rusqlite::Row<'_>) -> rusqlite::Result<ConsoleFrame> {
621 let seq: i64 = row.get(0)?;
622 let payload_json: String = row.get(12)?;
623 let payload = serde_json::from_str(&payload_json).unwrap_or(serde_json::Value::Null);
624 let source_kind: String = row.get(13)?;
625 Ok(ConsoleFrame {
626 cursor: ConsoleCursor::from_seq(seq as u64),
627 id: row.get(1)?,
628 dedupe_key: row.get(2)?,
629 timestamp_ms: row.get::<_, i64>(3)? as u64,
630 runtime_key: row.get(4)?,
631 identity: row.get(5)?,
632 conversation_id: row.get(6)?,
633 session_id: row.get(7)?,
634 kind: row.get(8)?,
635 status: ConsoleFrameStatus::from_str(row.get::<_, String>(9)?.as_str()),
636 frame_version: row.get::<_, i64>(10)? as u64,
637 updated_at_ms: row.get::<_, Option<i64>>(11)?.map(|value| value as u64),
638 payload,
639 source: ConsoleFrameSource {
640 kind: ConsoleFrameSourceKind::from_str(&source_kind),
641 source_cursor: row.get(14)?,
642 },
643 source_event_id: row.get(15)?,
644 interaction_id: row.get(16)?,
645 parent_frame_id: row.get(17)?,
646 caused_by_frame_id: row.get(18)?,
647 turn_id: row.get(19)?,
648 run_id: row.get(20)?,
649 })
650}
651
652fn normalize_limit(limit: usize) -> usize {
653 limit.clamp(1, 1000)
654}
655
656fn cursor_seq(cursor: &ConsoleCursor) -> ConsoleLogResult<u64> {
657 cursor
658 .seq()
659 .ok_or_else(|| boxed_error(format!("invalid console cursor: {cursor}")))
660}
661
662pub(crate) fn stable_frame_id(dedupe_key: &str) -> String {
663 let mut hasher = Sha256::new();
664 hasher.update(dedupe_key.as_bytes());
665 format!("console-frame-{}", to_hex(&hasher.finalize()))
666}
667
668fn to_hex(bytes: &[u8]) -> String {
669 const HEX: &[u8; 16] = b"0123456789abcdef";
670 let mut out = String::with_capacity(bytes.len() * 2);
671 for byte in bytes {
672 out.push(HEX[(byte >> 4) as usize] as char);
673 out.push(HEX[(byte & 0x0f) as usize] as char);
674 }
675 out
676}
677
678fn boxed_error(message: impl Into<String>) -> ConsoleLogError {
679 Box::new(std::io::Error::other(message.into()))
680}
681
682fn into_boxed<E>(error: E) -> ConsoleLogError
683where
684 E: std::error::Error + Send + Sync + 'static,
685{
686 Box::new(error)
687}
688
689fn current_time_ms() -> u64 {
690 match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
691 Ok(duration) => duration.as_millis() as u64,
692 Err(_) => 0,
693 }
694}
695
696#[cfg(test)]
697#[allow(clippy::expect_used)]
698mod tests {
699 use serde_json::json;
700
701 use super::*;
702
703 fn sample_frame(dedupe_key: &str, identity: &str) -> NewConsoleFrame {
704 NewConsoleFrame {
705 id: None,
706 dedupe_key: dedupe_key.to_string(),
707 timestamp_ms: 10,
708 runtime_key: "runtime-a".to_string(),
709 identity: identity.to_string(),
710 conversation_id: Some(identity.to_string()),
711 session_id: Some("session-1".to_string()),
712 kind: "text_delta".to_string(),
713 status: ConsoleFrameStatus::Delivered,
714 payload: json!({ "delta": "hello" }),
715 source: ConsoleFrameSource {
716 kind: ConsoleFrameSourceKind::ConsoleEvent,
717 source_cursor: None,
718 },
719 source_event_id: Some(dedupe_key.to_string()),
720 interaction_id: None,
721 turn_id: None,
722 run_id: None,
723 parent_frame_id: None,
724 caused_by_frame_id: None,
725 }
726 }
727
728 #[tokio::test]
729 async fn in_memory_log_assigns_monotonic_cursors_and_dedupes() {
730 let store = InMemoryConsoleLogStore::new();
731 let first = store
732 .append_if_absent(sample_frame("event-1", "agent-a"))
733 .await
734 .expect("append first");
735 let duplicate = store
736 .append_if_absent(sample_frame("event-1", "agent-a"))
737 .await
738 .expect("append duplicate");
739 let second = store
740 .append_if_absent(sample_frame("event-2", "agent-a"))
741 .await
742 .expect("append second");
743
744 assert_eq!(first.disposition, AppendDisposition::Inserted);
745 assert_eq!(duplicate.disposition, AppendDisposition::Existing);
746 assert_eq!(first.frame.cursor.seq(), Some(1));
747 assert_eq!(second.frame.cursor.seq(), Some(2));
748 }
749
750 #[tokio::test]
751 async fn sqlite_log_queries_by_identity_and_cursor() {
752 let store = SqliteConsoleLogStore::in_memory().expect("sqlite store");
753 let first = store
754 .append_if_absent(sample_frame("event-1", "agent-a"))
755 .await
756 .expect("append first");
757 store
758 .append_if_absent(sample_frame("event-2", "agent-b"))
759 .await
760 .expect("append second");
761 store
762 .append_if_absent(sample_frame("event-3", "agent-a"))
763 .await
764 .expect("append third");
765
766 let page = store
767 .query_frames(ConsoleTimelineQuery {
768 identity: Some("agent-a".to_string()),
769 after: Some(first.frame.cursor),
770 limit: 10,
771 ..ConsoleTimelineQuery::default()
772 })
773 .await
774 .expect("query");
775 assert_eq!(page.frames.len(), 1);
776 assert_eq!(page.frames[0].dedupe_key, "event-3");
777 }
778
779 #[tokio::test]
780 async fn sqlite_log_updates_status() {
781 let store = SqliteConsoleLogStore::in_memory().expect("sqlite store");
782 let first = store
783 .append_if_absent(sample_frame("event-1", "agent-a"))
784 .await
785 .expect("append first");
786 let updated = store
787 .update_frame_status(&first.frame.id, ConsoleFrameStatus::DeliveryFailed)
788 .await
789 .expect("update")
790 .expect("updated frame");
791 assert_eq!(updated.status, ConsoleFrameStatus::DeliveryFailed);
792 assert_eq!(updated.frame_version, 2);
793 assert!(updated.updated_at_ms.is_some());
794 }
795
796 #[tokio::test]
797 async fn sqlite_log_records_source_watermarks() {
798 let store = SqliteConsoleLogStore::in_memory().expect("sqlite store");
799 store
800 .record_source_watermark("runtime-a", ConsoleFrameSourceKind::ConsoleEvent, "evt-99")
801 .await
802 .expect("record watermark");
803 let watermark = store
804 .source_watermark("runtime-a", ConsoleFrameSourceKind::ConsoleEvent)
805 .await
806 .expect("read watermark");
807 assert_eq!(watermark.as_deref(), Some("evt-99"));
808 }
809
810 #[tokio::test]
811 async fn sqlite_log_persists_frames_across_handles() {
812 let temp_dir = tempfile::tempdir().expect("temp dir");
813 let path = temp_dir.path().join("console.sqlite");
814 let store = SqliteConsoleLogStore::open(&path).expect("open first handle");
815 store
816 .append_if_absent(sample_frame("event-1", "agent-a"))
817 .await
818 .expect("append frame");
819 drop(store);
820
821 let reopened = SqliteConsoleLogStore::open(&path).expect("open second handle");
822 let page = reopened
823 .query_frames(ConsoleTimelineQuery {
824 identity: Some("agent-a".to_string()),
825 limit: 10,
826 ..ConsoleTimelineQuery::default()
827 })
828 .await
829 .expect("query frames");
830 assert_eq!(page.frames.len(), 1);
831 assert_eq!(page.frames[0].dedupe_key, "event-1");
832 }
833}