use crate::core::events::{Event, EventId};
use chrono::Duration as ChronoDuration;
use chrono::{DateTime, Utc};
use fs2::FileExt;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::mpsc::{self, Receiver};
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Duration;
use uuid::Uuid;

/// Inserts a newline between back-to-back JSON objects (`}{`) on a single
/// line, ignoring `}` and `{` that appear inside string literals, so each
/// object can be decoded on its own.
fn normalize_concatenated_json_objects(line: &str) -> String {
    let mut out = String::with_capacity(line.len());
    let mut chars = line.chars().peekable();
    let mut in_string = false;
    let mut escape = false;

    while let Some(c) = chars.next() {
        if in_string {
            if escape {
                escape = false;
            } else if c == '\\' {
                escape = true;
            } else if c == '"' {
                in_string = false;
            }
        } else if c == '"' {
            in_string = true;
        }

        out.push(c);

        if !in_string && c == '}' && chars.peek().copied() == Some('{') {
            out.push('\n');
        }
    }

    out
}

/// Errors that can occur while reading from or writing to an event store.
#[derive(Debug, thiserror::Error)]
pub enum EventStoreError {
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),
    #[error("Serialization error: {0}")]
    Serialization(#[from] serde_json::Error),
    #[error("Event not found: {0}")]
    NotFound(EventId),
}

/// Convenience alias for results produced by event store operations.
pub type Result<T> = std::result::Result<T, EventStoreError>;

/// Criteria for selecting events from a store. Every populated field must
/// match for an event to pass the filter.
#[derive(Debug, Default, Clone)]
pub struct EventFilter {
    /// Match only events correlated with this project.
    pub project_id: Option<Uuid>,
    /// Match only events correlated with this task graph.
    pub graph_id: Option<Uuid>,
    /// Match only events correlated with this task.
    pub task_id: Option<Uuid>,
    /// Match only events correlated with this flow.
    pub flow_id: Option<Uuid>,
    /// Match only events correlated with this attempt.
    pub attempt_id: Option<Uuid>,
    /// Match only events with a timestamp at or after this instant.
    pub since: Option<DateTime<Utc>>,
    /// Match only events with a timestamp at or before this instant.
    pub until: Option<DateTime<Utc>>,
    /// Return at most this many events.
    pub limit: Option<usize>,
}

impl EventFilter {
    /// A filter that matches every event.
    #[must_use]
    pub fn all() -> Self {
        Self::default()
    }

    /// A filter that matches events correlated with the given project.
    #[must_use]
    pub fn for_project(project_id: Uuid) -> Self {
        Self {
            project_id: Some(project_id),
            ..Default::default()
        }
    }

    /// A filter that matches events correlated with the given task graph.
    #[must_use]
    pub fn for_graph(graph_id: Uuid) -> Self {
        Self {
            graph_id: Some(graph_id),
            ..Default::default()
        }
    }

    /// Returns `true` if the event satisfies every populated criterion.
    /// `limit` is not checked here; stores apply it when reading.
    #[must_use]
    pub fn matches(&self, event: &Event) -> bool {
        if let Some(pid) = self.project_id {
            if event.metadata.correlation.project_id != Some(pid) {
                return false;
            }
        }
        if let Some(gid) = self.graph_id {
            if event.metadata.correlation.graph_id != Some(gid) {
                return false;
            }
        }
        if let Some(tid) = self.task_id {
            if event.metadata.correlation.task_id != Some(tid) {
                return false;
            }
        }
        if let Some(fid) = self.flow_id {
            if event.metadata.correlation.flow_id != Some(fid) {
                return false;
            }
        }
        if let Some(aid) = self.attempt_id {
            if event.metadata.correlation.attempt_id != Some(aid) {
                return false;
            }
        }
        if let Some(since) = self.since {
            if event.metadata.timestamp < since {
                return false;
            }
        }
        if let Some(until) = self.until {
            if event.metadata.timestamp > until {
                return false;
            }
        }
        true
    }
}

/// Common interface for appending and querying events.
pub trait EventStore: Send + Sync {
    /// Appends an event, assigning its sequence number and ID, and returns
    /// the assigned ID.
    fn append(&self, event: Event) -> Result<EventId>;

    /// Reads all events matching the filter, in append order.
    fn read(&self, filter: &EventFilter) -> Result<Vec<Event>>;

    /// Returns a channel that yields matching events as they are appended.
    /// Implementations deliver by polling, so events may arrive with a
    /// short delay.
    fn stream(&self, filter: &EventFilter) -> Result<Receiver<Event>>;

    /// Reads every stored event, in append order.
    fn read_all(&self) -> Result<Vec<Event>>;
}

/// Volatile event store backed by an in-memory vector. Contents are lost
/// when the process exits.
#[derive(Debug, Default)]
pub struct InMemoryEventStore {
    events: Arc<RwLock<Vec<Event>>>,
}

impl InMemoryEventStore {
    /// Creates an empty store.
    #[must_use]
    pub fn new() -> Self {
        Self {
            events: Arc::new(RwLock::new(Vec::new())),
        }
    }
}

#[allow(clippy::significant_drop_tightening)]
impl EventStore for InMemoryEventStore {
    fn append(&self, mut event: Event) -> Result<EventId> {
        let mut events = self.events.write().expect("lock poisoned");
        // Sequence numbers are dense: the next sequence is the current length.
        let next_seq = events.len() as u64;
        event.metadata.sequence = Some(next_seq);
        event.metadata.id = EventId::from_ordered_u64(next_seq);
        // Keep timestamps strictly increasing so time-range filters agree
        // with append order.
        if let Some(last) = events.last() {
            if event.metadata.timestamp <= last.metadata.timestamp {
                event.metadata.timestamp = last.metadata.timestamp + ChronoDuration::nanoseconds(1);
            }
        }
        let id = event.id();
        events.push(event);
        Ok(id)
    }

    fn read(&self, filter: &EventFilter) -> Result<Vec<Event>> {
        let events = self.events.read().expect("lock poisoned");
        let mut result: Vec<Event> = events
            .iter()
            .filter(|e| filter.matches(e))
            .cloned()
            .collect();
        if let Some(limit) = filter.limit {
            result.truncate(limit);
        }
        Ok(result)
    }

    fn read_all(&self) -> Result<Vec<Event>> {
        let events = self.events.read().expect("lock poisoned");
        Ok(events.clone())
    }

    fn stream(&self, filter: &EventFilter) -> Result<Receiver<Event>> {
        let (tx, rx) = mpsc::channel();
        let filter = filter.clone();
        let events = Arc::clone(&self.events);

        // Poll the shared buffer and forward new matches; exit when the
        // receiver is dropped or the filter's limit is reached.
        thread::spawn(move || {
            let mut sent = 0usize;
            let mut seen = 0usize;

            loop {
                // Clone under the lock so it is not held while sending.
                let snapshot = {
                    let guard = events.read().expect("lock poisoned");
                    guard.clone()
                };

                for ev in snapshot.iter().skip(seen) {
                    if !filter.matches(ev) {
                        continue;
                    }

                    if tx.send(ev.clone()).is_err() {
                        return;
                    }

                    sent += 1;
                    if let Some(limit) = filter.limit {
                        if sent >= limit {
                            return;
                        }
                    }
                }

                seen = snapshot.len();
                thread::sleep(Duration::from_millis(200));
            }
        });

        Ok(rx)
    }
}

/// Durable event store backed by an append-only JSONL file, with an
/// in-memory cache of decoded events. Cross-process coordination uses fs2
/// file locks.
#[derive(Debug)]
pub struct FileEventStore {
    path: PathBuf,
    cache: RwLock<Vec<Event>>,
}

impl FileEventStore {
    /// Opens (or creates) the log at `path` and loads all existing events
    /// into the cache.
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be created, locked, or read, or
    /// if its contents fail to deserialize.
    pub fn open(path: PathBuf) -> Result<Self> {
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)?;
        }

        // Hold a shared lock while loading so a concurrent writer cannot
        // interleave with the initial read.
        let file = std::fs::OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(&path)?;
        file.lock_shared()?;

        let mut content = String::new();
        {
            use std::io::Read;
            let mut reader = std::io::BufReader::new(&file);
            reader.read_to_string(&mut content)?;
        }

        file.unlock()?;

        // A line may contain multiple concatenated objects; normalize and
        // stream-decode them all.
        let cache = content
            .lines()
            .filter(|l| !l.trim().is_empty())
            .flat_map(|line| {
                let normalized = normalize_concatenated_json_objects(line);
                serde_json::Deserializer::from_str(&normalized)
                    .into_iter::<Event>()
                    .collect::<Vec<_>>()
            })
            .collect::<std::result::Result<Vec<Event>, _>>()?;

        Ok(Self {
            path,
            cache: RwLock::new(cache),
        })
    }

    /// Path of the backing log file.
    #[must_use]
    pub const fn path(&self) -> &PathBuf {
        &self.path
    }
}

#[allow(clippy::significant_drop_tightening)]
impl EventStore for FileEventStore {
    fn append(&self, mut event: Event) -> Result<EventId> {
        use std::fs::OpenOptions;
        use std::io::Write;

        let mut file = OpenOptions::new()
            .read(true)
            .create(true)
            .append(true)
            .open(&self.path)?;
        file.lock_exclusive()?;

        // Re-read the log under the exclusive lock so sequence numbers stay
        // consistent when other processes have appended since the last read.
        let mut content = String::new();
        {
            use std::io::{Read, Seek};
            let _ = file.rewind();
            let mut reader = std::io::BufReader::new(&file);
            reader.read_to_string(&mut content)?;
        }

        let disk_events = content
            .lines()
            .filter(|l| !l.trim().is_empty())
            .flat_map(|line| {
                let normalized = normalize_concatenated_json_objects(line);
                serde_json::Deserializer::from_str(&normalized)
                    .into_iter::<Event>()
                    .collect::<Vec<_>>()
            })
            .collect::<std::result::Result<Vec<Event>, _>>()?;

        let mut cache = self.cache.write().expect("lock poisoned");
        cache.clone_from(&disk_events);

        let next_seq = cache.len() as u64;
        event.metadata.sequence = Some(next_seq);
        event.metadata.id = EventId::from_ordered_u64(next_seq);
        if let Some(last) = cache.last() {
            if event.metadata.timestamp <= last.metadata.timestamp {
                event.metadata.timestamp = last.metadata.timestamp + ChronoDuration::nanoseconds(1);
            }
        }
        let id = event.id();

        let json = serde_json::to_string(&event)?;
        writeln!(file, "{json}")?;
        let _ = file.flush();
        let _ = file.unlock();

        cache.push(event);
        Ok(id)
    }

    fn read(&self, filter: &EventFilter) -> Result<Vec<Event>> {
        let cache = self.cache.read().expect("lock poisoned");
        let mut result: Vec<Event> = cache
            .iter()
            .filter(|e| filter.matches(e))
            .cloned()
            .collect();
        if let Some(limit) = filter.limit {
            result.truncate(limit);
        }
        Ok(result)
    }

    fn read_all(&self) -> Result<Vec<Event>> {
        let cache = self.cache.read().expect("lock poisoned");
        Ok(cache.clone())
    }

    fn stream(&self, filter: &EventFilter) -> Result<Receiver<Event>> {
        let (tx, rx) = mpsc::channel();
        let filter = filter.clone();
        let path = self.path.clone();

        // Poll the file rather than the cache so appends made by other
        // processes are also observed.
        thread::spawn(move || {
            let mut sent = 0usize;
            let mut seen = 0usize;

            loop {
                let Ok(content) = std::fs::read_to_string(&path) else {
                    thread::sleep(Duration::from_millis(200));
                    continue;
                };

                let Ok(events) = content
                    .lines()
                    .filter(|l| !l.trim().is_empty())
                    .flat_map(|line| {
                        let normalized = normalize_concatenated_json_objects(line);
                        serde_json::Deserializer::from_str(&normalized)
                            .into_iter::<Event>()
                            .collect::<Vec<_>>()
                    })
                    .collect::<std::result::Result<Vec<Event>, _>>()
                else {
                    thread::sleep(Duration::from_millis(200));
                    continue;
                };

                for ev in events.iter().skip(seen) {
                    if !filter.matches(ev) {
                        continue;
                    }

                    if tx.send(ev.clone()).is_err() {
                        return;
                    }

                    sent += 1;
                    if let Some(limit) = filter.limit {
                        if sent >= limit {
                            return;
                        }
                    }
                }

                seen = events.len();
                thread::sleep(Duration::from_millis(200));
            }
        });

        Ok(rx)
    }
}

/// On-disk format of the index file: maps project IDs to the relative paths
/// of their mirror logs.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Default)]
struct RegistryIndexDisk {
    projects: HashMap<String, String>,
}

/// Event store that writes every event to a global log and additionally
/// mirrors it into per-project and per-flow logs, with an index file mapping
/// project IDs to their mirror paths.
#[derive(Debug)]
pub struct IndexedEventStore {
    index_path: PathBuf,
    projects_dir: PathBuf,
    flows_dir: PathBuf,
    global: FileEventStore,
}

impl IndexedEventStore {
    /// Opens (or creates) the store layout under `base_dir`.
    ///
    /// # Errors
    ///
    /// Returns an error if the directories or files cannot be created, or if
    /// the global log fails to load.
    pub fn open(base_dir: &Path) -> Result<Self> {
        // Layout:
        //   <base_dir>/events.jsonl                        global log
        //   <base_dir>/index.json                          project index
        //   <base_dir>/projects/<project_id>/events.jsonl  per-project mirror
        //   <base_dir>/flows/<flow_id>/events.jsonl        per-flow mirror
        std::fs::create_dir_all(base_dir)?;

        let index_path = base_dir.join("index.json");
        let projects_dir = base_dir.join("projects");
        let flows_dir = base_dir.join("flows");
        std::fs::create_dir_all(&projects_dir)?;
        std::fs::create_dir_all(&flows_dir)?;

        if !index_path.exists() {
            let disk = RegistryIndexDisk::default();
            std::fs::write(&index_path, serde_json::to_string_pretty(&disk)?)?;
        }

        let global = FileEventStore::open(base_dir.join("events.jsonl"))?;

        Ok(Self {
            index_path,
            projects_dir,
            flows_dir,
            global,
        })
    }

    fn project_log_rel(project_id: Uuid) -> String {
        format!("projects/{project_id}/events.jsonl")
    }

    fn flow_log_path(&self, flow_id: Uuid) -> PathBuf {
        self.flows_dir
            .join(flow_id.to_string())
            .join("events.jsonl")
    }

    /// Registers the project's mirror log in the index (if not already
    /// present) and returns the absolute path of that log.
    fn ensure_project_index(&self, project_id: Uuid) -> Result<PathBuf> {
        use std::io::{Read, Seek, Write};

        let rel = Self::project_log_rel(project_id);
        let abs = self
            .projects_dir
            .join(project_id.to_string())
            .join("events.jsonl");

        if let Some(parent) = abs.parent() {
            std::fs::create_dir_all(parent)?;
        }

        // Read-modify-write the index under an exclusive lock so concurrent
        // writers do not drop each other's entries.
        let mut file = std::fs::OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(&self.index_path)?;
        file.lock_exclusive()?;

        let mut content = String::new();
        {
            let _ = file.rewind();
            let mut reader = std::io::BufReader::new(&file);
            reader.read_to_string(&mut content)?;
        }

        let mut disk: RegistryIndexDisk = if content.trim().is_empty() {
            RegistryIndexDisk::default()
        } else {
            serde_json::from_str(&content).unwrap_or_default()
        };

        disk.projects
            .entry(project_id.to_string())
            .or_insert_with(|| rel.clone());

        let json = serde_json::to_string_pretty(&disk)?;
        {
            let _ = file.rewind();
            file.set_len(0)?;
            file.write_all(json.as_bytes())?;
            let _ = file.flush();
        }
        let _ = file.unlock();

        Ok(abs)
    }

    /// Appends the event to a mirror log, creating the file and its parent
    /// directories on first use.
    fn append_mirror(path: &PathBuf, event: &Event) -> Result<()> {
        use std::io::Write;
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)?;
        }
        let mut file = std::fs::OpenOptions::new()
            .read(true)
            .create(true)
            .append(true)
            .open(path)?;
        file.lock_exclusive()?;
        let json = serde_json::to_string(event)?;
        writeln!(file, "{json}")?;
        let _ = file.flush();
        let _ = file.unlock();
        Ok(())
    }
}

#[allow(clippy::significant_drop_tightening)]
impl EventStore for IndexedEventStore {
    fn append(&self, mut event: Event) -> Result<EventId> {
        use std::fs::OpenOptions;
        use std::io::Write;

        // Append to the global log first; it is the source of truth for
        // sequence numbers. Mirrors are written afterwards, outside the lock.
        let mut file = OpenOptions::new()
            .read(true)
            .create(true)
            .append(true)
            .open(&self.global.path)?;
        file.lock_exclusive()?;

        let mut content = String::new();
        {
            use std::io::{Read, Seek};
            let _ = file.rewind();
            let mut reader = std::io::BufReader::new(&file);
            reader.read_to_string(&mut content)?;
        }

        let disk_events = content
            .lines()
            .filter(|l| !l.trim().is_empty())
            .flat_map(|line| {
                let normalized = normalize_concatenated_json_objects(line);
                serde_json::Deserializer::from_str(&normalized)
                    .into_iter::<Event>()
                    .collect::<Vec<_>>()
            })
            .collect::<std::result::Result<Vec<Event>, _>>()?;

        let mut cache = self.global.cache.write().expect("lock poisoned");
        cache.clone_from(&disk_events);

        let next_seq = cache.len() as u64;
        event.metadata.sequence = Some(next_seq);
        event.metadata.id = EventId::from_ordered_u64(next_seq);
        if let Some(last) = cache.last() {
            if event.metadata.timestamp <= last.metadata.timestamp {
                event.metadata.timestamp = last.metadata.timestamp + ChronoDuration::nanoseconds(1);
            }
        }
        let id = event.id();

        let json = serde_json::to_string(&event)?;
        writeln!(file, "{json}")?;
        let _ = file.flush();
        let _ = file.unlock();

        cache.push(event.clone());
        drop(cache);

        if let Some(project_id) = event.metadata.correlation.project_id {
            let project_path = self.ensure_project_index(project_id)?;
            Self::append_mirror(&project_path, &event)?;
        }
        if let Some(flow_id) = event.metadata.correlation.flow_id {
            let flow_path = self.flow_log_path(flow_id);
            Self::append_mirror(&flow_path, &event)?;
        }

        Ok(id)
    }

    fn read(&self, filter: &EventFilter) -> Result<Vec<Event>> {
        self.global.read(filter)
    }

    fn stream(&self, filter: &EventFilter) -> Result<Receiver<Event>> {
        self.global.stream(filter)
    }

    fn read_all(&self) -> Result<Vec<Event>> {
        self.global.read_all()
    }
}

/// Shared, thread-safe handle to an event store implementation.
pub type SharedEventStore = Arc<dyn EventStore>;

#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::events::{CorrelationIds, EventPayload};

    #[test]
    fn in_memory_store_append_and_read() {
        let store = InMemoryEventStore::new();
        let project_id = Uuid::new_v4();

        let event = Event::new(
            EventPayload::ProjectCreated {
                id: project_id,
                name: "test".to_string(),
                description: None,
            },
            CorrelationIds::for_project(project_id),
        );

        let id = store.append(event).unwrap();
        let events = store.read_all().unwrap();

        assert_eq!(events.len(), 1);
        assert_eq!(events[0].id(), id);
    }
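
    // A hedged sketch (added example): `append` assigns dense sequence
    // numbers and bumps non-increasing timestamps by one nanosecond, so
    // append order, sequence order, and timestamp order agree.
    #[test]
    fn in_memory_store_orders_appends() {
        let store = InMemoryEventStore::new();
        let project = Uuid::new_v4();

        for name in ["a", "b"] {
            store
                .append(Event::new(
                    EventPayload::ProjectCreated {
                        id: project,
                        name: name.to_string(),
                        description: None,
                    },
                    CorrelationIds::for_project(project),
                ))
                .unwrap();
        }

        let events = store.read_all().unwrap();
        assert_eq!(events[0].metadata.sequence, Some(0));
        assert_eq!(events[1].metadata.sequence, Some(1));
        assert!(events[1].metadata.timestamp > events[0].metadata.timestamp);
    }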

    #[test]
    fn in_memory_store_filter_by_project() {
        let store = InMemoryEventStore::new();
        let project1 = Uuid::new_v4();
        let project2 = Uuid::new_v4();

        store
            .append(Event::new(
                EventPayload::ProjectCreated {
                    id: project1,
                    name: "p1".to_string(),
                    description: None,
                },
                CorrelationIds::for_project(project1),
            ))
            .unwrap();

        store
            .append(Event::new(
                EventPayload::ProjectCreated {
                    id: project2,
                    name: "p2".to_string(),
                    description: None,
                },
                CorrelationIds::for_project(project2),
            ))
            .unwrap();

        let filter = EventFilter::for_project(project1);
        let events = store.read(&filter).unwrap();

        assert_eq!(events.len(), 1);
    }
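
    // A hedged sketch (added example): `limit` caps the number of events
    // returned by `read` after the other criteria are applied.
    #[test]
    fn in_memory_store_read_respects_limit() {
        let store = InMemoryEventStore::new();
        let project = Uuid::new_v4();

        for i in 0..3 {
            store
                .append(Event::new(
                    EventPayload::ProjectCreated {
                        id: project,
                        name: format!("p{i}"),
                        description: None,
                    },
                    CorrelationIds::for_project(project),
                ))
                .unwrap();
        }

        let mut filter = EventFilter::for_project(project);
        filter.limit = Some(2);
        assert_eq!(store.read(&filter).unwrap().len(), 2);
    }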

    #[test]
    fn in_memory_store_filter_by_graph() {
        let store = InMemoryEventStore::new();
        let project = Uuid::new_v4();
        let graph1 = Uuid::new_v4();
        let graph2 = Uuid::new_v4();

        store
            .append(Event::new(
                EventPayload::TaskGraphCreated {
                    graph_id: graph1,
                    project_id: project,
                    name: "g1".to_string(),
                    description: None,
                },
                CorrelationIds::for_graph(project, graph1),
            ))
            .unwrap();

        store
            .append(Event::new(
                EventPayload::TaskGraphCreated {
                    graph_id: graph2,
                    project_id: project,
                    name: "g2".to_string(),
                    description: None,
                },
                CorrelationIds::for_graph(project, graph2),
            ))
            .unwrap();

        let filter = EventFilter::for_graph(graph1);
        let events = store.read(&filter).unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].metadata.correlation.graph_id, Some(graph1));
    }
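
    // A hedged sketch (added example): `stream` forwards matching events
    // over a channel from a background polling thread; with `limit` set the
    // thread stops (and the channel closes) after delivering that many.
    #[test]
    fn in_memory_store_stream_with_limit() {
        let store = InMemoryEventStore::new();
        let project = Uuid::new_v4();

        store
            .append(Event::new(
                EventPayload::ProjectCreated {
                    id: project,
                    name: "p".to_string(),
                    description: None,
                },
                CorrelationIds::for_project(project),
            ))
            .unwrap();

        let mut filter = EventFilter::for_project(project);
        filter.limit = Some(1);
        let rx = store.stream(&filter).unwrap();

        let ev = rx.recv_timeout(Duration::from_secs(5)).unwrap();
        assert_eq!(ev.metadata.correlation.project_id, Some(project));
        // The limit was reached, so the sender is dropped and the channel closes.
        assert!(rx.recv_timeout(Duration::from_secs(5)).is_err());
    }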

    #[test]
    fn in_memory_store_filter_by_time_range() {
        let store = InMemoryEventStore::new();
        let project = Uuid::new_v4();

        store
            .append(Event::new(
                EventPayload::ProjectCreated {
                    id: project,
                    name: "p1".to_string(),
                    description: None,
                },
                CorrelationIds::for_project(project),
            ))
            .unwrap();
        store
            .append(Event::new(
                EventPayload::ProjectUpdated {
                    id: project,
                    name: Some("p2".to_string()),
                    description: None,
                },
                CorrelationIds::for_project(project),
            ))
            .unwrap();

        let all = store.read_all().unwrap();
        assert_eq!(all.len(), 2);
        let first_ts = all[0].metadata.timestamp;
        let second_ts = all[1].metadata.timestamp;

        let mut filter = EventFilter::all();
        filter.since = Some(second_ts);
        let since_events = store.read(&filter).unwrap();
        assert_eq!(since_events.len(), 1);
        assert_eq!(since_events[0].metadata.timestamp, second_ts);

        let mut filter = EventFilter::all();
        filter.until = Some(first_ts);
        let until_events = store.read(&filter).unwrap();
        assert_eq!(until_events.len(), 1);
        assert_eq!(until_events[0].metadata.timestamp, first_ts);
    }
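
    // A hedged sketch (added example): the normalizer splits back-to-back
    // JSON objects at `}{` boundaries but leaves braces inside string
    // literals untouched.
    #[test]
    fn normalize_splits_concatenated_objects() {
        let line = r#"{"a":"}{"}{"b":2}"#;
        let normalized = normalize_concatenated_json_objects(line);
        assert_eq!(normalized, "{\"a\":\"}{\"}\n{\"b\":2}");
    }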

    #[test]
    fn file_store_persist_and_reload() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("events.jsonl");

        let project_id = Uuid::new_v4();
        let event = Event::new(
            EventPayload::ProjectCreated {
                id: project_id,
                name: "persist-test".to_string(),
                description: None,
            },
            CorrelationIds::for_project(project_id),
        );

        // Write through one handle, then drop it so the file is released.
        {
            let store = FileEventStore::open(path.clone()).unwrap();
            store.append(event.clone()).unwrap();
        }

        // Reopen: the event should be reloaded from disk.
        {
            let store = FileEventStore::open(path).unwrap();
            let events = store.read_all().unwrap();
            assert_eq!(events.len(), 1);
            assert_eq!(events[0].payload, event.payload);
        }
    }
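
    // A hedged sketch (added example): `IndexedEventStore::append` writes
    // the event to the global log and mirrors it into
    // projects/<project_id>/events.jsonl, registering that path in index.json.
    #[test]
    fn indexed_store_mirrors_project_events() {
        let dir = tempfile::tempdir().unwrap();
        let store = IndexedEventStore::open(dir.path()).unwrap();
        let project = Uuid::new_v4();

        store
            .append(Event::new(
                EventPayload::ProjectCreated {
                    id: project,
                    name: "mirror".to_string(),
                    description: None,
                },
                CorrelationIds::for_project(project),
            ))
            .unwrap();

        let mirror = dir
            .path()
            .join("projects")
            .join(project.to_string())
            .join("events.jsonl");
        assert!(mirror.exists());

        let index: RegistryIndexDisk =
            serde_json::from_str(&std::fs::read_to_string(dir.path().join("index.json")).unwrap())
                .unwrap();
        assert!(index.projects.contains_key(&project.to_string()));
    }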

    #[test]
    fn file_store_ignores_unknown_event_payload_types() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("events.jsonl");

        let project_id = Uuid::new_v4();
        let event = Event::new(
            EventPayload::ProjectCreated {
                id: project_id,
                name: "persist-test".to_string(),
                description: None,
            },
            CorrelationIds::for_project(project_id),
        );

        let mut value = serde_json::to_value(&event).unwrap();
        value["payload"]["type"] = serde_json::json!("future_event_type");
        value["payload"]["some_new_field"] = serde_json::json!("some_value");
        let unknown_line = serde_json::to_string(&value).unwrap();

        std::fs::write(&path, format!("{unknown_line}\n")).unwrap();

        let store = FileEventStore::open(path).unwrap();
        let events = store.read_all().unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].payload, EventPayload::Unknown);
    }
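
    // A hedged sketch (added example): two objects written on one line with
    // no separating newline are still recovered as two events on open.
    #[test]
    fn file_store_recovers_concatenated_lines() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("events.jsonl");

        let project = Uuid::new_v4();
        let event = Event::new(
            EventPayload::ProjectCreated {
                id: project,
                name: "concat".to_string(),
                description: None,
            },
            CorrelationIds::for_project(project),
        );
        let json = serde_json::to_string(&event).unwrap();
        std::fs::write(&path, format!("{json}{json}\n")).unwrap();

        let store = FileEventStore::open(path).unwrap();
        assert_eq!(store.read_all().unwrap().len(), 2);
    }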
}