Skip to main content

lago_core/
journal.rs

1use crate::error::LagoResult;
2use crate::event::EventEnvelope;
3use crate::id::{BranchId, EventId, SeqNo, SessionId};
4use crate::session::Session;
5use futures::Stream;
6use std::pin::Pin;
7
8/// Query parameters for reading events.
9#[derive(Debug, Clone, Default)]
10pub struct EventQuery {
11    pub session_id: Option<SessionId>,
12    pub branch_id: Option<BranchId>,
13    pub after_seq: Option<SeqNo>,
14    pub before_seq: Option<SeqNo>,
15    pub limit: Option<usize>,
16    /// Filter events by metadata key-value pairs (all must match).
17    pub metadata_filters: Option<Vec<(String, String)>>,
18    /// Filter by event kind discriminant name (e.g. "HiveArtifactShared").
19    pub kind_filter: Option<Vec<String>>,
20}
21
22impl EventQuery {
23    pub fn new() -> Self {
24        Self::default()
25    }
26
27    pub fn session(mut self, id: SessionId) -> Self {
28        self.session_id = Some(id);
29        self
30    }
31
32    pub fn branch(mut self, id: BranchId) -> Self {
33        self.branch_id = Some(id);
34        self
35    }
36
37    pub fn after(mut self, seq: SeqNo) -> Self {
38        self.after_seq = Some(seq);
39        self
40    }
41
42    pub fn before(mut self, seq: SeqNo) -> Self {
43        self.before_seq = Some(seq);
44        self
45    }
46
47    pub fn limit(mut self, n: usize) -> Self {
48        self.limit = Some(n);
49        self
50    }
51
52    /// Add a metadata key-value filter. All metadata filters must match.
53    pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
54        self.metadata_filters
55            .get_or_insert_with(Vec::new)
56            .push((key.into(), value.into()));
57        self
58    }
59
60    /// Add a kind filter. Events must match any of the specified kind names.
61    pub fn with_kind(mut self, kind_name: impl Into<String>) -> Self {
62        self.kind_filter
63            .get_or_insert_with(Vec::new)
64            .push(kind_name.into());
65        self
66    }
67}
68
69impl EventQuery {
70    /// Check if an event envelope matches the metadata and kind filters.
71    /// Used for post-deserialization filtering in journal implementations.
72    pub fn matches_filters(&self, envelope: &EventEnvelope) -> bool {
73        // Check metadata filters — all must match
74        if let Some(ref filters) = self.metadata_filters {
75            for (key, value) in filters {
76                match envelope.metadata.get(key) {
77                    Some(v) if v == value => {}
78                    _ => return false,
79                }
80            }
81        }
82
83        // Check kind filter — event must match any specified kind
84        if let Some(ref kinds) = self.kind_filter {
85            let event_json = serde_json::to_value(&envelope.payload).ok();
86            let event_type = event_json
87                .as_ref()
88                .and_then(|v| v.get("type"))
89                .and_then(|v| v.as_str())
90                .unwrap_or("");
91            if !kinds.iter().any(|k| k == event_type) {
92                return false;
93            }
94        }
95
96        true
97    }
98}
99
100/// Event stream type alias.
101pub type EventStream = Pin<Box<dyn Stream<Item = LagoResult<EventEnvelope>> + Send>>;
102
103/// Boxed future type alias for dyn-compatible async trait methods.
104type BoxFuture<'a, T> = Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>>;
105
106/// The primary trait for event persistence.
107///
108/// Uses boxed futures for dyn-compatibility (`Arc<dyn Journal>`).
109pub trait Journal: Send + Sync {
110    /// Append a single event. Returns the assigned sequence number.
111    fn append(&self, event: EventEnvelope) -> BoxFuture<'_, LagoResult<SeqNo>>;
112
113    /// Append a batch of events atomically. Returns the last assigned sequence number.
114    fn append_batch(&self, events: Vec<EventEnvelope>) -> BoxFuture<'_, LagoResult<SeqNo>>;
115
116    /// Read events matching a query.
117    fn read(&self, query: EventQuery) -> BoxFuture<'_, LagoResult<Vec<EventEnvelope>>>;
118
119    /// Look up a single event by ID.
120    fn get_event(&self, event_id: &EventId) -> BoxFuture<'_, LagoResult<Option<EventEnvelope>>>;
121
122    /// Get the current head sequence number for a session+branch.
123    fn head_seq(
124        &self,
125        session_id: &SessionId,
126        branch_id: &BranchId,
127    ) -> BoxFuture<'_, LagoResult<SeqNo>>;
128
129    /// Create a tailing stream of events.
130    fn stream(
131        &self,
132        session_id: SessionId,
133        branch_id: BranchId,
134        after_seq: SeqNo,
135    ) -> BoxFuture<'_, LagoResult<EventStream>>;
136
137    /// Create or update a session.
138    fn put_session(&self, session: Session) -> BoxFuture<'_, LagoResult<()>>;
139
140    /// Get a session by ID.
141    fn get_session(&self, session_id: &SessionId) -> BoxFuture<'_, LagoResult<Option<Session>>>;
142
143    /// List all sessions.
144    fn list_sessions(&self) -> BoxFuture<'_, LagoResult<Vec<Session>>>;
145}
146
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::{EventEnvelope, EventPayload};
    use std::collections::HashMap;

    /// Build a minimal `ErrorRaised` envelope carrying the given metadata.
    fn error_envelope(metadata: HashMap<String, String>) -> EventEnvelope {
        EventEnvelope {
            event_id: EventId::from_string("E1"),
            session_id: SessionId::from_string("S1"),
            branch_id: BranchId::from_string("main"),
            run_id: None,
            seq: 1,
            timestamp: 100,
            parent_id: None,
            payload: EventPayload::ErrorRaised {
                message: "test".into(),
            },
            metadata,
            schema_version: 1,
        }
    }

    #[test]
    fn event_query_default_is_empty() {
        let query = EventQuery::new();

        assert!(query.session_id.is_none());
        assert!(query.branch_id.is_none());
        assert!(query.after_seq.is_none());
        assert!(query.before_seq.is_none());
        assert!(query.limit.is_none());
    }

    #[test]
    fn event_query_builder_chain() {
        let query = EventQuery::new()
            .session(SessionId::from_string("SESS001"))
            .branch(BranchId::from_string("main"))
            .after(10)
            .before(100)
            .limit(50);

        let session = query.session_id.as_ref().unwrap();
        let branch = query.branch_id.as_ref().unwrap();
        assert_eq!(session.as_str(), "SESS001");
        assert_eq!(branch.as_str(), "main");
        assert_eq!(query.after_seq, Some(10));
        assert_eq!(query.before_seq, Some(100));
        assert_eq!(query.limit, Some(50));
    }

    #[test]
    fn event_query_partial_builder() {
        let query = EventQuery::new()
            .session(SessionId::from_string("S1"))
            .limit(5);

        // Only the fields that were set explicitly are populated.
        assert!(query.session_id.is_some());
        assert!(query.branch_id.is_none());
        assert!(query.after_seq.is_none());
        assert_eq!(query.limit, Some(5));
    }

    #[test]
    fn event_query_metadata_builder() {
        let query = EventQuery::new()
            .with_metadata("hive_task_id", "HIVE001")
            .with_metadata("agent", "alpha");

        let filters = query.metadata_filters.unwrap();
        assert_eq!(filters.len(), 2);
        assert_eq!(
            filters[0],
            ("hive_task_id".to_string(), "HIVE001".to_string())
        );
    }

    #[test]
    fn event_query_kind_builder() {
        let query = EventQuery::new()
            .with_kind("HiveArtifactShared")
            .with_kind("HiveSelectionMade");

        let kinds = query.kind_filter.unwrap();
        assert_eq!(kinds.len(), 2);
        assert_eq!(kinds[0], "HiveArtifactShared");
    }

    #[test]
    fn matches_filters_metadata() {
        let metadata: HashMap<String, String> = [("hive_task_id", "H1"), ("agent", "a1")]
            .iter()
            .map(|&(k, v)| (k.to_owned(), v.to_owned()))
            .collect();
        let envelope = error_envelope(metadata);

        // Same key, same value: accepted.
        let matching = EventQuery::new().with_metadata("hive_task_id", "H1");
        assert!(matching.matches_filters(&envelope));

        // Same key, different value: rejected.
        let mismatching = EventQuery::new().with_metadata("hive_task_id", "H2");
        assert!(!mismatching.matches_filters(&envelope));

        // A query without filters accepts everything.
        let unfiltered = EventQuery::new();
        assert!(unfiltered.matches_filters(&envelope));
    }

    #[test]
    fn matches_filters_kind() {
        let envelope = error_envelope(HashMap::new());

        let matching = EventQuery::new().with_kind("ErrorRaised");
        assert!(matching.matches_filters(&envelope));

        let mismatching = EventQuery::new().with_kind("HiveTaskCreated");
        assert!(!mismatching.matches_filters(&envelope));
    }
}