Skip to main content

hivemind/storage/
event_store.rs

1//! `EventStore` trait and implementations.
2//!
3//! Event stores are the persistence layer for events. All state is derived
4//! from events, so the event store is the single source of truth.
5
6use crate::core::events::{Event, EventId};
7use chrono::Duration as ChronoDuration;
8use chrono::{DateTime, Utc};
9use fs2::FileExt;
10use std::collections::HashMap;
11use std::path::{Path, PathBuf};
12use std::sync::mpsc::{self, Receiver};
13use std::sync::{Arc, RwLock};
14use std::thread;
15use std::time::Duration;
16use uuid::Uuid;
17
/// Separates JSON objects that were written back-to-back on one line.
///
/// A newline is inserted between a closing `}` and an immediately following
/// `{` whenever both sit outside a JSON string literal. Backslash escapes are
/// tracked so that an escaped quote does not terminate the string. Every other
/// character is copied through unchanged.
fn normalize_concatenated_json_objects(line: &str) -> String {
    let mut normalized = String::with_capacity(line.len());
    let mut inside_string = false;
    let mut escaped = false;
    // True when the character copied last was a `}` outside of any string.
    let mut after_object_end = false;

    for ch in line.chars() {
        // `{` never changes the string state, so it is still "outside a string"
        // here exactly when the preceding `}` was.
        if after_object_end && ch == '{' {
            normalized.push('\n');
        }

        match ch {
            // The character following a backslash is consumed verbatim.
            _ if escaped => escaped = false,
            '\\' if inside_string => escaped = true,
            '"' => inside_string = !inside_string,
            _ => {}
        }

        normalized.push(ch);
        after_object_end = !inside_string && ch == '}';
    }

    normalized
}
46
/// Errors that can occur in the event store.
///
/// The `Io` and `Serialization` variants are created automatically by `?`
/// through their `#[from]` conversions.
#[derive(Debug, thiserror::Error)]
pub enum EventStoreError {
    /// Wraps a [`std::io::Error`] raised by a filesystem operation.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),
    /// Wraps a [`serde_json::Error`] raised while encoding or decoding JSON.
    #[error("Serialization error: {0}")]
    Serialization(#[from] serde_json::Error),
    /// Carries the ID of an event that could not be found.
    // NOTE(review): not constructed anywhere in the visible part of this file.
    #[error("Event not found: {0}")]
    NotFound(EventId),
}
57
/// Result type for event store operations, with [`EventStoreError`] as the error.
pub type Result<T> = std::result::Result<T, EventStoreError>;
60
/// Filter for querying events.
///
/// Every criterion that is `Some` must be satisfied for an event to match;
/// criteria left as `None` are not checked. `limit` is not part of
/// [`EventFilter::matches`]; the stores in this file apply it themselves when
/// reading or streaming.
#[derive(Debug, Default, Clone)]
pub struct EventFilter {
    /// Filter by project ID.
    pub project_id: Option<Uuid>,
    /// Filter by graph ID.
    pub graph_id: Option<Uuid>,
    /// Filter by task ID.
    pub task_id: Option<Uuid>,
    /// Filter by flow ID.
    pub flow_id: Option<Uuid>,
    /// Filter by attempt ID.
    pub attempt_id: Option<Uuid>,
    /// Include events at or after this timestamp.
    pub since: Option<DateTime<Utc>>,
    /// Include events at or before this timestamp.
    pub until: Option<DateTime<Utc>>,
    /// Maximum number of events to return.
    pub limit: Option<usize>,
}
81
82impl EventFilter {
83    /// Creates an empty filter (matches all events).
84    #[must_use]
85    pub fn all() -> Self {
86        Self::default()
87    }
88
89    /// Filter by project ID.
90    #[must_use]
91    pub fn for_project(project_id: Uuid) -> Self {
92        Self {
93            project_id: Some(project_id),
94            ..Default::default()
95        }
96    }
97
98    #[must_use]
99    pub fn for_graph(graph_id: Uuid) -> Self {
100        Self {
101            graph_id: Some(graph_id),
102            ..Default::default()
103        }
104    }
105
106    /// Checks if an event matches this filter.
107    #[must_use]
108    pub fn matches(&self, event: &Event) -> bool {
109        if let Some(pid) = self.project_id {
110            if event.metadata.correlation.project_id != Some(pid) {
111                return false;
112            }
113        }
114        if let Some(gid) = self.graph_id {
115            if event.metadata.correlation.graph_id != Some(gid) {
116                return false;
117            }
118        }
119        if let Some(tid) = self.task_id {
120            if event.metadata.correlation.task_id != Some(tid) {
121                return false;
122            }
123        }
124        if let Some(fid) = self.flow_id {
125            if event.metadata.correlation.flow_id != Some(fid) {
126                return false;
127            }
128        }
129        if let Some(aid) = self.attempt_id {
130            if event.metadata.correlation.attempt_id != Some(aid) {
131                return false;
132            }
133        }
134        if let Some(since) = self.since {
135            if event.metadata.timestamp < since {
136                return false;
137            }
138        }
139        if let Some(until) = self.until {
140            if event.metadata.timestamp > until {
141                return false;
142            }
143        }
144        true
145    }
146}
147
/// Trait for event storage backends.
///
/// Implementors must be `Send + Sync` so a store can be shared across threads
/// (see `SharedEventStore`).
pub trait EventStore: Send + Sync {
    /// Appends an event to the store, returning its assigned ID.
    ///
    /// The implementations in this file overwrite the event's sequence number
    /// and ID before storing it.
    fn append(&self, event: Event) -> Result<EventId>;

