1use std::collections::{BTreeMap, BTreeSet, HashMap};
2use std::path::Path;
3use std::sync::{Arc, Mutex};
4
5use rusqlite::{Connection, OptionalExtension, params};
6use sha2::{Digest, Sha256};
7
8use super::types::{
9 AppendDisposition, AppendOutcome, ConsoleCursor, ConsoleFrame, ConsoleFrameSource,
10 ConsoleFrameSourceKind, ConsoleFrameStatus, ConsoleTimelineMode, ConsoleTimelinePage,
11 ConsoleTimelineQuery, ConsoleTimelineWindowPage, ConsoleTimelineWindowQuery, NewConsoleFrame,
12};
13
14pub type ConsoleLogResult<T> = Result<T, ConsoleLogError>;
15
16pub type ConsoleLogError = Box<dyn std::error::Error + Send + Sync>;
17
18#[async_trait::async_trait]
19pub trait ConsoleLogStore: Send + Sync {
20 async fn append_if_absent(&self, frame: NewConsoleFrame) -> ConsoleLogResult<AppendOutcome>;
21
22 async fn update_frame_status(
23 &self,
24 frame_id: &str,
25 status: ConsoleFrameStatus,
26 ) -> ConsoleLogResult<Option<ConsoleFrame>>;
27
28 async fn query_frames(
29 &self,
30 query: ConsoleTimelineQuery,
31 ) -> ConsoleLogResult<ConsoleTimelinePage>;
32
33 async fn query_windowed_frames(
34 &self,
35 query: ConsoleTimelineWindowQuery,
36 ) -> ConsoleLogResult<ConsoleTimelineWindowPage> {
37 if query.mode != ConsoleTimelineMode::Since || query.before.is_some() {
38 return Err(std::io::Error::other(
39 "console log store must implement query_windowed_frames for v0.4 timeline windows",
40 )
41 .into());
42 }
43 let page = self
44 .query_frames(ConsoleTimelineQuery {
45 identity: query.identity,
46 conversation_id: query.conversation_id,
47 after: query.after,
48 limit: query.limit,
49 })
50 .await?;
51 Ok(ConsoleTimelineWindowPage {
52 latest_cursor: page.next_cursor.clone(),
53 exhausted: false,
54 frames: page.frames,
55 next_cursor: page.next_cursor,
56 })
57 }
58
59 async fn frame_by_dedupe_key(&self, dedupe_key: &str)
60 -> ConsoleLogResult<Option<ConsoleFrame>>;
61
62 async fn latest_cursor(&self) -> ConsoleLogResult<Option<ConsoleCursor>>;
63
64 async fn clear_frames(&self) -> ConsoleLogResult<()>;
65
66 async fn record_source_watermark(
67 &self,
68 runtime_key: &str,
69 source_kind: ConsoleFrameSourceKind,
70 source_cursor: &str,
71 ) -> ConsoleLogResult<()>;
72
73 async fn source_watermark(
74 &self,
75 runtime_key: &str,
76 source_kind: ConsoleFrameSourceKind,
77 ) -> ConsoleLogResult<Option<String>>;
78}
79
80#[derive(Default)]
81pub struct InMemoryConsoleLogStore {
82 state: Mutex<InMemoryState>,
83}
84
85#[derive(Default)]
86struct InMemoryState {
87 next_seq: u64,
88 frames: BTreeMap<u64, ConsoleFrame>,
89 dedupe_to_seq: HashMap<String, u64>,
90 id_to_seq: HashMap<String, u64>,
91 identity_to_seqs: HashMap<String, BTreeSet<u64>>,
92 conversation_to_seqs: HashMap<String, BTreeSet<u64>>,
93 watermarks: HashMap<(String, String), String>,
94}
95
96impl InMemoryConsoleLogStore {
97 pub fn new() -> Self {
98 Self {
99 state: Mutex::new(InMemoryState {
100 next_seq: 1,
101 frames: BTreeMap::new(),
102 dedupe_to_seq: HashMap::new(),
103 id_to_seq: HashMap::new(),
104 identity_to_seqs: HashMap::new(),
105 conversation_to_seqs: HashMap::new(),
106 watermarks: HashMap::new(),
107 }),
108 }
109 }
110}
111
112#[async_trait::async_trait]
113impl ConsoleLogStore for InMemoryConsoleLogStore {
114 async fn append_if_absent(&self, frame: NewConsoleFrame) -> ConsoleLogResult<AppendOutcome> {
115 let mut state = self
116 .state
117 .lock()
118 .map_err(|_| boxed_error("console log lock poisoned"))?;
119 if let Some(seq) = state.dedupe_to_seq.get(&frame.dedupe_key).copied()
120 && let Some(existing) = state.frames.get(&seq)
121 {
122 return Ok(AppendOutcome {
123 disposition: AppendDisposition::Existing,
124 frame: existing.clone(),
125 });
126 }
127
128 let seq = state.next_seq;
129 state.next_seq = state.next_seq.saturating_add(1);
130 let id = frame
131 .id
132 .unwrap_or_else(|| stable_frame_id(&frame.dedupe_key));
133 let frame = ConsoleFrame {
134 id: id.clone(),
135 cursor: ConsoleCursor::from_seq(seq),
136 dedupe_key: frame.dedupe_key,
137 timestamp_ms: frame.timestamp_ms,
138 runtime_key: frame.runtime_key,
139 identity: frame.identity,
140 conversation_id: frame.conversation_id,
141 session_id: frame.session_id,
142 kind: frame.kind,
143 status: frame.status,
144 frame_version: 1,
145 updated_at_ms: None,
146 payload: frame.payload,
147 source: frame.source,
148 source_event_id: frame.source_event_id,
149 interaction_id: frame.interaction_id,
150 turn_id: frame.turn_id,
151 run_id: frame.run_id,
152 parent_frame_id: frame.parent_frame_id,
153 caused_by_frame_id: frame.caused_by_frame_id,
154 };
155 state.dedupe_to_seq.insert(frame.dedupe_key.clone(), seq);
156 state.id_to_seq.insert(id, seq);
157 state
158 .identity_to_seqs
159 .entry(frame.identity.clone())
160 .or_default()
161 .insert(seq);
162 if let Some(conversation_id) = frame.conversation_id.as_ref() {
163 state
164 .conversation_to_seqs
165 .entry(conversation_id.clone())
166 .or_default()
167 .insert(seq);
168 }
169 state.frames.insert(seq, frame.clone());
170 Ok(AppendOutcome {
171 disposition: AppendDisposition::Inserted,
172 frame,
173 })
174 }
175
176 async fn update_frame_status(
177 &self,
178 frame_id: &str,
179 status: ConsoleFrameStatus,
180 ) -> ConsoleLogResult<Option<ConsoleFrame>> {
181 let mut state = self
182 .state
183 .lock()
184 .map_err(|_| boxed_error("console log lock poisoned"))?;
185 let Some(seq) = state.id_to_seq.get(frame_id).copied() else {
186 return Ok(None);
187 };
188 let Some(frame) = state.frames.get_mut(&seq) else {
189 return Ok(None);
190 };
191 frame.status = status;
192 frame.frame_version = frame.frame_version.saturating_add(1);
193 frame.updated_at_ms = Some(current_time_ms());
194 Ok(Some(frame.clone()))
195 }
196
197 async fn query_frames(
198 &self,
199 query: ConsoleTimelineQuery,
200 ) -> ConsoleLogResult<ConsoleTimelinePage> {
201 let page = self.query_windowed_frames(query.into()).await?;
202 Ok(ConsoleTimelinePage {
203 frames: page.frames,
204 next_cursor: page.next_cursor,
205 })
206 }
207
208 async fn query_windowed_frames(
209 &self,
210 query: ConsoleTimelineWindowQuery,
211 ) -> ConsoleLogResult<ConsoleTimelineWindowPage> {
212 let after_seq = query.after.as_ref().map(cursor_seq).transpose()?;
213 let before_seq = query.before.as_ref().map(cursor_seq).transpose()?;
214 let limit = normalize_limit(query.limit);
215 let scan_limit = limit.saturating_add(1);
216 let state = self
217 .state
218 .lock()
219 .map_err(|_| boxed_error("console log lock poisoned"))?;
220 let frame_matches = |seq: u64, frame: &ConsoleFrame| -> bool {
221 if after_seq.is_some_and(|after| seq <= after) {
222 return false;
223 }
224 if before_seq.is_some_and(|before| seq >= before) {
225 return false;
226 }
227 if let Some(identity) = query.identity.as_deref()
228 && frame.identity != identity
229 {
230 return false;
231 }
232 if let Some(conversation_id) = query.conversation_id.as_deref()
233 && frame.conversation_id.as_deref() != Some(conversation_id)
234 {
235 return false;
236 }
237 true
238 };
239 if let (Some(identity), Some(conversation_id)) =
240 (query.identity.as_deref(), query.conversation_id.as_deref())
241 {
242 let identity_seqs = state.identity_to_seqs.get(identity);
243 let conversation_seqs = state.conversation_to_seqs.get(conversation_id);
244 return match (identity_seqs, conversation_seqs) {
245 (Some(left), Some(right)) if left.len() <= right.len() => {
246 Ok(in_memory_window_from_seq_iters(
247 &state,
248 query.mode,
249 (limit, scan_limit),
250 left.iter().copied().filter(|seq| right.contains(seq)),
251 left.iter().rev().copied().filter(|seq| right.contains(seq)),
252 left.iter().rev().copied().filter(|seq| right.contains(seq)),
253 &frame_matches,
254 ))
255 }
256 (Some(left), Some(right)) => Ok(in_memory_window_from_seq_iters(
257 &state,
258 query.mode,
259 (limit, scan_limit),
260 right.iter().copied().filter(|seq| left.contains(seq)),
261 right.iter().rev().copied().filter(|seq| left.contains(seq)),
262 right.iter().rev().copied().filter(|seq| left.contains(seq)),
263 &frame_matches,
264 )),
265 _ => Ok(empty_window()),
266 };
267 }
268 if let Some(identity) = query.identity.as_deref() {
269 let Some(seqs) = state.identity_to_seqs.get(identity) else {
270 return Ok(empty_window());
271 };
272 return Ok(in_memory_window_from_seq_iters(
273 &state,
274 query.mode,
275 (limit, scan_limit),
276 seqs.iter().copied(),
277 seqs.iter().rev().copied(),
278 seqs.iter().rev().copied(),
279 &frame_matches,
280 ));
281 }
282 if let Some(conversation_id) = query.conversation_id.as_deref() {
283 let Some(seqs) = state.conversation_to_seqs.get(conversation_id) else {
284 return Ok(empty_window());
285 };
286 return Ok(in_memory_window_from_seq_iters(
287 &state,
288 query.mode,
289 (limit, scan_limit),
290 seqs.iter().copied(),
291 seqs.iter().rev().copied(),
292 seqs.iter().rev().copied(),
293 &frame_matches,
294 ));
295 }
296 Ok(in_memory_window_from_seq_iters(
297 &state,
298 query.mode,
299 (limit, scan_limit),
300 state.frames.keys().copied(),
301 state.frames.keys().rev().copied(),
302 state.frames.keys().rev().copied(),
303 &frame_matches,
304 ))
305 }
306
307 async fn frame_by_dedupe_key(
308 &self,
309 dedupe_key: &str,
310 ) -> ConsoleLogResult<Option<ConsoleFrame>> {
311 let state = self
312 .state
313 .lock()
314 .map_err(|_| boxed_error("console log lock poisoned"))?;
315 let Some(seq) = state.dedupe_to_seq.get(dedupe_key).copied() else {
316 return Ok(None);
317 };
318 Ok(state.frames.get(&seq).cloned())
319 }
320
321 async fn latest_cursor(&self) -> ConsoleLogResult<Option<ConsoleCursor>> {
322 let state = self
323 .state
324 .lock()
325 .map_err(|_| boxed_error("console log lock poisoned"))?;
326 Ok(state
327 .frames
328 .keys()
329 .next_back()
330 .copied()
331 .map(ConsoleCursor::from_seq))
332 }
333
334 async fn clear_frames(&self) -> ConsoleLogResult<()> {
335 let mut state = self
336 .state
337 .lock()
338 .map_err(|_| boxed_error("console log lock poisoned"))?;
339 state.frames.clear();
340 state.dedupe_to_seq.clear();
341 state.id_to_seq.clear();
342 state.identity_to_seqs.clear();
343 state.conversation_to_seqs.clear();
344 state.next_seq = 1;
345 Ok(())
346 }
347
348 async fn record_source_watermark(
349 &self,
350 runtime_key: &str,
351 source_kind: ConsoleFrameSourceKind,
352 source_cursor: &str,
353 ) -> ConsoleLogResult<()> {
354 let mut state = self
355 .state
356 .lock()
357 .map_err(|_| boxed_error("console log lock poisoned"))?;
358 state.watermarks.insert(
359 (runtime_key.to_string(), source_kind.as_str().to_string()),
360 source_cursor.to_string(),
361 );
362 Ok(())
363 }
364
365 async fn source_watermark(
366 &self,
367 runtime_key: &str,
368 source_kind: ConsoleFrameSourceKind,
369 ) -> ConsoleLogResult<Option<String>> {
370 let state = self
371 .state
372 .lock()
373 .map_err(|_| boxed_error("console log lock poisoned"))?;
374 Ok(state
375 .watermarks
376 .get(&(runtime_key.to_string(), source_kind.as_str().to_string()))
377 .cloned())
378 }
379}
380
381fn empty_window() -> ConsoleTimelineWindowPage {
382 ConsoleTimelineWindowPage {
383 frames: Vec::new(),
384 next_cursor: None,
385 latest_cursor: None,
386 exhausted: true,
387 }
388}
389
390fn in_memory_window_from_seq_iters<IForward, IReverse, ILatest, F>(
391 state: &InMemoryState,
392 mode: ConsoleTimelineMode,
393 limits: (usize, usize),
394 forward_iter: IForward,
395 reverse_iter: IReverse,
396 mut latest_iter: ILatest,
397 frame_matches: &F,
398) -> ConsoleTimelineWindowPage
399where
400 IForward: Iterator<Item = u64>,
401 IReverse: Iterator<Item = u64>,
402 ILatest: Iterator<Item = u64>,
403 F: Fn(u64, &ConsoleFrame) -> bool,
404{
405 let (limit, scan_limit) = limits;
406 let mut frames = match mode {
407 ConsoleTimelineMode::Since => forward_iter
408 .filter_map(|seq| {
409 let frame = state.frames.get(&seq)?;
410 frame_matches(seq, frame).then(|| frame.clone())
411 })
412 .take(scan_limit)
413 .collect::<Vec<_>>(),
414 ConsoleTimelineMode::Recent => {
415 let mut frames = reverse_iter
416 .filter_map(|seq| {
417 let frame = state.frames.get(&seq)?;
418 frame_matches(seq, frame).then(|| frame.clone())
419 })
420 .take(scan_limit)
421 .collect::<Vec<_>>();
422 frames.reverse();
423 frames
424 }
425 };
426 let exhausted = frames.len() <= limit;
427 if frames.len() > limit {
428 match mode {
429 ConsoleTimelineMode::Since => frames.truncate(limit),
430 ConsoleTimelineMode::Recent => {
431 frames.remove(0);
432 }
433 }
434 }
435 let next_cursor = frames.last().map(|frame| frame.cursor.clone());
436 let latest_cursor = match mode {
437 ConsoleTimelineMode::Since => latest_iter.find_map(|seq| {
438 let frame = state.frames.get(&seq)?;
439 frame_matches(seq, frame).then(|| frame.cursor.clone())
440 }),
441 ConsoleTimelineMode::Recent => next_cursor.clone(),
442 };
443 ConsoleTimelineWindowPage {
444 frames,
445 next_cursor,
446 latest_cursor,
447 exhausted,
448 }
449}
450
451pub struct SqliteConsoleLogStore {
452 conn: Arc<Mutex<Connection>>,
453 watermarks: Arc<Mutex<HashMap<(String, String), String>>>,
454}
455
456impl SqliteConsoleLogStore {
457 pub fn open(path: impl AsRef<Path>) -> ConsoleLogResult<Self> {
458 let conn = Connection::open(path).map_err(into_boxed)?;
459 Self::from_connection(conn)
460 }
461
462 pub fn in_memory() -> ConsoleLogResult<Self> {
463 let conn = Connection::open_in_memory().map_err(into_boxed)?;
464 Self::from_connection(conn)
465 }
466
467 fn from_connection(conn: Connection) -> ConsoleLogResult<Self> {
468 initialize_schema(&conn)?;
469 let watermarks = load_source_watermarks(&conn)?;
470 Ok(Self {
471 conn: Arc::new(Mutex::new(conn)),
472 watermarks: Arc::new(Mutex::new(watermarks)),
473 })
474 }
475}
476
477#[async_trait::async_trait]
478impl ConsoleLogStore for SqliteConsoleLogStore {
479 async fn append_if_absent(&self, frame: NewConsoleFrame) -> ConsoleLogResult<AppendOutcome> {
480 let conn = self
481 .conn
482 .lock()
483 .map_err(|_| boxed_error("console log lock poisoned"))?;
484 if let Some(existing) = select_frame_by_dedupe(&conn, &frame.dedupe_key)? {
485 return Ok(AppendOutcome {
486 disposition: AppendDisposition::Existing,
487 frame: existing,
488 });
489 }
490
491 let id = frame
492 .id
493 .clone()
494 .unwrap_or_else(|| stable_frame_id(&frame.dedupe_key));
495 let payload_json = serde_json::to_string(&frame.payload).map_err(into_boxed)?;
496 conn.execute(
497 "INSERT INTO console_frames (
498 id, dedupe_key, timestamp_ms, runtime_key, identity,
499 conversation_id, session_id, kind, status, frame_version, updated_at_ms, payload_json,
500 source_kind, source_cursor, source_event_id, interaction_id,
501 parent_frame_id, caused_by_frame_id, turn_id, run_id
502 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, 1, NULL, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18)",
503 params![
504 id,
505 frame.dedupe_key,
506 frame.timestamp_ms as i64,
507 frame.runtime_key,
508 frame.identity,
509 frame.conversation_id,
510 frame.session_id,
511 frame.kind,
512 frame.status.as_str(),
513 payload_json,
514 frame.source.kind.as_str(),
515 frame.source.source_cursor,
516 frame.source_event_id,
517 frame.interaction_id,
518 frame.parent_frame_id,
519 frame.caused_by_frame_id,
520 frame.turn_id,
521 frame.run_id,
522 ],
523 )
524 .map_err(into_boxed)?;
525 let inserted = select_frame_by_dedupe(&conn, &frame.dedupe_key)?
526 .ok_or_else(|| boxed_error("inserted console frame was not readable"))?;
527 Ok(AppendOutcome {
528 disposition: AppendDisposition::Inserted,
529 frame: inserted,
530 })
531 }
532
533 async fn update_frame_status(
534 &self,
535 frame_id: &str,
536 status: ConsoleFrameStatus,
537 ) -> ConsoleLogResult<Option<ConsoleFrame>> {
538 let conn = self
539 .conn
540 .lock()
541 .map_err(|_| boxed_error("console log lock poisoned"))?;
542 conn.execute(
543 "UPDATE console_frames SET status = ?1, frame_version = frame_version + 1, updated_at_ms = ?2 WHERE id = ?3",
544 params![status.as_str(), current_time_ms() as i64, frame_id],
545 )
546 .map_err(into_boxed)?;
547 select_frame_by_id(&conn, frame_id)
548 }
549
550 async fn query_frames(
551 &self,
552 query: ConsoleTimelineQuery,
553 ) -> ConsoleLogResult<ConsoleTimelinePage> {
554 let page = self.query_windowed_frames(query.into()).await?;
555 Ok(ConsoleTimelinePage {
556 frames: page.frames,
557 next_cursor: page.next_cursor,
558 })
559 }
560
561 async fn query_windowed_frames(
562 &self,
563 query: ConsoleTimelineWindowQuery,
564 ) -> ConsoleLogResult<ConsoleTimelineWindowPage> {
565 let after_seq = query.after.as_ref().map(cursor_seq_i64).transpose()?;
566 let before_seq = query.before.as_ref().map(cursor_seq_i64).transpose()?;
567 let limit = normalize_limit(query.limit);
568 let scan_limit = limit.saturating_add(1);
569 let conn = self
570 .conn
571 .lock()
572 .map_err(|_| boxed_error("console log lock poisoned"))?;
573 let mut sql = String::from(
574 "SELECT cursor_seq, id, dedupe_key, timestamp_ms, runtime_key, identity,
575 conversation_id, session_id, kind, status, frame_version, updated_at_ms, payload_json,
576 source_kind, source_cursor, source_event_id, interaction_id,
577 parent_frame_id, caused_by_frame_id, turn_id, run_id
578 FROM console_frames WHERE cursor_seq > ?1 AND cursor_seq < ?2",
579 );
580 let mut next_param = 3usize;
581 if query.identity.is_some() {
582 sql.push_str(" AND identity = ?");
583 sql.push_str(&next_param.to_string());
584 next_param += 1;
585 }
586 if query.conversation_id.is_some() {
587 sql.push_str(" AND conversation_id = ?");
588 sql.push_str(&next_param.to_string());
589 next_param += 1;
590 }
591 match query.mode {
592 ConsoleTimelineMode::Since => sql.push_str(" ORDER BY cursor_seq ASC LIMIT ?"),
593 ConsoleTimelineMode::Recent => sql.push_str(" ORDER BY cursor_seq DESC LIMIT ?"),
594 }
595 sql.push_str(&next_param.to_string());
596
597 let after = after_seq.unwrap_or(0);
598 let before = before_seq.unwrap_or(i64::MAX);
599 let mut values = vec![
600 rusqlite::types::Value::Integer(after),
601 rusqlite::types::Value::Integer(before),
602 ];
603 if let Some(identity) = query.identity.as_ref() {
604 values.push(rusqlite::types::Value::Text(identity.clone()));
605 }
606 if let Some(conversation_id) = query.conversation_id.as_ref() {
607 values.push(rusqlite::types::Value::Text(conversation_id.clone()));
608 }
609 values.push(rusqlite::types::Value::Integer(scan_limit as i64));
610 let mut frames = query_sql_frames(&conn, &sql, rusqlite::params_from_iter(values))?;
611 if query.mode == ConsoleTimelineMode::Recent {
612 frames.reverse();
613 }
614 let exhausted = frames.len() <= limit;
615 if frames.len() > limit {
616 match query.mode {
617 ConsoleTimelineMode::Since => frames.truncate(limit),
618 ConsoleTimelineMode::Recent => {
619 frames.remove(0);
620 }
621 }
622 }
623 let latest_cursor = latest_matching_cursor(
624 &conn,
625 after,
626 before,
627 query.identity.as_deref(),
628 query.conversation_id.as_deref(),
629 )?;
630 let next_cursor = frames.last().map(|frame| frame.cursor.clone());
631 Ok(ConsoleTimelineWindowPage {
632 frames,
633 next_cursor,
634 latest_cursor,
635 exhausted,
636 })
637 }
638
639 async fn frame_by_dedupe_key(
640 &self,
641 dedupe_key: &str,
642 ) -> ConsoleLogResult<Option<ConsoleFrame>> {
643 let conn = self
644 .conn
645 .lock()
646 .map_err(|_| boxed_error("console log lock poisoned"))?;
647 select_frame_by_dedupe(&conn, dedupe_key)
648 }
649
650 async fn latest_cursor(&self) -> ConsoleLogResult<Option<ConsoleCursor>> {
651 let conn = self
652 .conn
653 .lock()
654 .map_err(|_| boxed_error("console log lock poisoned"))?;
655 let seq: Option<i64> = conn
656 .query_row(
657 "SELECT cursor_seq FROM console_frames ORDER BY cursor_seq DESC LIMIT 1",
658 [],
659 |row| row.get(0),
660 )
661 .optional()
662 .map_err(into_boxed)?;
663 Ok(seq.map(|value| ConsoleCursor::from_seq(value as u64)))
664 }
665
666 async fn clear_frames(&self) -> ConsoleLogResult<()> {
667 let conn = self
668 .conn
669 .lock()
670 .map_err(|_| boxed_error("console log lock poisoned"))?;
671 conn.execute("DELETE FROM console_frames", [])
672 .map_err(into_boxed)?;
673 conn.execute(
674 "DELETE FROM sqlite_sequence WHERE name = 'console_frames'",
675 [],
676 )
677 .ok();
678 Ok(())
679 }
680
681 async fn record_source_watermark(
682 &self,
683 runtime_key: &str,
684 source_kind: ConsoleFrameSourceKind,
685 source_cursor: &str,
686 ) -> ConsoleLogResult<()> {
687 let conn = self
688 .conn
689 .lock()
690 .map_err(|_| boxed_error("console log lock poisoned"))?;
691 conn.execute(
692 "INSERT INTO console_source_watermarks (
693 runtime_key, source_kind, source_cursor, last_ingested_at_ms
694 ) VALUES (?1, ?2, ?3, ?4)
695 ON CONFLICT(runtime_key, source_kind) DO UPDATE SET
696 source_cursor = excluded.source_cursor,
697 last_ingested_at_ms = excluded.last_ingested_at_ms",
698 params![
699 runtime_key,
700 source_kind.as_str(),
701 source_cursor,
702 current_time_ms() as i64,
703 ],
704 )
705 .map_err(into_boxed)?;
706 self.watermarks
707 .lock()
708 .map_err(|_| boxed_error("console watermark lock poisoned"))?
709 .insert(
710 (runtime_key.to_string(), source_kind.as_str().to_string()),
711 source_cursor.to_string(),
712 );
713 Ok(())
714 }
715
716 async fn source_watermark(
717 &self,
718 runtime_key: &str,
719 source_kind: ConsoleFrameSourceKind,
720 ) -> ConsoleLogResult<Option<String>> {
721 let watermarks = self
722 .watermarks
723 .lock()
724 .map_err(|_| boxed_error("console watermark lock poisoned"))?;
725 Ok(watermarks
726 .get(&(runtime_key.to_string(), source_kind.as_str().to_string()))
727 .cloned())
728 }
729}
730
731fn load_source_watermarks(
732 conn: &Connection,
733) -> ConsoleLogResult<HashMap<(String, String), String>> {
734 let mut stmt = conn
735 .prepare(
736 "SELECT runtime_key, source_kind, source_cursor
737 FROM console_source_watermarks",
738 )
739 .map_err(into_boxed)?;
740 let rows = stmt
741 .query_map([], |row| {
742 Ok((
743 (row.get::<_, String>(0)?, row.get::<_, String>(1)?),
744 row.get::<_, String>(2)?,
745 ))
746 })
747 .map_err(into_boxed)?;
748 let mut watermarks = HashMap::new();
749 for row in rows {
750 let (key, cursor) = row.map_err(into_boxed)?;
751 watermarks.insert(key, cursor);
752 }
753 Ok(watermarks)
754}
755
756fn initialize_schema(conn: &Connection) -> ConsoleLogResult<()> {
757 conn.execute_batch(
758 "CREATE TABLE IF NOT EXISTS console_frames (
759 cursor_seq INTEGER PRIMARY KEY AUTOINCREMENT,
760 id TEXT NOT NULL UNIQUE,
761 dedupe_key TEXT NOT NULL UNIQUE,
762 timestamp_ms INTEGER NOT NULL,
763 runtime_key TEXT NOT NULL,
764 identity TEXT NOT NULL,
765 conversation_id TEXT,
766 session_id TEXT,
767 kind TEXT NOT NULL,
768 status TEXT NOT NULL,
769 frame_version INTEGER NOT NULL DEFAULT 1,
770 updated_at_ms INTEGER,
771 payload_json TEXT NOT NULL,
772 source_kind TEXT NOT NULL,
773 source_cursor TEXT,
774 source_event_id TEXT,
775 interaction_id TEXT,
776 parent_frame_id TEXT,
777 caused_by_frame_id TEXT,
778 turn_id TEXT,
779 run_id TEXT
780 );
781 CREATE TABLE IF NOT EXISTS console_source_watermarks (
782 runtime_key TEXT NOT NULL,
783 source_kind TEXT NOT NULL,
784 source_cursor TEXT NOT NULL,
785 last_ingested_at_ms INTEGER NOT NULL,
786 PRIMARY KEY(runtime_key, source_kind)
787 );
788 CREATE INDEX IF NOT EXISTS idx_console_frames_identity_cursor
789 ON console_frames(identity, cursor_seq);
790 CREATE INDEX IF NOT EXISTS idx_console_frames_conversation_cursor
791 ON console_frames(conversation_id, cursor_seq);",
792 )
793 .map_err(into_boxed)
794}
795
796fn query_sql_frames<P: rusqlite::Params>(
797 conn: &Connection,
798 sql: &str,
799 params: P,
800) -> ConsoleLogResult<Vec<ConsoleFrame>> {
801 let mut stmt = conn.prepare(sql).map_err(into_boxed)?;
802 let rows = stmt.query_map(params, row_to_frame).map_err(into_boxed)?;
803 let mut frames = Vec::new();
804 for row in rows {
805 frames.push(row.map_err(into_boxed)?);
806 }
807 Ok(frames)
808}
809
810fn select_frame_by_dedupe(
811 conn: &Connection,
812 dedupe_key: &str,
813) -> ConsoleLogResult<Option<ConsoleFrame>> {
814 conn.query_row(
815 "SELECT cursor_seq, id, dedupe_key, timestamp_ms, runtime_key, identity,
816 conversation_id, session_id, kind, status, frame_version, updated_at_ms, payload_json,
817 source_kind, source_cursor, source_event_id, interaction_id,
818 parent_frame_id, caused_by_frame_id, turn_id, run_id
819 FROM console_frames WHERE dedupe_key = ?1",
820 params![dedupe_key],
821 row_to_frame,
822 )
823 .optional()
824 .map_err(into_boxed)
825}
826
827fn select_frame_by_id(conn: &Connection, id: &str) -> ConsoleLogResult<Option<ConsoleFrame>> {
828 conn.query_row(
829 "SELECT cursor_seq, id, dedupe_key, timestamp_ms, runtime_key, identity,
830 conversation_id, session_id, kind, status, frame_version, updated_at_ms, payload_json,
831 source_kind, source_cursor, source_event_id, interaction_id,
832 parent_frame_id, caused_by_frame_id, turn_id, run_id
833 FROM console_frames WHERE id = ?1",
834 params![id],
835 row_to_frame,
836 )
837 .optional()
838 .map_err(into_boxed)
839}
840
841fn latest_matching_cursor(
842 conn: &Connection,
843 after: i64,
844 before: i64,
845 identity: Option<&str>,
846 conversation_id: Option<&str>,
847) -> ConsoleLogResult<Option<ConsoleCursor>> {
848 let mut sql = String::from(
849 "SELECT cursor_seq FROM console_frames WHERE cursor_seq > ?1 AND cursor_seq < ?2",
850 );
851 if identity.is_some() {
852 sql.push_str(" AND identity = ?3");
853 }
854 if conversation_id.is_some() {
855 sql.push_str(" AND conversation_id = ?");
856 let next_param = 3 + usize::from(identity.is_some());
857 sql.push_str(&next_param.to_string());
858 }
859 sql.push_str(" ORDER BY cursor_seq DESC LIMIT 1");
860
861 let mut values = vec![
862 rusqlite::types::Value::Integer(after),
863 rusqlite::types::Value::Integer(before),
864 ];
865 if let Some(identity) = identity {
866 values.push(rusqlite::types::Value::Text(identity.to_string()));
867 }
868 if let Some(conversation_id) = conversation_id {
869 values.push(rusqlite::types::Value::Text(conversation_id.to_string()));
870 }
871 let seq: Option<i64> = conn
872 .query_row(&sql, rusqlite::params_from_iter(values), |row| row.get(0))
873 .optional()
874 .map_err(into_boxed)?;
875 Ok(seq.map(|value| ConsoleCursor::from_seq(value as u64)))
876}
877
878fn row_to_frame(row: &rusqlite::Row<'_>) -> rusqlite::Result<ConsoleFrame> {
879 let seq: i64 = row.get(0)?;
880 let payload_json: String = row.get(12)?;
881 let payload = serde_json::from_str(&payload_json).unwrap_or(serde_json::Value::Null);
882 let source_kind: String = row.get(13)?;
883 Ok(ConsoleFrame {
884 cursor: ConsoleCursor::from_seq(seq as u64),
885 id: row.get(1)?,
886 dedupe_key: row.get(2)?,
887 timestamp_ms: row.get::<_, i64>(3)? as u64,
888 runtime_key: row.get(4)?,
889 identity: row.get(5)?,
890 conversation_id: row.get(6)?,
891 session_id: row.get(7)?,
892 kind: row.get(8)?,
893 status: ConsoleFrameStatus::from_str(row.get::<_, String>(9)?.as_str()),
894 frame_version: row.get::<_, i64>(10)? as u64,
895 updated_at_ms: row.get::<_, Option<i64>>(11)?.map(|value| value as u64),
896 payload,
897 source: ConsoleFrameSource {
898 kind: ConsoleFrameSourceKind::from_str(&source_kind),
899 source_cursor: row.get(14)?,
900 },
901 source_event_id: row.get(15)?,
902 interaction_id: row.get(16)?,
903 parent_frame_id: row.get(17)?,
904 caused_by_frame_id: row.get(18)?,
905 turn_id: row.get(19)?,
906 run_id: row.get(20)?,
907 })
908}
909
910fn normalize_limit(limit: usize) -> usize {
911 limit.clamp(1, 1000)
912}
913
914fn cursor_seq(cursor: &ConsoleCursor) -> ConsoleLogResult<u64> {
915 cursor
916 .seq()
917 .ok_or_else(|| boxed_error(format!("invalid console cursor: {cursor}")))
918}
919
920fn cursor_seq_i64(cursor: &ConsoleCursor) -> ConsoleLogResult<i64> {
921 let seq = cursor_seq(cursor)?;
922 i64::try_from(seq).map_err(|_| boxed_error(format!("console cursor out of range: {cursor}")))
923}
924
925pub(crate) fn stable_frame_id(dedupe_key: &str) -> String {
926 let mut hasher = Sha256::new();
927 hasher.update(dedupe_key.as_bytes());
928 format!("console-frame-{}", to_hex(&hasher.finalize()))
929}
930
931fn to_hex(bytes: &[u8]) -> String {
932 const HEX: &[u8; 16] = b"0123456789abcdef";
933 let mut out = String::with_capacity(bytes.len() * 2);
934 for byte in bytes {
935 out.push(HEX[(byte >> 4) as usize] as char);
936 out.push(HEX[(byte & 0x0f) as usize] as char);
937 }
938 out
939}
940
941fn boxed_error(message: impl Into<String>) -> ConsoleLogError {
942 Box::new(std::io::Error::other(message.into()))
943}
944
945fn into_boxed<E>(error: E) -> ConsoleLogError
946where
947 E: std::error::Error + Send + Sync + 'static,
948{
949 Box::new(error)
950}
951
952fn current_time_ms() -> u64 {
953 match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
954 Ok(duration) => duration.as_millis() as u64,
955 Err(_) => 0,
956 }
957}
958
959#[cfg(test)]
960#[allow(clippy::expect_used)]
961mod tests {
962 use serde_json::json;
963
964 use super::*;
965
966 struct LegacyQueryOnlyStore;
967
968 #[async_trait::async_trait]
969 impl ConsoleLogStore for LegacyQueryOnlyStore {
970 async fn append_if_absent(
971 &self,
972 _frame: NewConsoleFrame,
973 ) -> ConsoleLogResult<AppendOutcome> {
974 Err(boxed_error("not implemented for test"))
975 }
976
977 async fn update_frame_status(
978 &self,
979 _frame_id: &str,
980 _status: ConsoleFrameStatus,
981 ) -> ConsoleLogResult<Option<ConsoleFrame>> {
982 Err(boxed_error("not implemented for test"))
983 }
984
985 async fn query_frames(
986 &self,
987 _query: ConsoleTimelineQuery,
988 ) -> ConsoleLogResult<ConsoleTimelinePage> {
989 Ok(ConsoleTimelinePage {
990 frames: Vec::new(),
991 next_cursor: None,
992 })
993 }
994
995 async fn frame_by_dedupe_key(
996 &self,
997 _dedupe_key: &str,
998 ) -> ConsoleLogResult<Option<ConsoleFrame>> {
999 Err(boxed_error("not implemented for test"))
1000 }
1001
1002 async fn latest_cursor(&self) -> ConsoleLogResult<Option<ConsoleCursor>> {
1003 Ok(None)
1004 }
1005
1006 async fn clear_frames(&self) -> ConsoleLogResult<()> {
1007 Ok(())
1008 }
1009
1010 async fn record_source_watermark(
1011 &self,
1012 _runtime_key: &str,
1013 _source_kind: ConsoleFrameSourceKind,
1014 _source_cursor: &str,
1015 ) -> ConsoleLogResult<()> {
1016 Ok(())
1017 }
1018
1019 async fn source_watermark(
1020 &self,
1021 _runtime_key: &str,
1022 _source_kind: ConsoleFrameSourceKind,
1023 ) -> ConsoleLogResult<Option<String>> {
1024 Ok(None)
1025 }
1026 }
1027
1028 fn sample_frame(dedupe_key: &str, identity: &str) -> NewConsoleFrame {
1029 NewConsoleFrame {
1030 id: None,
1031 dedupe_key: dedupe_key.to_string(),
1032 timestamp_ms: 10,
1033 runtime_key: "runtime-a".to_string(),
1034 identity: identity.to_string(),
1035 conversation_id: Some(identity.to_string()),
1036 session_id: Some("session-1".to_string()),
1037 kind: "text_delta".to_string(),
1038 status: ConsoleFrameStatus::Delivered,
1039 payload: json!({ "delta": "hello" }),
1040 source: ConsoleFrameSource {
1041 kind: ConsoleFrameSourceKind::ConsoleEvent,
1042 source_cursor: None,
1043 },
1044 source_event_id: Some(dedupe_key.to_string()),
1045 interaction_id: None,
1046 turn_id: None,
1047 run_id: None,
1048 parent_frame_id: None,
1049 caused_by_frame_id: None,
1050 }
1051 }
1052
1053 #[tokio::test]
1054 async fn legacy_store_default_rejects_v04_window_queries_loudly() {
1055 let store = LegacyQueryOnlyStore;
1056
1057 let err = store
1058 .query_windowed_frames(ConsoleTimelineWindowQuery {
1059 mode: ConsoleTimelineMode::Recent,
1060 limit: 10,
1061 ..ConsoleTimelineWindowQuery::default()
1062 })
1063 .await
1064 .expect_err("legacy stores must implement recent windows explicitly");
1065 assert!(
1066 err.to_string()
1067 .contains("must implement query_windowed_frames")
1068 );
1069
1070 let err = store
1071 .query_windowed_frames(ConsoleTimelineWindowQuery {
1072 mode: ConsoleTimelineMode::Since,
1073 before: Some(ConsoleCursor::from_seq(10)),
1074 limit: 10,
1075 ..ConsoleTimelineWindowQuery::default()
1076 })
1077 .await
1078 .expect_err("legacy stores must implement before windows explicitly");
1079 assert!(
1080 err.to_string()
1081 .contains("must implement query_windowed_frames")
1082 );
1083
1084 let page = store
1085 .query_windowed_frames(ConsoleTimelineWindowQuery {
1086 mode: ConsoleTimelineMode::Since,
1087 limit: 10,
1088 ..ConsoleTimelineWindowQuery::default()
1089 })
1090 .await
1091 .expect("legacy since-only fallback remains source-compatible");
1092 assert!(page.frames.is_empty());
1093 }
1094
1095 #[tokio::test]
1096 async fn in_memory_log_assigns_monotonic_cursors_and_dedupes() {
1097 let store = InMemoryConsoleLogStore::new();
1098 let first = store
1099 .append_if_absent(sample_frame("event-1", "agent-a"))
1100 .await
1101 .expect("append first");
1102 let duplicate = store
1103 .append_if_absent(sample_frame("event-1", "agent-a"))
1104 .await
1105 .expect("append duplicate");
1106 let second = store
1107 .append_if_absent(sample_frame("event-2", "agent-a"))
1108 .await
1109 .expect("append second");
1110
1111 assert_eq!(first.disposition, AppendDisposition::Inserted);
1112 assert_eq!(duplicate.disposition, AppendDisposition::Existing);
1113 assert_eq!(first.frame.cursor.seq(), Some(1));
1114 assert_eq!(second.frame.cursor.seq(), Some(2));
1115 }
1116
1117 #[tokio::test]
1118 async fn sqlite_log_queries_by_identity_and_cursor() {
1119 let store = SqliteConsoleLogStore::in_memory().expect("sqlite store");
1120 let first = store
1121 .append_if_absent(sample_frame("event-1", "agent-a"))
1122 .await
1123 .expect("append first");
1124 store
1125 .append_if_absent(sample_frame("event-2", "agent-b"))
1126 .await
1127 .expect("append second");
1128 store
1129 .append_if_absent(sample_frame("event-3", "agent-a"))
1130 .await
1131 .expect("append third");
1132
1133 let page = store
1134 .query_windowed_frames(ConsoleTimelineWindowQuery {
1135 identity: Some("agent-a".to_string()),
1136 after: Some(first.frame.cursor),
1137 limit: 10,
1138 ..ConsoleTimelineWindowQuery::default()
1139 })
1140 .await
1141 .expect("query");
1142 assert_eq!(page.frames.len(), 1);
1143 assert_eq!(page.frames[0].dedupe_key, "event-3");
1144 }
1145
1146 #[tokio::test]
1147 async fn in_memory_log_queries_recent_window_in_display_order() {
1148 let store = InMemoryConsoleLogStore::new();
1149 for index in 1..=6 {
1150 store
1151 .append_if_absent(sample_frame(&format!("event-{index}"), "agent-a"))
1152 .await
1153 .expect("append frame");
1154 }
1155
1156 let page = store
1157 .query_windowed_frames(ConsoleTimelineWindowQuery {
1158 identity: Some("agent-a".to_string()),
1159 mode: ConsoleTimelineMode::Recent,
1160 limit: 3,
1161 ..ConsoleTimelineWindowQuery::default()
1162 })
1163 .await
1164 .expect("query recent");
1165 assert_eq!(
1166 page.frames
1167 .iter()
1168 .map(|frame| frame.dedupe_key.as_str())
1169 .collect::<Vec<_>>(),
1170 vec!["event-4", "event-5", "event-6"]
1171 );
1172 assert_eq!(
1173 page.next_cursor.as_ref().and_then(ConsoleCursor::seq),
1174 Some(6)
1175 );
1176 assert_eq!(
1177 page.latest_cursor.as_ref().and_then(ConsoleCursor::seq),
1178 Some(6)
1179 );
1180 assert!(!page.exhausted);
1181
1182 let older = store
1183 .query_windowed_frames(ConsoleTimelineWindowQuery {
1184 identity: Some("agent-a".to_string()),
1185 mode: ConsoleTimelineMode::Recent,
1186 before: page.frames.first().map(|frame| frame.cursor.clone()),
1187 limit: 3,
1188 ..ConsoleTimelineWindowQuery::default()
1189 })
1190 .await
1191 .expect("query older");
1192 assert_eq!(
1193 older
1194 .frames
1195 .iter()
1196 .map(|frame| frame.dedupe_key.as_str())
1197 .collect::<Vec<_>>(),
1198 vec!["event-1", "event-2", "event-3"]
1199 );
1200 assert_eq!(
1201 older.latest_cursor.as_ref().and_then(ConsoleCursor::seq),
1202 Some(3)
1203 );
1204 }
1205
1206 #[tokio::test]
1207 async fn in_memory_log_queries_sparse_identity_recent_window_without_global_tail_scan() {
1208 let store = InMemoryConsoleLogStore::new();
1209 store
1210 .append_if_absent(sample_frame("sparse-event", "sparse-agent"))
1211 .await
1212 .expect("append sparse frame");
1213 for index in 1..=25_000 {
1214 store
1215 .append_if_absent(sample_frame(&format!("busy-event-{index}"), "busy-agent"))
1216 .await
1217 .expect("append busy frame");
1218 }
1219
1220 let page = store
1221 .query_windowed_frames(ConsoleTimelineWindowQuery {
1222 identity: Some("sparse-agent".to_string()),
1223 mode: ConsoleTimelineMode::Recent,
1224 limit: 10,
1225 ..ConsoleTimelineWindowQuery::default()
1226 })
1227 .await
1228 .expect("query sparse recent");
1229
1230 assert_eq!(page.frames.len(), 1);
1231 assert_eq!(page.frames[0].dedupe_key, "sparse-event");
1232 assert_eq!(
1233 page.latest_cursor.as_ref().and_then(ConsoleCursor::seq),
1234 Some(1)
1235 );
1236 }
1237
1238 #[tokio::test]
1239 async fn sqlite_log_queries_250k_sparse_identity_recent_window_with_index() {
1240 let store = SqliteConsoleLogStore::in_memory().expect("sqlite store");
1241 {
1242 let mut conn = store.conn.lock().expect("sqlite lock");
1243 let tx = conn.transaction().expect("begin transaction");
1244 {
1245 let mut insert = tx
1246 .prepare(
1247 "INSERT INTO console_frames (
1248 id, dedupe_key, timestamp_ms, runtime_key, identity,
1249 conversation_id, session_id, kind, status, frame_version, updated_at_ms,
1250 payload_json, source_kind, source_cursor, source_event_id,
1251 interaction_id, parent_frame_id, caused_by_frame_id, turn_id, run_id
1252 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, 1, NULL, ?10, ?11, NULL, ?12, NULL, NULL, NULL, NULL, NULL)",
1253 )
1254 .expect("prepare insert");
1255 insert
1256 .execute(rusqlite::params![
1257 "sparse-frame",
1258 "sparse-event",
1259 1_i64,
1260 "runtime-a",
1261 "sparse-agent",
1262 "sparse-agent",
1263 "session-sparse",
1264 "text_complete",
1265 ConsoleFrameStatus::Completed.as_str(),
1266 r#"{"text":"still visible"}"#,
1267 ConsoleFrameSourceKind::ConsoleEvent.as_str(),
1268 "sparse-event",
1269 ])
1270 .expect("insert sparse frame");
1271 for index in 2..=250_000_i64 {
1272 insert
1273 .execute(rusqlite::params![
1274 format!("busy-frame-{index}"),
1275 format!("busy-event-{index}"),
1276 index,
1277 "runtime-a",
1278 "busy-agent",
1279 "busy-agent",
1280 "session-busy",
1281 "text_delta",
1282 ConsoleFrameStatus::Completed.as_str(),
1283 format!(r#"{{"delta":{index}}}"#),
1284 ConsoleFrameSourceKind::ConsoleEvent.as_str(),
1285 format!("busy-event-{index}"),
1286 ])
1287 .expect("insert busy frame");
1288 }
1289 }
1290 let plan = tx
1291 .prepare(
1292 "EXPLAIN QUERY PLAN
1293 SELECT cursor_seq FROM console_frames
1294 WHERE cursor_seq > ?1 AND cursor_seq < ?2 AND identity = ?3
1295 ORDER BY cursor_seq DESC LIMIT ?4",
1296 )
1297 .expect("prepare query plan")
1298 .query_map(
1299 rusqlite::params![0_i64, i64::MAX, "sparse-agent", 11_i64],
1300 |row| row.get::<_, String>(3),
1301 )
1302 .expect("run query plan")
1303 .collect::<Result<Vec<_>, _>>()
1304 .expect("collect query plan")
1305 .join("\n")
1306 .to_lowercase();
1307 assert!(
1308 plan.contains("idx_console_frames_identity_cursor"),
1309 "sparse identity recent query should use identity/cursor index; plan was: {plan}"
1310 );
1311 tx.commit().expect("commit transaction");
1312 }
1313
1314 let page = store
1315 .query_windowed_frames(ConsoleTimelineWindowQuery {
1316 identity: Some("sparse-agent".to_string()),
1317 mode: ConsoleTimelineMode::Recent,
1318 limit: 10,
1319 ..ConsoleTimelineWindowQuery::default()
1320 })
1321 .await
1322 .expect("query sparse recent");
1323
1324 assert_eq!(page.frames.len(), 1);
1325 assert_eq!(page.frames[0].dedupe_key, "sparse-event");
1326 assert_eq!(
1327 page.latest_cursor.as_ref().and_then(ConsoleCursor::seq),
1328 Some(1)
1329 );
1330 }
1331
1332 #[tokio::test]
1333 async fn sqlite_log_queries_recent_window_with_before_cursor() {
1334 let store = SqliteConsoleLogStore::in_memory().expect("sqlite store");
1335 for index in 1..=6 {
1336 store
1337 .append_if_absent(sample_frame(&format!("event-{index}"), "agent-a"))
1338 .await
1339 .expect("append frame");
1340 }
1341
1342 let page = store
1343 .query_windowed_frames(ConsoleTimelineWindowQuery {
1344 identity: Some("agent-a".to_string()),
1345 mode: ConsoleTimelineMode::Recent,
1346 limit: 2,
1347 ..ConsoleTimelineWindowQuery::default()
1348 })
1349 .await
1350 .expect("query recent");
1351 assert_eq!(
1352 page.frames
1353 .iter()
1354 .map(|frame| frame.dedupe_key.as_str())
1355 .collect::<Vec<_>>(),
1356 vec!["event-5", "event-6"]
1357 );
1358
1359 let older = store
1360 .query_windowed_frames(ConsoleTimelineWindowQuery {
1361 identity: Some("agent-a".to_string()),
1362 mode: ConsoleTimelineMode::Recent,
1363 before: page.frames.first().map(|frame| frame.cursor.clone()),
1364 limit: 2,
1365 ..ConsoleTimelineWindowQuery::default()
1366 })
1367 .await
1368 .expect("query older");
1369 assert_eq!(
1370 older
1371 .frames
1372 .iter()
1373 .map(|frame| frame.dedupe_key.as_str())
1374 .collect::<Vec<_>>(),
1375 vec!["event-3", "event-4"]
1376 );
1377 assert_eq!(
1378 older.latest_cursor.as_ref().and_then(ConsoleCursor::seq),
1379 Some(4)
1380 );
1381 }
1382
1383 #[tokio::test]
1384 async fn sqlite_log_reports_exhausted_on_exact_size_recent_final_page() {
1385 let store = SqliteConsoleLogStore::in_memory().expect("sqlite store");
1386 for index in 1..=400 {
1387 store
1388 .append_if_absent(sample_frame(&format!("event-{index}"), "agent-a"))
1389 .await
1390 .expect("append frame");
1391 }
1392
1393 let first = store
1394 .query_windowed_frames(ConsoleTimelineWindowQuery {
1395 identity: Some("agent-a".to_string()),
1396 mode: ConsoleTimelineMode::Recent,
1397 limit: 200,
1398 ..ConsoleTimelineWindowQuery::default()
1399 })
1400 .await
1401 .expect("query recent");
1402 assert!(!first.exhausted);
1403 assert_eq!(first.frames[0].dedupe_key, "event-201");
1404
1405 let older = store
1406 .query_windowed_frames(ConsoleTimelineWindowQuery {
1407 identity: Some("agent-a".to_string()),
1408 mode: ConsoleTimelineMode::Recent,
1409 before: first.frames.first().map(|frame| frame.cursor.clone()),
1410 limit: 200,
1411 ..ConsoleTimelineWindowQuery::default()
1412 })
1413 .await
1414 .expect("query older");
1415 assert!(older.exhausted);
1416 assert_eq!(older.frames.len(), 200);
1417 assert_eq!(older.frames[0].dedupe_key, "event-1");
1418 }
1419
1420 #[tokio::test]
1421 async fn sqlite_log_rejects_out_of_range_console_cursors() {
1422 let store = SqliteConsoleLogStore::in_memory().expect("sqlite store");
1423 store
1424 .append_if_absent(sample_frame("event-1", "agent-a"))
1425 .await
1426 .expect("append frame");
1427
1428 let err = store
1429 .query_windowed_frames(ConsoleTimelineWindowQuery {
1430 after: Some(ConsoleCursor::from("console:9223372036854775808")),
1431 limit: 10,
1432 ..ConsoleTimelineWindowQuery::default()
1433 })
1434 .await
1435 .expect_err("oversized after cursor should be rejected");
1436 assert!(err.to_string().contains("out of range"));
1437
1438 let err = store
1439 .query_windowed_frames(ConsoleTimelineWindowQuery {
1440 before: Some(ConsoleCursor::from("console:9223372036854775808")),
1441 limit: 10,
1442 ..ConsoleTimelineWindowQuery::default()
1443 })
1444 .await
1445 .expect_err("oversized before cursor should be rejected");
1446 assert!(err.to_string().contains("out of range"));
1447 }
1448
1449 #[tokio::test]
1450 async fn sqlite_log_updates_status() {
1451 let store = SqliteConsoleLogStore::in_memory().expect("sqlite store");
1452 let first = store
1453 .append_if_absent(sample_frame("event-1", "agent-a"))
1454 .await
1455 .expect("append first");
1456 let updated = store
1457 .update_frame_status(&first.frame.id, ConsoleFrameStatus::DeliveryFailed)
1458 .await
1459 .expect("update")
1460 .expect("updated frame");
1461 assert_eq!(updated.status, ConsoleFrameStatus::DeliveryFailed);
1462 assert_eq!(updated.frame_version, 2);
1463 assert!(updated.updated_at_ms.is_some());
1464 }
1465
1466 #[tokio::test]
1467 async fn sqlite_log_records_source_watermarks() {
1468 let store = SqliteConsoleLogStore::in_memory().expect("sqlite store");
1469 store
1470 .record_source_watermark("runtime-a", ConsoleFrameSourceKind::ConsoleEvent, "evt-99")
1471 .await
1472 .expect("record watermark");
1473 let watermark = store
1474 .source_watermark("runtime-a", ConsoleFrameSourceKind::ConsoleEvent)
1475 .await
1476 .expect("read watermark");
1477 assert_eq!(watermark.as_deref(), Some("evt-99"));
1478 }
1479
1480 #[tokio::test]
1481 async fn sqlite_log_persists_frames_across_handles() {
1482 let temp_dir = tempfile::tempdir().expect("temp dir");
1483 let path = temp_dir.path().join("console.sqlite");
1484 let store = SqliteConsoleLogStore::open(&path).expect("open first handle");
1485 store
1486 .append_if_absent(sample_frame("event-1", "agent-a"))
1487 .await
1488 .expect("append frame");
1489 drop(store);
1490
1491 let reopened = SqliteConsoleLogStore::open(&path).expect("open second handle");
1492 let page = reopened
1493 .query_windowed_frames(ConsoleTimelineWindowQuery {
1494 identity: Some("agent-a".to_string()),
1495 limit: 10,
1496 ..ConsoleTimelineWindowQuery::default()
1497 })
1498 .await
1499 .expect("query frames");
1500 assert_eq!(page.frames.len(), 1);
1501 assert_eq!(page.frames[0].dedupe_key, "event-1");
1502 }
1503}