use crate::error::LagoResult;
use crate::event::EventEnvelope;
use crate::id::{BranchId, EventId, SeqNo, SessionId};
use crate::session::Session;
use futures::Stream;
use std::pin::Pin;
7
8#[derive(Debug, Clone, Default)]
10pub struct EventQuery {
11 pub session_id: Option<SessionId>,
12 pub branch_id: Option<BranchId>,
13 pub after_seq: Option<SeqNo>,
14 pub before_seq: Option<SeqNo>,
15 pub limit: Option<usize>,
16 pub metadata_filters: Option<Vec<(String, String)>>,
18 pub kind_filter: Option<Vec<String>>,
20}
21
22impl EventQuery {
23 pub fn new() -> Self {
24 Self::default()
25 }
26
27 pub fn session(mut self, id: SessionId) -> Self {
28 self.session_id = Some(id);
29 self
30 }
31
32 pub fn branch(mut self, id: BranchId) -> Self {
33 self.branch_id = Some(id);
34 self
35 }
36
37 pub fn after(mut self, seq: SeqNo) -> Self {
38 self.after_seq = Some(seq);
39 self
40 }
41
42 pub fn before(mut self, seq: SeqNo) -> Self {
43 self.before_seq = Some(seq);
44 self
45 }
46
47 pub fn limit(mut self, n: usize) -> Self {
48 self.limit = Some(n);
49 self
50 }
51
52 pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
54 self.metadata_filters
55 .get_or_insert_with(Vec::new)
56 .push((key.into(), value.into()));
57 self
58 }
59
60 pub fn with_kind(mut self, kind_name: impl Into<String>) -> Self {
62 self.kind_filter
63 .get_or_insert_with(Vec::new)
64 .push(kind_name.into());
65 self
66 }
67}
68
69impl EventQuery {
70 pub fn matches_filters(&self, envelope: &EventEnvelope) -> bool {
73 if let Some(ref filters) = self.metadata_filters {
75 for (key, value) in filters {
76 match envelope.metadata.get(key) {
77 Some(v) if v == value => {}
78 _ => return false,
79 }
80 }
81 }
82
83 if let Some(ref kinds) = self.kind_filter {
85 let event_json = serde_json::to_value(&envelope.payload).ok();
86 let event_type = event_json
87 .as_ref()
88 .and_then(|v| v.get("type"))
89 .and_then(|v| v.as_str())
90 .unwrap_or("");
91 if !kinds.iter().any(|k| k == event_type) {
92 return false;
93 }
94 }
95
96 true
97 }
98}
99
100pub type EventStream = Pin<Box<dyn Stream<Item = LagoResult<EventEnvelope>> + Send>>;
102
/// Boxed, pinned, `Send` future resolving to `T` that may borrow for `'a`.
///
/// Used as the return type of every `Journal` method so the trait needs no
/// `async fn` and its methods return a type-erased future.
type BoxFuture<'a, T> = Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>>;
105
106pub trait Journal: Send + Sync {
110 fn append(&self, event: EventEnvelope) -> BoxFuture<'_, LagoResult<SeqNo>>;
112
113 fn append_batch(&self, events: Vec<EventEnvelope>) -> BoxFuture<'_, LagoResult<SeqNo>>;
115
116 fn read(&self, query: EventQuery) -> BoxFuture<'_, LagoResult<Vec<EventEnvelope>>>;
118
119 fn get_event(&self, event_id: &EventId) -> BoxFuture<'_, LagoResult<Option<EventEnvelope>>>;
121
122 fn head_seq(
124 &self,
125 session_id: &SessionId,
126 branch_id: &BranchId,
127 ) -> BoxFuture<'_, LagoResult<SeqNo>>;
128
129 fn stream(
131 &self,
132 session_id: SessionId,
133 branch_id: BranchId,
134 after_seq: SeqNo,
135 ) -> BoxFuture<'_, LagoResult<EventStream>>;
136
137 fn put_session(&self, session: Session) -> BoxFuture<'_, LagoResult<()>>;
139
140 fn get_session(&self, session_id: &SessionId) -> BoxFuture<'_, LagoResult<Option<Session>>>;
142
143 fn list_sessions(&self) -> BoxFuture<'_, LagoResult<Vec<Session>>>;
145}
146
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::{EventEnvelope, EventPayload};
    use std::collections::HashMap;

    /// Builds the `ErrorRaised` envelope shared by the filter tests; only the
    /// metadata map differs between them.
    fn error_envelope(metadata: HashMap<String, String>) -> EventEnvelope {
        EventEnvelope {
            event_id: EventId::from_string("E1"),
            session_id: SessionId::from_string("S1"),
            branch_id: BranchId::from_string("main"),
            run_id: None,
            seq: 1,
            timestamp: 100,
            parent_id: None,
            payload: EventPayload::ErrorRaised {
                message: "test".into(),
            },
            metadata,
            schema_version: 1,
        }
    }

    #[test]
    fn event_query_default_is_empty() {
        let q = EventQuery::new();
        assert!(q.session_id.is_none());
        assert!(q.branch_id.is_none());
        assert!(q.after_seq.is_none());
        assert!(q.before_seq.is_none());
        assert!(q.limit.is_none());
        assert!(q.metadata_filters.is_none());
        assert!(q.kind_filter.is_none());
    }

    #[test]
    fn event_query_builder_chain() {
        let q = EventQuery::new()
            .session(SessionId::from_string("SESS001"))
            .branch(BranchId::from_string("main"))
            .after(10)
            .before(100)
            .limit(50);

        assert_eq!(q.session_id.as_ref().unwrap().as_str(), "SESS001");
        assert_eq!(q.branch_id.as_ref().unwrap().as_str(), "main");
        assert_eq!(q.after_seq, Some(10));
        assert_eq!(q.before_seq, Some(100));
        assert_eq!(q.limit, Some(50));
    }

    #[test]
    fn event_query_partial_builder() {
        let q = EventQuery::new()
            .session(SessionId::from_string("S1"))
            .limit(5);
        assert!(q.session_id.is_some());
        assert!(q.branch_id.is_none());
        assert!(q.after_seq.is_none());
        assert_eq!(q.limit, Some(5));
    }

    #[test]
    fn event_query_metadata_builder() {
        let q = EventQuery::new()
            .with_metadata("hive_task_id", "HIVE001")
            .with_metadata("agent", "alpha");
        let filters = q.metadata_filters.unwrap();
        assert_eq!(filters.len(), 2);
        assert_eq!(filters[0], ("hive_task_id".into(), "HIVE001".into()));
        assert_eq!(filters[1], ("agent".into(), "alpha".into()));
    }

    #[test]
    fn event_query_kind_builder() {
        let q = EventQuery::new()
            .with_kind("HiveArtifactShared")
            .with_kind("HiveSelectionMade");
        let kinds = q.kind_filter.unwrap();
        assert_eq!(kinds.len(), 2);
        assert_eq!(kinds[0], "HiveArtifactShared");
        assert_eq!(kinds[1], "HiveSelectionMade");
    }

    #[test]
    fn matches_filters_metadata() {
        let mut metadata = HashMap::new();
        metadata.insert("hive_task_id".to_string(), "H1".to_string());
        metadata.insert("agent".to_string(), "a1".to_string());
        let envelope = error_envelope(metadata);

        // Matching value.
        let q = EventQuery::new().with_metadata("hive_task_id", "H1");
        assert!(q.matches_filters(&envelope));

        // Same key, different value.
        let q = EventQuery::new().with_metadata("hive_task_id", "H2");
        assert!(!q.matches_filters(&envelope));

        // Key absent from the envelope.
        let q = EventQuery::new().with_metadata("missing", "H1");
        assert!(!q.matches_filters(&envelope));

        // Several pairs are ANDed together.
        let q = EventQuery::new()
            .with_metadata("hive_task_id", "H1")
            .with_metadata("agent", "a1");
        assert!(q.matches_filters(&envelope));
        let q = EventQuery::new()
            .with_metadata("hive_task_id", "H1")
            .with_metadata("agent", "other");
        assert!(!q.matches_filters(&envelope));

        // No filters at all accepts everything.
        let q = EventQuery::new();
        assert!(q.matches_filters(&envelope));
    }

    #[test]
    fn matches_filters_kind() {
        let envelope = error_envelope(HashMap::new());

        let q = EventQuery::new().with_kind("ErrorRaised");
        assert!(q.matches_filters(&envelope));

        let q = EventQuery::new().with_kind("HiveTaskCreated");
        assert!(!q.matches_filters(&envelope));

        // Several kinds are ORed together.
        let q = EventQuery::new()
            .with_kind("HiveTaskCreated")
            .with_kind("ErrorRaised");
        assert!(q.matches_filters(&envelope));
    }
}