    /// Reads events matching the filter.
    ///
    /// The implementations in this file return at most `filter.limit` events
    /// when a limit is set.
    fn read(&self, filter: &EventFilter) -> Result<Vec<Event>>;

    /// Streams events matching the filter.
    ///
    /// Implementations should emit matching historical events first, then continue
    /// yielding new events as they are appended.
    fn stream(&self, filter: &EventFilter) -> Result<Receiver<Event>>;

    /// Reads all events in order.
    fn read_all(&self) -> Result<Vec<Event>>;
}
165
/// In-memory event store for testing.
///
/// Events live only in process memory; nothing is written to disk.
#[derive(Debug, Default)]
pub struct InMemoryEventStore {
    /// Append-only list of stored events. The `Arc` lets streaming threads
    /// share the list with the store.
    events: Arc<RwLock<Vec<Event>>>,
}
171
172impl InMemoryEventStore {
173    /// Creates a new empty in-memory store.
174    #[must_use]
175    pub fn new() -> Self {
176        Self {
177            events: Arc::new(RwLock::new(Vec::new())),
178        }
179    }
180}
181
182#[allow(clippy::significant_drop_tightening)]
183impl EventStore for InMemoryEventStore {
184    fn append(&self, mut event: Event) -> Result<EventId> {
185        let mut events = self.events.write().expect("lock poisoned");
186        let next_seq = events.len() as u64;
187        event.metadata.sequence = Some(next_seq);
188        event.metadata.id = EventId::from_ordered_u64(next_seq);
189        if let Some(last) = events.last() {
190            if event.metadata.timestamp <= last.metadata.timestamp {
191                event.metadata.timestamp = last.metadata.timestamp + ChronoDuration::nanoseconds(1);
192            }
193        }
194        let id = event.id();
195        events.push(event);
196        Ok(id)
197    }
198
199    fn read(&self, filter: &EventFilter) -> Result<Vec<Event>> {
200        let events = self.events.read().expect("lock poisoned");
201        let mut result: Vec<Event> = events
202            .iter()
203            .filter(|e| filter.matches(e))
204            .cloned()
205            .collect();
206        if let Some(limit) = filter.limit {
207            result.truncate(limit);
208        }
209        Ok(result)
210    }
211
212    fn read_all(&self) -> Result<Vec<Event>> {
213        let events = self.events.read().expect("lock poisoned");
214        Ok(events.clone())
215    }
216
217    fn stream(&self, filter: &EventFilter) -> Result<Receiver<Event>> {
218        let (tx, rx) = mpsc::channel();
219        let filter = filter.clone();
220        let events = Arc::clone(&self.events);
221
222        thread::spawn(move || {
223            let mut sent = 0usize;
224            let mut seen = 0usize;
225
226            loop {
227                let snapshot = {
228                    let guard = events.read().expect("lock poisoned");
229                    guard.clone()
230                };
231
232                for ev in snapshot.iter().skip(seen) {
233                    if !filter.matches(ev) {
234                        continue;
235                    }
236
237                    if tx.send(ev.clone()).is_err() {
238                        return;
239                    }
240
241                    sent += 1;
242                    if let Some(limit) = filter.limit {
243                        if sent >= limit {
244                            return;
245                        }
246                    }
247                }
248
249                seen = snapshot.len();
250                thread::sleep(Duration::from_millis(200));
251            }
252        });
253
254        Ok(rx)
255    }
256}
257
/// File-based event store (append-only JSON lines).
///
/// Reads are served from an in-memory cache; appends re-read the file under an
/// exclusive file lock before writing.
#[derive(Debug)]
pub struct FileEventStore {
    /// Location of the JSON-lines event file.
    path: PathBuf,
    /// Parsed events: loaded when the store is opened and replaced with the
    /// on-disk contents at the start of every append.
    cache: RwLock<Vec<Event>>,
}
264
265impl FileEventStore {
266    /// Creates or opens a file-based event store.
267    ///
268    /// # Errors
269    /// Returns an error if the file cannot be created or read.
270    pub fn open(path: PathBuf) -> Result<Self> {
271        if let Some(parent) = path.parent() {
272            std::fs::create_dir_all(parent)?;
273        }
274
275        let file = std::fs::OpenOptions::new()
276            .read(true)
277            .write(true)
278            .create(true)
279            .truncate(false)
280            .open(&path)?;
281        file.lock_shared()?;
282
283        let mut content = String::new();
284        {
285            use std::io::Read;
286            let mut reader = std::io::BufReader::new(&file);
287            reader.read_to_string(&mut content)?;
288        }
289
290        file.unlock()?;
291
292        let cache = content
293            .lines()
294            .filter(|l| !l.trim().is_empty())
295            .flat_map(|line| {
296                let normalized = normalize_concatenated_json_objects(line);
297                serde_json::Deserializer::from_str(&normalized)
298                    .into_iter::<Event>()
299                    .collect::<Vec<_>>()
300            })
301            .collect::<std::result::Result<Vec<Event>, _>>()?;
302
303        Ok(Self {
304            path,
305            cache: RwLock::new(cache),
306        })
307    }
308
309    /// Returns the path to the event file.
310    #[must_use]
311    pub const fn path(&self) -> &PathBuf {
312        &self.path
313    }
314}
315
316#[allow(clippy::significant_drop_tightening)]
317impl EventStore for FileEventStore {
318    fn append(&self, mut event: Event) -> Result<EventId> {
319        use std::fs::OpenOptions;
320        use std::io::Write;
321
322        let mut file = OpenOptions::new()
323            .read(true)
324            .create(true)
325            .append(true)
326            .open(&self.path)?;
327        file.lock_exclusive()?;
328
329        let mut content = String::new();
330        {
331            use std::io::{Read, Seek};
332            let _ = file.rewind();
333            let mut reader = std::io::BufReader::new(&file);
334            reader.read_to_string(&mut content)?;
335        }
336
337        let disk_events = content
338            .lines()
339            .filter(|l| !l.trim().is_empty())
340            .flat_map(|line| {
341                let normalized = normalize_concatenated_json_objects(line);
342                serde_json::Deserializer::from_str(&normalized)
343                    .into_iter::<Event>()
344                    .collect::<Vec<_>>()
345            })
346            .collect::<std::result::Result<Vec<Event>, _>>()?;
347
348        let mut cache = self.cache.write().expect("lock poisoned");
349        cache.clone_from(&disk_events);
350
351        let next_seq = cache.len() as u64;
352        event.metadata.sequence = Some(next_seq);
353        event.metadata.id = EventId::from_ordered_u64(next_seq);
354        if let Some(last) = cache.last() {
355            if event.metadata.timestamp <= last.metadata.timestamp {
356                event.metadata.timestamp = last.metadata.timestamp + ChronoDuration::nanoseconds(1);
357            }
358        }
359        let id = event.id();
360
361        let json = serde_json::to_string(&event)?;
362        writeln!(file, "{json}")?;
363        let _ = file.flush();
364        let _ = file.unlock();
365
366        cache.push(event);
367        Ok(id)
368    }
369
370    fn read(&self, filter: &EventFilter) -> Result<Vec<Event>> {
371        let cache = self.cache.read().expect("lock poisoned");
372        let mut result: Vec<Event> = cache
373            .iter()
374            .filter(|e| filter.matches(e))
375            .cloned()
376            .collect();
377        if let Some(limit) = filter.limit {
378            result.truncate(limit);
379        }
380        Ok(result)
381    }
382
383    fn read_all(&self) -> Result<Vec<Event>> {
384        let cache = self.cache.read().expect("lock poisoned");
385        Ok(cache.clone())
386    }
387
388    fn stream(&self, filter: &EventFilter) -> Result<Receiver<Event>> {
389        let (tx, rx) = mpsc::channel();
390        let filter = filter.clone();
391        let path = self.path.clone();
392
393        thread::spawn(move || {
394            let mut sent = 0usize;
395            let mut seen = 0usize;
396
397            loop {
398                let Ok(content) = std::fs::read_to_string(&path) else {
399                    thread::sleep(Duration::from_millis(200));
400                    continue;
401                };
402
403                let Ok(events) = content
404                    .lines()
405                    .filter(|l| !l.trim().is_empty())
406                    .flat_map(|line| {
407                        let normalized = normalize_concatenated_json_objects(line);
408                        serde_json::Deserializer::from_str(&normalized)
409                            .into_iter::<Event>()
410                            .collect::<Vec<_>>()
411                    })
412                    .collect::<std::result::Result<Vec<Event>, _>>()
413                else {
414                    thread::sleep(Duration::from_millis(200));
415                    continue;
416                };
417
418                for ev in events.iter().skip(seen) {
419                    if !filter.matches(ev) {
420                        continue;
421                    }
422
423                    if tx.send(ev.clone()).is_err() {
424                        return;
425                    }
426
427                    sent += 1;
428                    if let Some(limit) = filter.limit {
429                        if sent >= limit {
430                            return;
431                        }
432                    }
433                }
434
435                seen = events.len();
436                thread::sleep(Duration::from_millis(200));
437            }
438        });
439
440        Ok(rx)
441    }
442}
443
/// On-disk shape of the registry index file (`index.json`).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Default)]
struct RegistryIndexDisk {
    /// Maps a project ID (as a string) to that project's event log path,
    /// stored in the form `projects/<project_id>/events.jsonl`.
    projects: HashMap<String, String>,
}
448
/// Event store that maintains per-project and per-flow append-only logs plus an index.
///
/// Reads and streams are currently served from the global `events.jsonl` file to preserve
/// backwards compatibility and keep read ordering identical to the legacy store.
#[derive(Debug)]
pub struct IndexedEventStore {
    /// Path of the `index.json` registry file.
    index_path: PathBuf,
    /// Directory holding one sub-directory per project log.
    projects_dir: PathBuf,
    /// Directory holding one sub-directory per flow log.
    flows_dir: PathBuf,
    /// Store backing the global `events.jsonl`; serves all reads and streams.
    global: FileEventStore,
}
460
461impl IndexedEventStore {
462    /// Opens (or creates) an indexed store rooted at `base_dir`.
463    ///
464    /// This will create:
465    /// - `base_dir/index.json`
466    /// - `base_dir/projects/<project_id>/events.jsonl`
467    /// - `base_dir/flows/<flow_id>/events.jsonl`
468    /// - `base_dir/events.jsonl` (global, legacy-compatible)
469    pub fn open(base_dir: &Path) -> Result<Self> {
470        std::fs::create_dir_all(base_dir)?;
471
472        let index_path = base_dir.join("index.json");
473        let projects_dir = base_dir.join("projects");
474        let flows_dir = base_dir.join("flows");
475        std::fs::create_dir_all(&projects_dir)?;
476        std::fs::create_dir_all(&flows_dir)?;
477
478        if !index_path.exists() {
479            let disk = RegistryIndexDisk::default();
480            std::fs::write(&index_path, serde_json::to_string_pretty(&disk)?)?;
481        }
482
483        let global = FileEventStore::open(base_dir.join("events.jsonl"))?;
484
485        Ok(Self {
486            index_path,
487            projects_dir,
488            flows_dir,
489            global,
490        })
491    }
492
493    fn project_log_rel(project_id: Uuid) -> String {
494        format!("projects/{project_id}/events.jsonl")
495    }
496
497    fn flow_log_path(&self, flow_id: Uuid) -> PathBuf {
498        self.flows_dir
499            .join(flow_id.to_string())
500            .join("events.jsonl")
501    }
502
503    fn ensure_project_index(&self, project_id: Uuid) -> Result<PathBuf> {
504        use std::io::{Read, Seek, Write};
505
506        let rel = Self::project_log_rel(project_id);
507        let abs = self
508            .projects_dir
509            .join(project_id.to_string())
510            .join("events.jsonl");
511
512        if let Some(parent) = abs.parent() {
513            std::fs::create_dir_all(parent)?;
514        }
515
516        let mut file = std::fs::OpenOptions::new()
517            .read(true)
518            .write(true)
519            .create(true)
520            .truncate(false)
521            .open(&self.index_path)?;
522        file.lock_exclusive()?;
523
524        let mut content = String::new();
525        {
526            let _ = file.rewind();
527            let mut reader = std::io::BufReader::new(&file);
528            reader.read_to_string(&mut content)?;
529        }
530
531        let mut disk: RegistryIndexDisk = if content.trim().is_empty() {
532            RegistryIndexDisk::default()
533        } else {
534            serde_json::from_str(&content).unwrap_or_default()
535        };
536
537        disk.projects
538            .entry(project_id.to_string())
539            .or_insert_with(|| rel.clone());
540
541        let json = serde_json::to_string_pretty(&disk)?;
542        {
543            let _ = file.rewind();
544            file.set_len(0)?;
545            file.write_all(json.as_bytes())?;
546            let _ = file.flush();
547        }
548        let _ = file.unlock();
549
550        Ok(abs)
551    }
552
553    fn append_mirror(path: &PathBuf, event: &Event) -> Result<()> {
554        use std::io::Write;
555        if let Some(parent) = path.parent() {
556            std::fs::create_dir_all(parent)?;
557        }
558        let mut file = std::fs::OpenOptions::new()
559            .read(true)
560            .create(true)
561            .append(true)
562            .open(path)?;
563        file.lock_exclusive()?;
564        let json = serde_json::to_string(event)?;
565        writeln!(file, "{json}")?;
566        let _ = file.flush();
567        let _ = file.unlock();
568        Ok(())
569    }
570}
571
572#[allow(clippy::significant_drop_tightening)]
573impl EventStore for IndexedEventStore {
574    fn append(&self, mut event: Event) -> Result<EventId> {
575        use std::fs::OpenOptions;
576        use std::io::Write;
577
578        // Append to the global log using the same semantics as FileEventStore,
579        // while keeping the fully-populated event value for mirroring.
580        let mut file = OpenOptions::new()
581            .read(true)
582            .create(true)
583            .append(true)
584            .open(&self.global.path)?;
585        file.lock_exclusive()?;
586
587        let mut content = String::new();
588        {
589            use std::io::{Read, Seek};
590            let _ = file.rewind();
591            let mut reader = std::io::BufReader::new(&file);
592            reader.read_to_string(&mut content)?;
593        }
594
595        let disk_events = content
596            .lines()
597            .filter(|l| !l.trim().is_empty())
598            .flat_map(|line| {
599                let normalized = normalize_concatenated_json_objects(line);
600                serde_json::Deserializer::from_str(&normalized)
601                    .into_iter::<Event>()
602                    .collect::<Vec<_>>()
603            })
604            .collect::<std::result::Result<Vec<Event>, _>>()?;
605
606        let mut cache = self.global.cache.write().expect("lock poisoned");
607        cache.clone_from(&disk_events);
608
609        let next_seq = cache.len() as u64;
610        event.metadata.sequence = Some(next_seq);
611        event.metadata.id = EventId::from_ordered_u64(next_seq);
612        if let Some(last) = cache.last() {
613            if event.metadata.timestamp <= last.metadata.timestamp {
614                event.metadata.timestamp = last.metadata.timestamp + ChronoDuration::nanoseconds(1);
615            }
616        }
617        let id = event.id();
618
619        let json = serde_json::to_string(&event)?;
620        writeln!(file, "{json}")?;
621        let _ = file.flush();
622        let _ = file.unlock();
623
624        cache.push(event.clone());
625        drop(cache);
626
627        if let Some(project_id) = event.metadata.correlation.project_id {
628            let project_path = self.ensure_project_index(project_id)?;
629            Self::append_mirror(&project_path, &event)?;
630        }
631        if let Some(flow_id) = event.metadata.correlation.flow_id {
632            let flow_path = self.flow_log_path(flow_id);
633            Self::append_mirror(&flow_path, &event)?;
634        }
635
636        Ok(id)
637    }
638
639    fn read(&self, filter: &EventFilter) -> Result<Vec<Event>> {
640        self.global.read(filter)
641    }
642
643    fn stream(&self, filter: &EventFilter) -> Result<Receiver<Event>> {
644        self.global.stream(filter)
645    }
646
647    fn read_all(&self) -> Result<Vec<Event>> {
648        self.global.read_all()
649    }
650}
651
/// Thread-safe wrapper for any event store.
///
/// A reference-counted trait object; the `EventStore` trait itself requires
/// `Send + Sync`.
pub type SharedEventStore = Arc<dyn EventStore>;
654
// Unit tests for the in-memory and file-backed event stores.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::events::{CorrelationIds, EventPayload};

