Skip to main content

meerkat_mobkit/console_aggregator/
store.rs

1use std::collections::{BTreeMap, BTreeSet, HashMap};
2use std::path::Path;
3use std::sync::{Arc, Mutex};
4
5use rusqlite::{Connection, OptionalExtension, params};
6use sha2::{Digest, Sha256};
7
8use super::types::{
9    AppendDisposition, AppendOutcome, ConsoleCursor, ConsoleFrame, ConsoleFrameSource,
10    ConsoleFrameSourceKind, ConsoleFrameStatus, ConsoleTimelineMode, ConsoleTimelinePage,
11    ConsoleTimelineQuery, ConsoleTimelineWindowPage, ConsoleTimelineWindowQuery, NewConsoleFrame,
12};
13
/// Result alias returned by every console log store operation in this module.
pub type ConsoleLogResult<T> = Result<T, ConsoleLogError>;

/// Boxed, type-erased error (`Send + Sync`) used as the error side of [`ConsoleLogResult`].
pub type ConsoleLogError = Box<dyn std::error::Error + Send + Sync>;
17
18#[async_trait::async_trait]
19pub trait ConsoleLogStore: Send + Sync {
20    async fn append_if_absent(&self, frame: NewConsoleFrame) -> ConsoleLogResult<AppendOutcome>;
21
22    async fn update_frame_status(
23        &self,
24        frame_id: &str,
25        status: ConsoleFrameStatus,
26    ) -> ConsoleLogResult<Option<ConsoleFrame>>;
27
28    async fn query_frames(
29        &self,
30        query: ConsoleTimelineQuery,
31    ) -> ConsoleLogResult<ConsoleTimelinePage>;
32
33    async fn query_windowed_frames(
34        &self,
35        query: ConsoleTimelineWindowQuery,
36    ) -> ConsoleLogResult<ConsoleTimelineWindowPage> {
37        if query.mode != ConsoleTimelineMode::Since || query.before.is_some() {
38            return Err(std::io::Error::other(
39                "console log store must implement query_windowed_frames for v0.4 timeline windows",
40            )
41            .into());
42        }
43        let page = self
44            .query_frames(ConsoleTimelineQuery {
45                identity: query.identity,
46                conversation_id: query.conversation_id,
47                after: query.after,
48                limit: query.limit,
49            })
50            .await?;
51        Ok(ConsoleTimelineWindowPage {
52            latest_cursor: page.next_cursor.clone(),
53            exhausted: false,
54            frames: page.frames,
55            next_cursor: page.next_cursor,
56        })
57    }
58
59    async fn frame_by_dedupe_key(&self, dedupe_key: &str)
60    -> ConsoleLogResult<Option<ConsoleFrame>>;
61
62    async fn latest_cursor(&self) -> ConsoleLogResult<Option<ConsoleCursor>>;
63
64    async fn clear_frames(&self) -> ConsoleLogResult<()>;
65
66    async fn record_source_watermark(
67        &self,
68        runtime_key: &str,
69        source_kind: ConsoleFrameSourceKind,
70        source_cursor: &str,
71    ) -> ConsoleLogResult<()>;
72
73    async fn source_watermark(
74        &self,
75        runtime_key: &str,
76        source_kind: ConsoleFrameSourceKind,
77    ) -> ConsoleLogResult<Option<String>>;
78}
79
/// Console log store that keeps all of its state in process memory behind a single mutex.
#[derive(Default)]
pub struct InMemoryConsoleLogStore {
    // All frames, lookup indexes and watermarks; locked for the duration of each operation.
    state: Mutex<InMemoryState>,
}
84
85#[derive(Default)]
86struct InMemoryState {
87    next_seq: u64,
88    frames: BTreeMap<u64, ConsoleFrame>,
89    dedupe_to_seq: HashMap<String, u64>,
90    id_to_seq: HashMap<String, u64>,
91    identity_to_seqs: HashMap<String, BTreeSet<u64>>,
92    conversation_to_seqs: HashMap<String, BTreeSet<u64>>,
93    watermarks: HashMap<(String, String), String>,
94}
95
96impl InMemoryConsoleLogStore {
97    pub fn new() -> Self {
98        Self {
99            state: Mutex::new(InMemoryState {
100                next_seq: 1,
101                frames: BTreeMap::new(),
102                dedupe_to_seq: HashMap::new(),
103                id_to_seq: HashMap::new(),
104                identity_to_seqs: HashMap::new(),
105                conversation_to_seqs: HashMap::new(),
106                watermarks: HashMap::new(),
107            }),
108        }
109    }
110}
111
112#[async_trait::async_trait]
113impl ConsoleLogStore for InMemoryConsoleLogStore {
114    async fn append_if_absent(&self, frame: NewConsoleFrame) -> ConsoleLogResult<AppendOutcome> {
115        let mut state = self
116            .state
117            .lock()
118            .map_err(|_| boxed_error("console log lock poisoned"))?;
119        if let Some(seq) = state.dedupe_to_seq.get(&frame.dedupe_key).copied()
120            && let Some(existing) = state.frames.get(&seq)
121        {
122            return Ok(AppendOutcome {
123                disposition: AppendDisposition::Existing,
124                frame: existing.clone(),
125            });
126        }
127
128        let seq = state.next_seq;
129        state.next_seq = state.next_seq.saturating_add(1);
130        let id = frame
131            .id
132            .unwrap_or_else(|| stable_frame_id(&frame.dedupe_key));
133        let frame = ConsoleFrame {
134            id: id.clone(),
135            cursor: ConsoleCursor::from_seq(seq),
136            dedupe_key: frame.dedupe_key,
137            timestamp_ms: frame.timestamp_ms,
138            runtime_key: frame.runtime_key,
139            identity: frame.identity,
140            conversation_id: frame.conversation_id,
141            session_id: frame.session_id,
142            kind: frame.kind,
143            status: frame.status,
144            frame_version: 1,
145            updated_at_ms: None,
146            payload: frame.payload,
147            source: frame.source,
148            source_event_id: frame.source_event_id,
149            interaction_id: frame.interaction_id,
150            turn_id: frame.turn_id,
151            run_id: frame.run_id,
152            parent_frame_id: frame.parent_frame_id,
153            caused_by_frame_id: frame.caused_by_frame_id,
154        };
155        state.dedupe_to_seq.insert(frame.dedupe_key.clone(), seq);
156        state.id_to_seq.insert(id, seq);
157        state
158            .identity_to_seqs
159            .entry(frame.identity.clone())
160            .or_default()
161            .insert(seq);
162        if let Some(conversation_id) = frame.conversation_id.as_ref() {
163            state
164                .conversation_to_seqs
165                .entry(conversation_id.clone())
166                .or_default()
167                .insert(seq);
168        }
169        state.frames.insert(seq, frame.clone());
170        Ok(AppendOutcome {
171            disposition: AppendDisposition::Inserted,
172            frame,
173        })
174    }
175
176    async fn update_frame_status(
177        &self,
178        frame_id: &str,
179        status: ConsoleFrameStatus,
180    ) -> ConsoleLogResult<Option<ConsoleFrame>> {
181        let mut state = self
182            .state
183            .lock()
184            .map_err(|_| boxed_error("console log lock poisoned"))?;
185        let Some(seq) = state.id_to_seq.get(frame_id).copied() else {
186            return Ok(None);
187        };
188        let Some(frame) = state.frames.get_mut(&seq) else {
189            return Ok(None);
190        };
191        frame.status = status;
192        frame.frame_version = frame.frame_version.saturating_add(1);
193        frame.updated_at_ms = Some(current_time_ms());
194        Ok(Some(frame.clone()))
195    }
196
197    async fn query_frames(
198        &self,
199        query: ConsoleTimelineQuery,
200    ) -> ConsoleLogResult<ConsoleTimelinePage> {
201        let page = self.query_windowed_frames(query.into()).await?;
202        Ok(ConsoleTimelinePage {
203            frames: page.frames,
204            next_cursor: page.next_cursor,
205        })
206    }
207
208    async fn query_windowed_frames(
209        &self,
210        query: ConsoleTimelineWindowQuery,
211    ) -> ConsoleLogResult<ConsoleTimelineWindowPage> {
212        let after_seq = query.after.as_ref().map(cursor_seq).transpose()?;
213        let before_seq = query.before.as_ref().map(cursor_seq).transpose()?;
214        let limit = normalize_limit(query.limit);
215        let scan_limit = limit.saturating_add(1);
216        let state = self
217            .state
218            .lock()
219            .map_err(|_| boxed_error("console log lock poisoned"))?;
220        let frame_matches = |seq: u64, frame: &ConsoleFrame| -> bool {
221            if after_seq.is_some_and(|after| seq <= after) {
222                return false;
223            }
224            if before_seq.is_some_and(|before| seq >= before) {
225                return false;
226            }
227            if let Some(identity) = query.identity.as_deref()
228                && frame.identity != identity
229            {
230                return false;
231            }
232            if let Some(conversation_id) = query.conversation_id.as_deref()
233                && frame.conversation_id.as_deref() != Some(conversation_id)
234            {
235                return false;
236            }
237            true
238        };
239        if let (Some(identity), Some(conversation_id)) =
240            (query.identity.as_deref(), query.conversation_id.as_deref())
241        {
242            let identity_seqs = state.identity_to_seqs.get(identity);
243            let conversation_seqs = state.conversation_to_seqs.get(conversation_id);
244            return match (identity_seqs, conversation_seqs) {
245                (Some(left), Some(right)) if left.len() <= right.len() => {
246                    Ok(in_memory_window_from_seq_iters(
247                        &state,
248                        query.mode,
249                        (limit, scan_limit),
250                        left.iter().copied().filter(|seq| right.contains(seq)),
251                        left.iter().rev().copied().filter(|seq| right.contains(seq)),
252                        left.iter().rev().copied().filter(|seq| right.contains(seq)),
253                        &frame_matches,
254                    ))
255                }
256                (Some(left), Some(right)) => Ok(in_memory_window_from_seq_iters(
257                    &state,
258                    query.mode,
259                    (limit, scan_limit),
260                    right.iter().copied().filter(|seq| left.contains(seq)),
261                    right.iter().rev().copied().filter(|seq| left.contains(seq)),
262                    right.iter().rev().copied().filter(|seq| left.contains(seq)),
263                    &frame_matches,
264                )),
265                _ => Ok(empty_window()),
266            };
267        }
268        if let Some(identity) = query.identity.as_deref() {
269            let Some(seqs) = state.identity_to_seqs.get(identity) else {
270                return Ok(empty_window());
271            };
272            return Ok(in_memory_window_from_seq_iters(
273                &state,
274                query.mode,
275                (limit, scan_limit),
276                seqs.iter().copied(),
277                seqs.iter().rev().copied(),
278                seqs.iter().rev().copied(),
279                &frame_matches,
280            ));
281        }
282        if let Some(conversation_id) = query.conversation_id.as_deref() {
283            let Some(seqs) = state.conversation_to_seqs.get(conversation_id) else {
284                return Ok(empty_window());
285            };
286            return Ok(in_memory_window_from_seq_iters(
287                &state,
288                query.mode,
289                (limit, scan_limit),
290                seqs.iter().copied(),
291                seqs.iter().rev().copied(),
292                seqs.iter().rev().copied(),
293                &frame_matches,
294            ));
295        }
296        Ok(in_memory_window_from_seq_iters(
297            &state,
298            query.mode,
299            (limit, scan_limit),
300            state.frames.keys().copied(),
301            state.frames.keys().rev().copied(),
302            state.frames.keys().rev().copied(),
303            &frame_matches,
304        ))
305    }
306
307    async fn frame_by_dedupe_key(
308        &self,
309        dedupe_key: &str,
310    ) -> ConsoleLogResult<Option<ConsoleFrame>> {
311        let state = self
312            .state
313            .lock()
314            .map_err(|_| boxed_error("console log lock poisoned"))?;
315        let Some(seq) = state.dedupe_to_seq.get(dedupe_key).copied() else {
316            return Ok(None);
317        };
318        Ok(state.frames.get(&seq).cloned())
319    }
320
321    async fn latest_cursor(&self) -> ConsoleLogResult<Option<ConsoleCursor>> {
322        let state = self
323            .state
324            .lock()
325            .map_err(|_| boxed_error("console log lock poisoned"))?;
326        Ok(state
327            .frames
328            .keys()
329            .next_back()
330            .copied()
331            .map(ConsoleCursor::from_seq))
332    }
333
334    async fn clear_frames(&self) -> ConsoleLogResult<()> {
335        let mut state = self
336            .state
337            .lock()
338            .map_err(|_| boxed_error("console log lock poisoned"))?;
339        state.frames.clear();
340        state.dedupe_to_seq.clear();
341        state.id_to_seq.clear();
342        state.identity_to_seqs.clear();
343        state.conversation_to_seqs.clear();
344        state.next_seq = 1;
345        Ok(())
346    }
347
348    async fn record_source_watermark(
349        &self,
350        runtime_key: &str,
351        source_kind: ConsoleFrameSourceKind,
352        source_cursor: &str,
353    ) -> ConsoleLogResult<()> {
354        let mut state = self
355            .state
356            .lock()
357            .map_err(|_| boxed_error("console log lock poisoned"))?;
358        state.watermarks.insert(
359            (runtime_key.to_string(), source_kind.as_str().to_string()),
360            source_cursor.to_string(),
361        );
362        Ok(())
363    }
364
365    async fn source_watermark(
366        &self,
367        runtime_key: &str,
368        source_kind: ConsoleFrameSourceKind,
369    ) -> ConsoleLogResult<Option<String>> {
370        let state = self
371            .state
372            .lock()
373            .map_err(|_| boxed_error("console log lock poisoned"))?;
374        Ok(state
375            .watermarks
376            .get(&(runtime_key.to_string(), source_kind.as_str().to_string()))
377            .cloned())
378    }
379}
380
381fn empty_window() -> ConsoleTimelineWindowPage {
382    ConsoleTimelineWindowPage {
383        frames: Vec::new(),
384        next_cursor: None,
385        latest_cursor: None,
386        exhausted: true,
387    }
388}
389
390fn in_memory_window_from_seq_iters<IForward, IReverse, ILatest, F>(
391    state: &InMemoryState,
392    mode: ConsoleTimelineMode,
393    limits: (usize, usize),
394    forward_iter: IForward,
395    reverse_iter: IReverse,
396    mut latest_iter: ILatest,
397    frame_matches: &F,
398) -> ConsoleTimelineWindowPage
399where
400    IForward: Iterator<Item = u64>,
401    IReverse: Iterator<Item = u64>,
402    ILatest: Iterator<Item = u64>,
403    F: Fn(u64, &ConsoleFrame) -> bool,
404{
405    let (limit, scan_limit) = limits;
406    let mut frames = match mode {
407        ConsoleTimelineMode::Since => forward_iter
408            .filter_map(|seq| {
409                let frame = state.frames.get(&seq)?;
410                frame_matches(seq, frame).then(|| frame.clone())
411            })
412            .take(scan_limit)
413            .collect::<Vec<_>>(),
414        ConsoleTimelineMode::Recent => {
415            let mut frames = reverse_iter
416                .filter_map(|seq| {
417                    let frame = state.frames.get(&seq)?;
418                    frame_matches(seq, frame).then(|| frame.clone())
419                })
420                .take(scan_limit)
421                .collect::<Vec<_>>();
422            frames.reverse();
423            frames
424        }
425    };
426    let exhausted = frames.len() <= limit;
427    if frames.len() > limit {
428        match mode {
429            ConsoleTimelineMode::Since => frames.truncate(limit),
430            ConsoleTimelineMode::Recent => {
431                frames.remove(0);
432            }
433        }
434    }
435    let next_cursor = frames.last().map(|frame| frame.cursor.clone());
436    let latest_cursor = match mode {
437        ConsoleTimelineMode::Since => latest_iter.find_map(|seq| {
438            let frame = state.frames.get(&seq)?;
439            frame_matches(seq, frame).then(|| frame.cursor.clone())
440        }),
441        ConsoleTimelineMode::Recent => next_cursor.clone(),
442    };
443    ConsoleTimelineWindowPage {
444        frames,
445        next_cursor,
446        latest_cursor,
447        exhausted,
448    }
449}
450
/// Console log store backed by a SQLite database.
pub struct SqliteConsoleLogStore {
    // Shared connection; each operation locks it while it runs its statements.
    conn: Arc<Mutex<Connection>>,
    // In-memory copy of the `console_source_watermarks` table keyed by
    // (runtime key, source kind); loaded when the store is built and updated on every
    // recorded watermark, so reads never touch the database.
    watermarks: Arc<Mutex<HashMap<(String, String), String>>>,
}
455
456impl SqliteConsoleLogStore {
457    pub fn open(path: impl AsRef<Path>) -> ConsoleLogResult<Self> {
458        let conn = Connection::open(path).map_err(into_boxed)?;
459        Self::from_connection(conn)
460    }
461
462    pub fn in_memory() -> ConsoleLogResult<Self> {
463        let conn = Connection::open_in_memory().map_err(into_boxed)?;
464        Self::from_connection(conn)
465    }
466
467    fn from_connection(conn: Connection) -> ConsoleLogResult<Self> {
468        initialize_schema(&conn)?;
469        let watermarks = load_source_watermarks(&conn)?;
470        Ok(Self {
471            conn: Arc::new(Mutex::new(conn)),
472            watermarks: Arc::new(Mutex::new(watermarks)),
473        })
474    }
475}
476
477#[async_trait::async_trait]
478impl ConsoleLogStore for SqliteConsoleLogStore {
479    async fn append_if_absent(&self, frame: NewConsoleFrame) -> ConsoleLogResult<AppendOutcome> {
480        let conn = self
481            .conn
482            .lock()
483            .map_err(|_| boxed_error("console log lock poisoned"))?;
484        if let Some(existing) = select_frame_by_dedupe(&conn, &frame.dedupe_key)? {
485            return Ok(AppendOutcome {
486                disposition: AppendDisposition::Existing,
487                frame: existing,
488            });
489        }
490
491        let id = frame
492            .id
493            .clone()
494            .unwrap_or_else(|| stable_frame_id(&frame.dedupe_key));
495        let payload_json = serde_json::to_string(&frame.payload).map_err(into_boxed)?;
496        conn.execute(
497            "INSERT INTO console_frames (
498                id, dedupe_key, timestamp_ms, runtime_key, identity,
499                conversation_id, session_id, kind, status, frame_version, updated_at_ms, payload_json,
500                source_kind, source_cursor, source_event_id, interaction_id,
501                parent_frame_id, caused_by_frame_id, turn_id, run_id
502            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, 1, NULL, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18)",
503            params![
504                id,
505                frame.dedupe_key,
506                frame.timestamp_ms as i64,
507                frame.runtime_key,
508                frame.identity,
509                frame.conversation_id,
510                frame.session_id,
511                frame.kind,
512                frame.status.as_str(),
513                payload_json,
514                frame.source.kind.as_str(),
515                frame.source.source_cursor,
516                frame.source_event_id,
517                frame.interaction_id,
518                frame.parent_frame_id,
519                frame.caused_by_frame_id,
520                frame.turn_id,
521                frame.run_id,
522            ],
523        )
524        .map_err(into_boxed)?;
525        let inserted = select_frame_by_dedupe(&conn, &frame.dedupe_key)?
526            .ok_or_else(|| boxed_error("inserted console frame was not readable"))?;
527        Ok(AppendOutcome {
528            disposition: AppendDisposition::Inserted,
529            frame: inserted,
530        })
531    }
532
533    async fn update_frame_status(
534        &self,
535        frame_id: &str,
536        status: ConsoleFrameStatus,
537    ) -> ConsoleLogResult<Option<ConsoleFrame>> {
538        let conn = self
539            .conn
540            .lock()
541            .map_err(|_| boxed_error("console log lock poisoned"))?;
542        conn.execute(
543            "UPDATE console_frames SET status = ?1, frame_version = frame_version + 1, updated_at_ms = ?2 WHERE id = ?3",
544            params![status.as_str(), current_time_ms() as i64, frame_id],
545        )
546        .map_err(into_boxed)?;
547        select_frame_by_id(&conn, frame_id)
548    }
549
550    async fn query_frames(
551        &self,
552        query: ConsoleTimelineQuery,
553    ) -> ConsoleLogResult<ConsoleTimelinePage> {
554        let page = self.query_windowed_frames(query.into()).await?;
555        Ok(ConsoleTimelinePage {
556            frames: page.frames,
557            next_cursor: page.next_cursor,
558        })
559    }
560
561    async fn query_windowed_frames(
562        &self,
563        query: ConsoleTimelineWindowQuery,
564    ) -> ConsoleLogResult<ConsoleTimelineWindowPage> {
565        let after_seq = query.after.as_ref().map(cursor_seq_i64).transpose()?;
566        let before_seq = query.before.as_ref().map(cursor_seq_i64).transpose()?;
567        let limit = normalize_limit(query.limit);
568        let scan_limit = limit.saturating_add(1);
569        let conn = self
570            .conn
571            .lock()
572            .map_err(|_| boxed_error("console log lock poisoned"))?;
573        let mut sql = String::from(
574            "SELECT cursor_seq, id, dedupe_key, timestamp_ms, runtime_key, identity,
575                    conversation_id, session_id, kind, status, frame_version, updated_at_ms, payload_json,
576                    source_kind, source_cursor, source_event_id, interaction_id,
577                    parent_frame_id, caused_by_frame_id, turn_id, run_id
578             FROM console_frames WHERE cursor_seq > ?1 AND cursor_seq < ?2",
579        );
580        let mut next_param = 3usize;
581        if query.identity.is_some() {
582            sql.push_str(" AND identity = ?");
583            sql.push_str(&next_param.to_string());
584            next_param += 1;
585        }
586        if query.conversation_id.is_some() {
587            sql.push_str(" AND conversation_id = ?");
588            sql.push_str(&next_param.to_string());
589            next_param += 1;
590        }
591        match query.mode {
592            ConsoleTimelineMode::Since => sql.push_str(" ORDER BY cursor_seq ASC LIMIT ?"),
593            ConsoleTimelineMode::Recent => sql.push_str(" ORDER BY cursor_seq DESC LIMIT ?"),
594        }
595        sql.push_str(&next_param.to_string());
596
597        let after = after_seq.unwrap_or(0);
598        let before = before_seq.unwrap_or(i64::MAX);
599        let mut values = vec![
600            rusqlite::types::Value::Integer(after),
601            rusqlite::types::Value::Integer(before),
602        ];
603        if let Some(identity) = query.identity.as_ref() {
604            values.push(rusqlite::types::Value::Text(identity.clone()));
605        }
606        if let Some(conversation_id) = query.conversation_id.as_ref() {
607            values.push(rusqlite::types::Value::Text(conversation_id.clone()));
608        }
609        values.push(rusqlite::types::Value::Integer(scan_limit as i64));
610        let mut frames = query_sql_frames(&conn, &sql, rusqlite::params_from_iter(values))?;
611        if query.mode == ConsoleTimelineMode::Recent {
612            frames.reverse();
613        }
614        let exhausted = frames.len() <= limit;
615        if frames.len() > limit {
616            match query.mode {
617                ConsoleTimelineMode::Since => frames.truncate(limit),
618                ConsoleTimelineMode::Recent => {
619                    frames.remove(0);
620                }
621            }
622        }
623        let latest_cursor = latest_matching_cursor(
624            &conn,
625            after,
626            before,
627            query.identity.as_deref(),
628            query.conversation_id.as_deref(),
629        )?;
630        let next_cursor = frames.last().map(|frame| frame.cursor.clone());
631        Ok(ConsoleTimelineWindowPage {
632            frames,
633            next_cursor,
634            latest_cursor,
635            exhausted,
636        })
637    }
638
639    async fn frame_by_dedupe_key(
640        &self,
641        dedupe_key: &str,
642    ) -> ConsoleLogResult<Option<ConsoleFrame>> {
643        let conn = self
644            .conn
645            .lock()
646            .map_err(|_| boxed_error("console log lock poisoned"))?;
647        select_frame_by_dedupe(&conn, dedupe_key)
648    }
649
650    async fn latest_cursor(&self) -> ConsoleLogResult<Option<ConsoleCursor>> {
651        let conn = self
652            .conn
653            .lock()
654            .map_err(|_| boxed_error("console log lock poisoned"))?;
655        let seq: Option<i64> = conn
656            .query_row(
657                "SELECT cursor_seq FROM console_frames ORDER BY cursor_seq DESC LIMIT 1",
658                [],
659                |row| row.get(0),
660            )
661            .optional()
662            .map_err(into_boxed)?;
663        Ok(seq.map(|value| ConsoleCursor::from_seq(value as u64)))
664    }
665
666    async fn clear_frames(&self) -> ConsoleLogResult<()> {
667        let conn = self
668            .conn
669            .lock()
670            .map_err(|_| boxed_error("console log lock poisoned"))?;
671        conn.execute("DELETE FROM console_frames", [])
672            .map_err(into_boxed)?;
673        conn.execute(
674            "DELETE FROM sqlite_sequence WHERE name = 'console_frames'",
675            [],
676        )
677        .ok();
678        Ok(())
679    }
680
681    async fn record_source_watermark(
682        &self,
683        runtime_key: &str,
684        source_kind: ConsoleFrameSourceKind,
685        source_cursor: &str,
686    ) -> ConsoleLogResult<()> {
687        let conn = self
688            .conn
689            .lock()
690            .map_err(|_| boxed_error("console log lock poisoned"))?;
691        conn.execute(
692            "INSERT INTO console_source_watermarks (
693                runtime_key, source_kind, source_cursor, last_ingested_at_ms
694            ) VALUES (?1, ?2, ?3, ?4)
695            ON CONFLICT(runtime_key, source_kind) DO UPDATE SET
696                source_cursor = excluded.source_cursor,
697                last_ingested_at_ms = excluded.last_ingested_at_ms",
698            params![
699                runtime_key,
700                source_kind.as_str(),
701                source_cursor,
702                current_time_ms() as i64,
703            ],
704        )
705        .map_err(into_boxed)?;
706        self.watermarks
707            .lock()
708            .map_err(|_| boxed_error("console watermark lock poisoned"))?
709            .insert(
710                (runtime_key.to_string(), source_kind.as_str().to_string()),
711                source_cursor.to_string(),
712            );
713        Ok(())
714    }
715
716    async fn source_watermark(
717        &self,
718        runtime_key: &str,
719        source_kind: ConsoleFrameSourceKind,
720    ) -> ConsoleLogResult<Option<String>> {
721        let watermarks = self
722            .watermarks
723            .lock()
724            .map_err(|_| boxed_error("console watermark lock poisoned"))?;
725        Ok(watermarks
726            .get(&(runtime_key.to_string(), source_kind.as_str().to_string()))
727            .cloned())
728    }
729}
730
731fn load_source_watermarks(
732    conn: &Connection,
733) -> ConsoleLogResult<HashMap<(String, String), String>> {
734    let mut stmt = conn
735        .prepare(
736            "SELECT runtime_key, source_kind, source_cursor
737             FROM console_source_watermarks",
738        )
739        .map_err(into_boxed)?;
740    let rows = stmt
741        .query_map([], |row| {
742            Ok((
743                (row.get::<_, String>(0)?, row.get::<_, String>(1)?),
744                row.get::<_, String>(2)?,
745            ))
746        })
747        .map_err(into_boxed)?;
748    let mut watermarks = HashMap::new();
749    for row in rows {
750        let (key, cursor) = row.map_err(into_boxed)?;
751        watermarks.insert(key, cursor);
752    }
753    Ok(watermarks)
754}
755
756fn initialize_schema(conn: &Connection) -> ConsoleLogResult<()> {
757    conn.execute_batch(
758        "CREATE TABLE IF NOT EXISTS console_frames (
759            cursor_seq INTEGER PRIMARY KEY AUTOINCREMENT,
760            id TEXT NOT NULL UNIQUE,
761            dedupe_key TEXT NOT NULL UNIQUE,
762            timestamp_ms INTEGER NOT NULL,
763            runtime_key TEXT NOT NULL,
764            identity TEXT NOT NULL,
765            conversation_id TEXT,
766            session_id TEXT,
767            kind TEXT NOT NULL,
768            status TEXT NOT NULL,
769            frame_version INTEGER NOT NULL DEFAULT 1,
770            updated_at_ms INTEGER,
771            payload_json TEXT NOT NULL,
772            source_kind TEXT NOT NULL,
773            source_cursor TEXT,
774            source_event_id TEXT,
775            interaction_id TEXT,
776            parent_frame_id TEXT,
777            caused_by_frame_id TEXT,
778            turn_id TEXT,
779            run_id TEXT
780        );
781        CREATE TABLE IF NOT EXISTS console_source_watermarks (
782            runtime_key TEXT NOT NULL,
783            source_kind TEXT NOT NULL,
784            source_cursor TEXT NOT NULL,
785            last_ingested_at_ms INTEGER NOT NULL,
786            PRIMARY KEY(runtime_key, source_kind)
787        );
788        CREATE INDEX IF NOT EXISTS idx_console_frames_identity_cursor
789            ON console_frames(identity, cursor_seq);
790        CREATE INDEX IF NOT EXISTS idx_console_frames_conversation_cursor
791            ON console_frames(conversation_id, cursor_seq);",
792    )
793    .map_err(into_boxed)
794}
795
796fn query_sql_frames<P: rusqlite::Params>(
797    conn: &Connection,
798    sql: &str,
799    params: P,
800) -> ConsoleLogResult<Vec<ConsoleFrame>> {
801    let mut stmt = conn.prepare(sql).map_err(into_boxed)?;
802    let rows = stmt.query_map(params, row_to_frame).map_err(into_boxed)?;
803    let mut frames = Vec::new();
804    for row in rows {
805        frames.push(row.map_err(into_boxed)?);
806    }
807    Ok(frames)
808}
809
810fn select_frame_by_dedupe(
811    conn: &Connection,
812    dedupe_key: &str,
813) -> ConsoleLogResult<Option<ConsoleFrame>> {
814    conn.query_row(
815        "SELECT cursor_seq, id, dedupe_key, timestamp_ms, runtime_key, identity,
816                conversation_id, session_id, kind, status, frame_version, updated_at_ms, payload_json,
817                source_kind, source_cursor, source_event_id, interaction_id,
818                parent_frame_id, caused_by_frame_id, turn_id, run_id
819         FROM console_frames WHERE dedupe_key = ?1",
820        params![dedupe_key],
821        row_to_frame,
822    )
823    .optional()
824    .map_err(into_boxed)
825}
826
827fn select_frame_by_id(conn: &Connection, id: &str) -> ConsoleLogResult<Option<ConsoleFrame>> {
828    conn.query_row(
829        "SELECT cursor_seq, id, dedupe_key, timestamp_ms, runtime_key, identity,
830                conversation_id, session_id, kind, status, frame_version, updated_at_ms, payload_json,
831                source_kind, source_cursor, source_event_id, interaction_id,
832                parent_frame_id, caused_by_frame_id, turn_id, run_id
833         FROM console_frames WHERE id = ?1",
834        params![id],
835        row_to_frame,
836    )
837    .optional()
838    .map_err(into_boxed)
839}
840
841fn latest_matching_cursor(
842    conn: &Connection,
843    after: i64,
844    before: i64,
845    identity: Option<&str>,
846    conversation_id: Option<&str>,
847) -> ConsoleLogResult<Option<ConsoleCursor>> {
848    let mut sql = String::from(
849        "SELECT cursor_seq FROM console_frames WHERE cursor_seq > ?1 AND cursor_seq < ?2",
850    );
851    if identity.is_some() {
852        sql.push_str(" AND identity = ?3");
853    }
854    if conversation_id.is_some() {
855        sql.push_str(" AND conversation_id = ?");
856        let next_param = 3 + usize::from(identity.is_some());
857        sql.push_str(&next_param.to_string());
858    }
859    sql.push_str(" ORDER BY cursor_seq DESC LIMIT 1");
860
861    let mut values = vec![
862        rusqlite::types::Value::Integer(after),
863        rusqlite::types::Value::Integer(before),
864    ];
865    if let Some(identity) = identity {
866        values.push(rusqlite::types::Value::Text(identity.to_string()));
867    }
868    if let Some(conversation_id) = conversation_id {
869        values.push(rusqlite::types::Value::Text(conversation_id.to_string()));
870    }
871    let seq: Option<i64> = conn
872        .query_row(&sql, rusqlite::params_from_iter(values), |row| row.get(0))
873        .optional()
874        .map_err(into_boxed)?;
875    Ok(seq.map(|value| ConsoleCursor::from_seq(value as u64)))
876}
877
878fn row_to_frame(row: &rusqlite::Row<'_>) -> rusqlite::Result<ConsoleFrame> {
879    let seq: i64 = row.get(0)?;
880    let payload_json: String = row.get(12)?;
881    let payload = serde_json::from_str(&payload_json).unwrap_or(serde_json::Value::Null);
882    let source_kind: String = row.get(13)?;
883    Ok(ConsoleFrame {
884        cursor: ConsoleCursor::from_seq(seq as u64),
885        id: row.get(1)?,
886        dedupe_key: row.get(2)?,
887        timestamp_ms: row.get::<_, i64>(3)? as u64,
888        runtime_key: row.get(4)?,
889        identity: row.get(5)?,
890        conversation_id: row.get(6)?,
891        session_id: row.get(7)?,
892        kind: row.get(8)?,
893        status: ConsoleFrameStatus::from_str(row.get::<_, String>(9)?.as_str()),
894        frame_version: row.get::<_, i64>(10)? as u64,
895        updated_at_ms: row.get::<_, Option<i64>>(11)?.map(|value| value as u64),
896        payload,
897        source: ConsoleFrameSource {
898            kind: ConsoleFrameSourceKind::from_str(&source_kind),
899            source_cursor: row.get(14)?,
900        },
901        source_event_id: row.get(15)?,
902        interaction_id: row.get(16)?,
903        parent_frame_id: row.get(17)?,
904        caused_by_frame_id: row.get(18)?,
905        turn_id: row.get(19)?,
906        run_id: row.get(20)?,
907    })
908}
909
/// Forces a requested page size into the inclusive range `1..=1000`.
fn normalize_limit(limit: usize) -> usize {
    const SMALLEST: usize = 1;
    const LARGEST: usize = 1000;
    limit.max(SMALLEST).min(LARGEST)
}
913
914fn cursor_seq(cursor: &ConsoleCursor) -> ConsoleLogResult<u64> {
915    cursor
916        .seq()
917        .ok_or_else(|| boxed_error(format!("invalid console cursor: {cursor}")))
918}
919
920fn cursor_seq_i64(cursor: &ConsoleCursor) -> ConsoleLogResult<i64> {
921    let seq = cursor_seq(cursor)?;
922    i64::try_from(seq).map_err(|_| boxed_error(format!("console cursor out of range: {cursor}")))
923}
924
925pub(crate) fn stable_frame_id(dedupe_key: &str) -> String {
926    let mut hasher = Sha256::new();
927    hasher.update(dedupe_key.as_bytes());
928    format!("console-frame-{}", to_hex(&hasher.finalize()))
929}
930
/// Encodes `bytes` as lowercase hexadecimal, two characters per byte.
fn to_hex(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut encoded = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        // High nibble first, then low nibble.
        for nibble in [byte >> 4, byte & 0x0f] {
            encoded.push(char::from(DIGITS[usize::from(nibble)]));
        }
    }
    encoded
}
940
941fn boxed_error(message: impl Into<String>) -> ConsoleLogError {
942    Box::new(std::io::Error::other(message.into()))
943}
944
945fn into_boxed<E>(error: E) -> ConsoleLogError
946where
947    E: std::error::Error + Send + Sync + 'static,
948{
949    Box::new(error)
950}
951
/// Returns the wall-clock time as milliseconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, `0` is returned instead of an
/// error. The millisecond count is a `u128`; it is converted with a checked conversion
/// that saturates at `u64::MAX` rather than silently truncating with `as`.
fn current_time_ms() -> u64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |elapsed| {
            u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX)
        })
}
958
959#[cfg(test)]
960#[allow(clippy::expect_used)]
961mod tests {
962    use serde_json::json;
963
964    use super::*;
965
966    struct LegacyQueryOnlyStore;
967
968    #[async_trait::async_trait]
969    impl ConsoleLogStore for LegacyQueryOnlyStore {
970        async fn append_if_absent(
971            &self,
972            _frame: NewConsoleFrame,
973        ) -> ConsoleLogResult<AppendOutcome> {
974            Err(boxed_error("not implemented for test"))
975        }
976
977        async fn update_frame_status(
978            &self,
979            _frame_id: &str,
980            _status: ConsoleFrameStatus,
981        ) -> ConsoleLogResult<Option<ConsoleFrame>> {
982            Err(boxed_error("not implemented for test"))
983        }
984
985        async fn query_frames(
986            &self,
987            _query: ConsoleTimelineQuery,
988        ) -> ConsoleLogResult<ConsoleTimelinePage> {
989            Ok(ConsoleTimelinePage {
990                frames: Vec::new(),
991                next_cursor: None,
992            })
993        }
994
995        async fn frame_by_dedupe_key(
996            &self,
997            _dedupe_key: &str,
998        ) -> ConsoleLogResult<Option<ConsoleFrame>> {
999            Err(boxed_error("not implemented for test"))
1000        }
1001
1002        async fn latest_cursor(&self) -> ConsoleLogResult<Option<ConsoleCursor>> {
1003            Ok(None)
1004        }
1005
1006        async fn clear_frames(&self) -> ConsoleLogResult<()> {
1007            Ok(())
1008        }
1009
1010        async fn record_source_watermark(
1011            &self,
1012            _runtime_key: &str,
1013            _source_kind: ConsoleFrameSourceKind,
1014            _source_cursor: &str,
1015        ) -> ConsoleLogResult<()> {
1016            Ok(())
1017        }
1018
1019        async fn source_watermark(
1020            &self,
1021            _runtime_key: &str,
1022            _source_kind: ConsoleFrameSourceKind,
1023        ) -> ConsoleLogResult<Option<String>> {
1024            Ok(None)
1025        }
1026    }
1027
1028    fn sample_frame(dedupe_key: &str, identity: &str) -> NewConsoleFrame {
1029        NewConsoleFrame {
1030            id: None,
1031            dedupe_key: dedupe_key.to_string(),
1032            timestamp_ms: 10,
1033            runtime_key: "runtime-a".to_string(),
1034            identity: identity.to_string(),
1035            conversation_id: Some(identity.to_string()),
1036            session_id: Some("session-1".to_string()),
1037            kind: "text_delta".to_string(),
1038            status: ConsoleFrameStatus::Delivered,
1039            payload: json!({ "delta": "hello" }),
1040            source: ConsoleFrameSource {
1041                kind: ConsoleFrameSourceKind::ConsoleEvent,
1042                source_cursor: None,
1043            },
1044            source_event_id: Some(dedupe_key.to_string()),
1045            interaction_id: None,
1046            turn_id: None,
1047            run_id: None,
1048            parent_frame_id: None,
1049            caused_by_frame_id: None,
1050        }
1051    }
1052
1053    #[tokio::test]
1054    async fn legacy_store_default_rejects_v04_window_queries_loudly() {
1055        let store = LegacyQueryOnlyStore;
1056
1057        let err = store
1058            .query_windowed_frames(ConsoleTimelineWindowQuery {
1059                mode: ConsoleTimelineMode::Recent,
1060                limit: 10,
1061                ..ConsoleTimelineWindowQuery::default()
1062            })
1063            .await
1064            .expect_err("legacy stores must implement recent windows explicitly");
1065        assert!(
1066            err.to_string()
1067                .contains("must implement query_windowed_frames")
1068        );
1069
1070        let err = store
1071            .query_windowed_frames(ConsoleTimelineWindowQuery {
1072                mode: ConsoleTimelineMode::Since,
1073                before: Some(ConsoleCursor::from_seq(10)),
1074                limit: 10,
1075                ..ConsoleTimelineWindowQuery::default()
1076            })
1077            .await
1078            .expect_err("legacy stores must implement before windows explicitly");
1079        assert!(
1080            err.to_string()
1081                .contains("must implement query_windowed_frames")
1082        );
1083
1084        let page = store
1085            .query_windowed_frames(ConsoleTimelineWindowQuery {
1086                mode: ConsoleTimelineMode::Since,
1087                limit: 10,
1088                ..ConsoleTimelineWindowQuery::default()
1089            })
1090            .await
1091            .expect("legacy since-only fallback remains source-compatible");
1092        assert!(page.frames.is_empty());
1093    }
1094
1095    #[tokio::test]
1096    async fn in_memory_log_assigns_monotonic_cursors_and_dedupes() {
1097        let store = InMemoryConsoleLogStore::new();
1098        let first = store
1099            .append_if_absent(sample_frame("event-1", "agent-a"))
1100            .await
1101            .expect("append first");
1102        let duplicate = store
1103            .append_if_absent(sample_frame("event-1", "agent-a"))
1104            .await
1105            .expect("append duplicate");
1106        let second = store
1107            .append_if_absent(sample_frame("event-2", "agent-a"))
1108            .await
1109            .expect("append second");
1110
1111        assert_eq!(first.disposition, AppendDisposition::Inserted);
1112        assert_eq!(duplicate.disposition, AppendDisposition::Existing);
1113        assert_eq!(first.frame.cursor.seq(), Some(1));
1114        assert_eq!(second.frame.cursor.seq(), Some(2));
1115    }
1116
1117    #[tokio::test]
1118    async fn sqlite_log_queries_by_identity_and_cursor() {
1119        let store = SqliteConsoleLogStore::in_memory().expect("sqlite store");
1120        let first = store
1121            .append_if_absent(sample_frame("event-1", "agent-a"))
1122            .await
1123            .expect("append first");
1124        store
1125            .append_if_absent(sample_frame("event-2", "agent-b"))
1126            .await
1127            .expect("append second");
1128        store
1129            .append_if_absent(sample_frame("event-3", "agent-a"))
1130            .await
1131            .expect("append third");
1132
1133        let page = store
1134            .query_windowed_frames(ConsoleTimelineWindowQuery {
1135                identity: Some("agent-a".to_string()),
1136                after: Some(first.frame.cursor),
1137                limit: 10,
1138                ..ConsoleTimelineWindowQuery::default()
1139            })
1140            .await
1141            .expect("query");
1142        assert_eq!(page.frames.len(), 1);
1143        assert_eq!(page.frames[0].dedupe_key, "event-3");
1144    }
1145
1146    #[tokio::test]
1147    async fn in_memory_log_queries_recent_window_in_display_order() {
1148        let store = InMemoryConsoleLogStore::new();
1149        for index in 1..=6 {
1150            store
1151                .append_if_absent(sample_frame(&format!("event-{index}"), "agent-a"))
1152                .await
1153                .expect("append frame");
1154        }
1155
1156        let page = store
1157            .query_windowed_frames(ConsoleTimelineWindowQuery {
1158                identity: Some("agent-a".to_string()),
1159                mode: ConsoleTimelineMode::Recent,
1160                limit: 3,
1161                ..ConsoleTimelineWindowQuery::default()
1162            })
1163            .await
1164            .expect("query recent");
1165        assert_eq!(
1166            page.frames
1167                .iter()
1168                .map(|frame| frame.dedupe_key.as_str())
1169                .collect::<Vec<_>>(),
1170            vec!["event-4", "event-5", "event-6"]
1171        );
1172        assert_eq!(
1173            page.next_cursor.as_ref().and_then(ConsoleCursor::seq),
1174            Some(6)
1175        );
1176        assert_eq!(
1177            page.latest_cursor.as_ref().and_then(ConsoleCursor::seq),
1178            Some(6)
1179        );
1180        assert!(!page.exhausted);
1181
1182        let older = store
1183            .query_windowed_frames(ConsoleTimelineWindowQuery {
1184                identity: Some("agent-a".to_string()),
1185                mode: ConsoleTimelineMode::Recent,
1186                before: page.frames.first().map(|frame| frame.cursor.clone()),
1187                limit: 3,
1188                ..ConsoleTimelineWindowQuery::default()
1189            })
1190            .await
1191            .expect("query older");
1192        assert_eq!(
1193            older
1194                .frames
1195                .iter()
1196                .map(|frame| frame.dedupe_key.as_str())
1197                .collect::<Vec<_>>(),
1198            vec!["event-1", "event-2", "event-3"]
1199        );
1200        assert_eq!(
1201            older.latest_cursor.as_ref().and_then(ConsoleCursor::seq),
1202            Some(3)
1203        );
1204    }
1205
1206    #[tokio::test]
1207    async fn in_memory_log_queries_sparse_identity_recent_window_without_global_tail_scan() {
1208        let store = InMemoryConsoleLogStore::new();
1209        store
1210            .append_if_absent(sample_frame("sparse-event", "sparse-agent"))
1211            .await
1212            .expect("append sparse frame");
1213        for index in 1..=25_000 {
1214            store
1215                .append_if_absent(sample_frame(&format!("busy-event-{index}"), "busy-agent"))
1216                .await
1217                .expect("append busy frame");
1218        }
1219
1220        let page = store
1221            .query_windowed_frames(ConsoleTimelineWindowQuery {
1222                identity: Some("sparse-agent".to_string()),
1223                mode: ConsoleTimelineMode::Recent,
1224                limit: 10,
1225                ..ConsoleTimelineWindowQuery::default()
1226            })
1227            .await
1228            .expect("query sparse recent");
1229
1230        assert_eq!(page.frames.len(), 1);
1231        assert_eq!(page.frames[0].dedupe_key, "sparse-event");
1232        assert_eq!(
1233            page.latest_cursor.as_ref().and_then(ConsoleCursor::seq),
1234            Some(1)
1235        );
1236    }
1237
1238    #[tokio::test]
1239    async fn sqlite_log_queries_250k_sparse_identity_recent_window_with_index() {
1240        let store = SqliteConsoleLogStore::in_memory().expect("sqlite store");
1241        {
1242            let mut conn = store.conn.lock().expect("sqlite lock");
1243            let tx = conn.transaction().expect("begin transaction");
1244            {
1245                let mut insert = tx
1246                    .prepare(
1247                        "INSERT INTO console_frames (
1248                            id, dedupe_key, timestamp_ms, runtime_key, identity,
1249                            conversation_id, session_id, kind, status, frame_version, updated_at_ms,
1250                            payload_json, source_kind, source_cursor, source_event_id,
1251                            interaction_id, parent_frame_id, caused_by_frame_id, turn_id, run_id
1252                        ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, 1, NULL, ?10, ?11, NULL, ?12, NULL, NULL, NULL, NULL, NULL)",
1253                    )
1254                    .expect("prepare insert");
1255                insert
1256                    .execute(rusqlite::params![
1257                        "sparse-frame",
1258                        "sparse-event",
1259                        1_i64,
1260                        "runtime-a",
1261                        "sparse-agent",
1262                        "sparse-agent",
1263                        "session-sparse",
1264                        "text_complete",
1265                        ConsoleFrameStatus::Completed.as_str(),
1266                        r#"{"text":"still visible"}"#,
1267                        ConsoleFrameSourceKind::ConsoleEvent.as_str(),
1268                        "sparse-event",
1269                    ])
1270                    .expect("insert sparse frame");
1271                for index in 2..=250_000_i64 {
1272                    insert
1273                        .execute(rusqlite::params![
1274                            format!("busy-frame-{index}"),
1275                            format!("busy-event-{index}"),
1276                            index,
1277                            "runtime-a",
1278                            "busy-agent",
1279                            "busy-agent",
1280                            "session-busy",
1281                            "text_delta",
1282                            ConsoleFrameStatus::Completed.as_str(),
1283                            format!(r#"{{"delta":{index}}}"#),
1284                            ConsoleFrameSourceKind::ConsoleEvent.as_str(),
1285                            format!("busy-event-{index}"),
1286                        ])
1287                        .expect("insert busy frame");
1288                }
1289            }
1290            let plan = tx
1291                .prepare(
1292                    "EXPLAIN QUERY PLAN
1293                     SELECT cursor_seq FROM console_frames
1294                     WHERE cursor_seq > ?1 AND cursor_seq < ?2 AND identity = ?3
1295                     ORDER BY cursor_seq DESC LIMIT ?4",
1296                )
1297                .expect("prepare query plan")
1298                .query_map(
1299                    rusqlite::params![0_i64, i64::MAX, "sparse-agent", 11_i64],
1300                    |row| row.get::<_, String>(3),
1301                )
1302                .expect("run query plan")
1303                .collect::<Result<Vec<_>, _>>()
1304                .expect("collect query plan")
1305                .join("\n")
1306                .to_lowercase();
1307            assert!(
1308                plan.contains("idx_console_frames_identity_cursor"),
1309                "sparse identity recent query should use identity/cursor index; plan was: {plan}"
1310            );
1311            tx.commit().expect("commit transaction");
1312        }
1313
1314        let page = store
1315            .query_windowed_frames(ConsoleTimelineWindowQuery {
1316                identity: Some("sparse-agent".to_string()),
1317                mode: ConsoleTimelineMode::Recent,
1318                limit: 10,
1319                ..ConsoleTimelineWindowQuery::default()
1320            })
1321            .await
1322            .expect("query sparse recent");
1323
1324        assert_eq!(page.frames.len(), 1);
1325        assert_eq!(page.frames[0].dedupe_key, "sparse-event");
1326        assert_eq!(
1327            page.latest_cursor.as_ref().and_then(ConsoleCursor::seq),
1328            Some(1)
1329        );
1330    }
1331
1332    #[tokio::test]
1333    async fn sqlite_log_queries_recent_window_with_before_cursor() {
1334        let store = SqliteConsoleLogStore::in_memory().expect("sqlite store");
1335        for index in 1..=6 {
1336            store
1337                .append_if_absent(sample_frame(&format!("event-{index}"), "agent-a"))
1338                .await
1339                .expect("append frame");
1340        }
1341
1342        let page = store
1343            .query_windowed_frames(ConsoleTimelineWindowQuery {
1344                identity: Some("agent-a".to_string()),
1345                mode: ConsoleTimelineMode::Recent,
1346                limit: 2,
1347                ..ConsoleTimelineWindowQuery::default()
1348            })
1349            .await
1350            .expect("query recent");
1351        assert_eq!(
1352            page.frames
1353                .iter()
1354                .map(|frame| frame.dedupe_key.as_str())
1355                .collect::<Vec<_>>(),
1356            vec!["event-5", "event-6"]
1357        );
1358
1359        let older = store
1360            .query_windowed_frames(ConsoleTimelineWindowQuery {
1361                identity: Some("agent-a".to_string()),
1362                mode: ConsoleTimelineMode::Recent,
1363                before: page.frames.first().map(|frame| frame.cursor.clone()),
1364                limit: 2,
1365                ..ConsoleTimelineWindowQuery::default()
1366            })
1367            .await
1368            .expect("query older");
1369        assert_eq!(
1370            older
1371                .frames
1372                .iter()
1373                .map(|frame| frame.dedupe_key.as_str())
1374                .collect::<Vec<_>>(),
1375            vec!["event-3", "event-4"]
1376        );
1377        assert_eq!(
1378            older.latest_cursor.as_ref().and_then(ConsoleCursor::seq),
1379            Some(4)
1380        );
1381    }
1382
1383    #[tokio::test]
1384    async fn sqlite_log_reports_exhausted_on_exact_size_recent_final_page() {
1385        let store = SqliteConsoleLogStore::in_memory().expect("sqlite store");
1386        for index in 1..=400 {
1387            store
1388                .append_if_absent(sample_frame(&format!("event-{index}"), "agent-a"))
1389                .await
1390                .expect("append frame");
1391        }
1392
1393        let first = store
1394            .query_windowed_frames(ConsoleTimelineWindowQuery {
1395                identity: Some("agent-a".to_string()),
1396                mode: ConsoleTimelineMode::Recent,
1397                limit: 200,
1398                ..ConsoleTimelineWindowQuery::default()
1399            })
1400            .await
1401            .expect("query recent");
1402        assert!(!first.exhausted);
1403        assert_eq!(first.frames[0].dedupe_key, "event-201");
1404
1405        let older = store
1406            .query_windowed_frames(ConsoleTimelineWindowQuery {
1407                identity: Some("agent-a".to_string()),
1408                mode: ConsoleTimelineMode::Recent,
1409                before: first.frames.first().map(|frame| frame.cursor.clone()),
1410                limit: 200,
1411                ..ConsoleTimelineWindowQuery::default()
1412            })
1413            .await
1414            .expect("query older");
1415        assert!(older.exhausted);
1416        assert_eq!(older.frames.len(), 200);
1417        assert_eq!(older.frames[0].dedupe_key, "event-1");
1418    }
1419
1420    #[tokio::test]
1421    async fn sqlite_log_rejects_out_of_range_console_cursors() {
1422        let store = SqliteConsoleLogStore::in_memory().expect("sqlite store");
1423        store
1424            .append_if_absent(sample_frame("event-1", "agent-a"))
1425            .await
1426            .expect("append frame");
1427
1428        let err = store
1429            .query_windowed_frames(ConsoleTimelineWindowQuery {
1430                after: Some(ConsoleCursor::from("console:9223372036854775808")),
1431                limit: 10,
1432                ..ConsoleTimelineWindowQuery::default()
1433            })
1434            .await
1435            .expect_err("oversized after cursor should be rejected");
1436        assert!(err.to_string().contains("out of range"));
1437
1438        let err = store
1439            .query_windowed_frames(ConsoleTimelineWindowQuery {
1440                before: Some(ConsoleCursor::from("console:9223372036854775808")),
1441                limit: 10,
1442                ..ConsoleTimelineWindowQuery::default()
1443            })
1444            .await
1445            .expect_err("oversized before cursor should be rejected");
1446        assert!(err.to_string().contains("out of range"));
1447    }
1448
1449    #[tokio::test]
1450    async fn sqlite_log_updates_status() {
1451        let store = SqliteConsoleLogStore::in_memory().expect("sqlite store");
1452        let first = store
1453            .append_if_absent(sample_frame("event-1", "agent-a"))
1454            .await
1455            .expect("append first");
1456        let updated = store
1457            .update_frame_status(&first.frame.id, ConsoleFrameStatus::DeliveryFailed)
1458            .await
1459            .expect("update")
1460            .expect("updated frame");
1461        assert_eq!(updated.status, ConsoleFrameStatus::DeliveryFailed);
1462        assert_eq!(updated.frame_version, 2);
1463        assert!(updated.updated_at_ms.is_some());
1464    }
1465
1466    #[tokio::test]
1467    async fn sqlite_log_records_source_watermarks() {
1468        let store = SqliteConsoleLogStore::in_memory().expect("sqlite store");
1469        store
1470            .record_source_watermark("runtime-a", ConsoleFrameSourceKind::ConsoleEvent, "evt-99")
1471            .await
1472            .expect("record watermark");
1473        let watermark = store
1474            .source_watermark("runtime-a", ConsoleFrameSourceKind::ConsoleEvent)
1475            .await
1476            .expect("read watermark");
1477        assert_eq!(watermark.as_deref(), Some("evt-99"));
1478    }
1479
1480    #[tokio::test]
1481    async fn sqlite_log_persists_frames_across_handles() {
1482        let temp_dir = tempfile::tempdir().expect("temp dir");
1483        let path = temp_dir.path().join("console.sqlite");
1484        let store = SqliteConsoleLogStore::open(&path).expect("open first handle");
1485        store
1486            .append_if_absent(sample_frame("event-1", "agent-a"))
1487            .await
1488            .expect("append frame");
1489        drop(store);
1490
1491        let reopened = SqliteConsoleLogStore::open(&path).expect("open second handle");
1492        let page = reopened
1493            .query_windowed_frames(ConsoleTimelineWindowQuery {
1494                identity: Some("agent-a".to_string()),
1495                limit: 10,
1496                ..ConsoleTimelineWindowQuery::default()
1497            })
1498            .await
1499            .expect("query frames");
1500        assert_eq!(page.frames.len(), 1);
1501        assert_eq!(page.frames[0].dedupe_key, "event-1");
1502    }
1503}