    // A single appended event is returned by `read_all` under the ID that
    // `append` reported.
    #[test]
    fn in_memory_store_append_and_read() {
        let store = InMemoryEventStore::new();
        let project_id = Uuid::new_v4();

        let event = Event::new(
            EventPayload::ProjectCreated {
                id: project_id,
                name: "test".to_string(),
                description: None,
            },
            CorrelationIds::for_project(project_id),
        );

        let id = store.append(event).unwrap();
        let events = store.read_all().unwrap();

        assert_eq!(events.len(), 1);
        assert_eq!(events[0].id(), id);
    }

    // With events for two projects stored, a project filter returns only the
    // event of the requested project.
    #[test]
    fn in_memory_store_filter_by_project() {
        let store = InMemoryEventStore::new();
        let project1 = Uuid::new_v4();
        let project2 = Uuid::new_v4();

        store
            .append(Event::new(
                EventPayload::ProjectCreated {
                    id: project1,
                    name: "p1".to_string(),
                    description: None,
                },
                CorrelationIds::for_project(project1),
            ))
            .unwrap();

        store
            .append(Event::new(
                EventPayload::ProjectCreated {
                    id: project2,
                    name: "p2".to_string(),
                    description: None,
                },
                CorrelationIds::for_project(project2),
            ))
            .unwrap();

        let filter = EventFilter::for_project(project1);
        let events = store.read(&filter).unwrap();

        assert_eq!(events.len(), 1);
    }

    // Two graphs in the same project: a graph filter returns only the event
    // correlated with the requested graph.
    #[test]
    fn in_memory_store_filter_by_graph() {
        let store = InMemoryEventStore::new();
        let project = Uuid::new_v4();
        let graph1 = Uuid::new_v4();
        let graph2 = Uuid::new_v4();

        store
            .append(Event::new(
                EventPayload::TaskGraphCreated {
                    graph_id: graph1,
                    project_id: project,
                    name: "g1".to_string(),
                    description: None,
                },
                CorrelationIds::for_graph(project, graph1),
            ))
            .unwrap();

        store
            .append(Event::new(
                EventPayload::TaskGraphCreated {
                    graph_id: graph2,
                    project_id: project,
                    name: "g2".to_string(),
                    description: None,
                },
                CorrelationIds::for_graph(project, graph2),
            ))
            .unwrap();

        let filter = EventFilter::for_graph(graph1);
        let events = store.read(&filter).unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].metadata.correlation.graph_id, Some(graph1));
    }

    // `since` and `until` are inclusive bounds. The store makes timestamps
    // strictly increasing on append, so each bound selects exactly one of the
    // two events.
    #[test]
    fn in_memory_store_filter_by_time_range() {
        let store = InMemoryEventStore::new();
        let project = Uuid::new_v4();

        store
            .append(Event::new(
                EventPayload::ProjectCreated {
                    id: project,
                    name: "p1".to_string(),
                    description: None,
                },
                CorrelationIds::for_project(project),
            ))
            .unwrap();
        store
            .append(Event::new(
                EventPayload::ProjectUpdated {
                    id: project,
                    name: Some("p2".to_string()),
                    description: None,
                },
                CorrelationIds::for_project(project),
            ))
            .unwrap();

        let all = store.read_all().unwrap();
        assert_eq!(all.len(), 2);
        let first_ts = all[0].metadata.timestamp;
        let second_ts = all[1].metadata.timestamp;

        let mut filter = EventFilter::all();
        filter.since = Some(second_ts);
        let since_events = store.read(&filter).unwrap();
        assert_eq!(since_events.len(), 1);
        assert_eq!(since_events[0].metadata.timestamp, second_ts);

        let mut filter = EventFilter::all();
        filter.until = Some(first_ts);
        let until_events = store.read(&filter).unwrap();
        assert_eq!(until_events.len(), 1);
        assert_eq!(until_events[0].metadata.timestamp, first_ts);
    }

    // An event appended through one store instance is loaded again when a new
    // instance opens the same file.
    #[test]
    fn file_store_persist_and_reload() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("events.jsonl");

        let project_id = Uuid::new_v4();
        let event = Event::new(
            EventPayload::ProjectCreated {
                id: project_id,
                name: "persist-test".to_string(),
                description: None,
            },
            CorrelationIds::for_project(project_id),
        );

        // Write
        {
            let store = FileEventStore::open(path.clone()).unwrap();
            store.append(event.clone()).unwrap();
        }

        // Reload
        {
            let store = FileEventStore::open(path).unwrap();
            let events = store.read_all().unwrap();
            assert_eq!(events.len(), 1);
            // Only the payload is compared: `append` rewrites the metadata.
            assert_eq!(events[0].payload, event.payload);
        }
    }

    // A line whose payload type is not recognised still loads; the event is
    // kept with its payload decoded as `EventPayload::Unknown`.
    #[test]
    fn file_store_ignores_unknown_event_payload_types() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("events.jsonl");

        let project_id = Uuid::new_v4();
        let event = Event::new(
            EventPayload::ProjectCreated {
                id: project_id,
                name: "persist-test".to_string(),
                description: None,
            },
            CorrelationIds::for_project(project_id),
        );

        // Serialize a valid event, then rewrite its payload tag to a type this
        // build does not know and add an extra field.
        let mut value = serde_json::to_value(&event).unwrap();
        value["payload"]["type"] = serde_json::json!("future_event_type");
        value["payload"]["some_new_field"] = serde_json::json!("some_value");
        let unknown_line = serde_json::to_string(&value).unwrap();

        std::fs::write(&path, format!("{unknown_line}\n")).unwrap();

        let store = FileEventStore::open(path).unwrap();
        let events = store.read_all().unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].payload, EventPayload::Unknown);
    }
}
853}