Skip to main content

harn_vm/event_log/
file.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::{Arc, Mutex};
4
5use futures::stream::BoxStream;
6use serde::{Deserialize, Serialize};
7
8use super::util::{
9    dir_size_bytes, now_ms, prepare_event_after, sanitize_filename, stream_from_broadcast,
10    sync_tree, write_json_atomically, BroadcastMap,
11};
12use super::{
13    AppendOutcome, CompactReport, ConsumerId, EventId, EventLog, EventLogBackendKind,
14    EventLogDescription, LogError, LogEvent, LogEventBytes, Topic,
15};
16
/// One line of a topic's `.jsonl` file: an event paired with the id it was
/// assigned when appended.
#[derive(Serialize, Deserialize)]
struct FileRecord {
    /// Per-topic event id assigned at append time.
    id: EventId,
    /// The event exactly as it was stored.
    event: LogEvent,
}
22
/// File-backed event log: one JSON-lines file per topic under
/// `<root>/topics/`, plus one JSON cursor file per (topic, consumer) pair
/// under `<root>/consumers/`.
pub struct FileEventLog {
    /// Directory holding the `topics/` and `consumers/` subdirectories.
    root: PathBuf,
    /// Cache of the highest event id per topic name, filled lazily from disk
    /// and updated on every append/compaction.
    latest_ids: Mutex<HashMap<String, EventId>>,
    /// Serializes appends and compaction within this process.
    write_lock: Mutex<()>,
    /// Per-topic fan-out used to push freshly appended events to subscribers.
    pub(super) broadcasts: BroadcastMap,
    /// Queue depth handed to the broadcast layer and subscriber streams;
    /// `open` clamps it to at least 1.
    pub(super) queue_depth: usize,
}
30
31impl FileEventLog {
32    pub fn open(root: PathBuf, queue_depth: usize) -> Result<Self, LogError> {
33        std::fs::create_dir_all(root.join("topics"))
34            .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
35        std::fs::create_dir_all(root.join("consumers"))
36            .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
37        Ok(Self {
38            root,
39            latest_ids: Mutex::new(HashMap::new()),
40            write_lock: Mutex::new(()),
41            broadcasts: BroadcastMap::default(),
42            queue_depth: queue_depth.max(1),
43        })
44    }
45
46    fn topic_path(&self, topic: &Topic) -> PathBuf {
47        self.root
48            .join("topics")
49            .join(format!("{}.jsonl", topic.as_str()))
50    }
51
52    fn consumer_path(&self, topic: &Topic, consumer: &ConsumerId) -> PathBuf {
53        self.root.join("consumers").join(format!(
54            "{}__{}.json",
55            topic.as_str(),
56            sanitize_filename(consumer.as_str())
57        ))
58    }
59
60    fn latest_id_for_topic(&self, topic: &Topic) -> Result<EventId, LogError> {
61        if let Some(event_id) = self
62            .latest_ids
63            .lock()
64            .expect("file event log latest ids poisoned")
65            .get(topic.as_str())
66            .copied()
67        {
68            return Ok(event_id);
69        }
70
71        let mut latest = 0;
72        let path = self.topic_path(topic);
73        if path.is_file() {
74            for record in read_file_records(&path)? {
75                latest = record.id;
76            }
77        }
78        self.latest_ids
79            .lock()
80            .expect("file event log latest ids poisoned")
81            .insert(topic.as_str().to_string(), latest);
82        Ok(latest)
83    }
84
85    fn read_range_sync(
86        &self,
87        topic: &Topic,
88        from: Option<EventId>,
89        limit: usize,
90    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
91        let path = self.topic_path(topic);
92        if !path.is_file() {
93            return Ok(Vec::new());
94        }
95        let from = from.unwrap_or(0);
96        let mut events = Vec::new();
97        for record in read_file_records(&path)? {
98            if record.id > from {
99                events.push((record.id, record.event));
100            }
101            if events.len() >= limit {
102                break;
103            }
104        }
105        Ok(events)
106    }
107
108    pub(super) fn topics(&self) -> Result<Vec<Topic>, LogError> {
109        let topics_dir = self.root.join("topics");
110        if !topics_dir.is_dir() {
111            return Ok(Vec::new());
112        }
113        let mut topics = Vec::new();
114        for entry in std::fs::read_dir(&topics_dir)
115            .map_err(|error| LogError::Io(format!("event log topics read error: {error}")))?
116        {
117            let entry = entry
118                .map_err(|error| LogError::Io(format!("event log topic entry error: {error}")))?;
119            let path = entry.path();
120            if path.extension().and_then(|ext| ext.to_str()) != Some("jsonl") {
121                continue;
122            }
123            let Some(stem) = path.file_stem().and_then(|stem| stem.to_str()) else {
124                continue;
125            };
126            topics.push(Topic::new(stem.to_string())?);
127        }
128        topics.sort_by(|left, right| left.as_str().cmp(right.as_str()));
129        Ok(topics)
130    }
131
132    pub(super) fn append_idempotent_by_header(
133        &self,
134        topic: &Topic,
135        header: &str,
136        value: &str,
137        event: LogEvent,
138    ) -> Result<AppendOutcome, LogError> {
139        let _guard = self
140            .write_lock
141            .lock()
142            .expect("file event log write lock poisoned");
143        let existing_events = self.read_range_sync(topic, None, usize::MAX)?;
144        if let Some((event_id, existing)) = existing_events.iter().find(|(_, event)| {
145            event
146                .headers
147                .get(header)
148                .is_some_and(|found| found == value)
149        }) {
150            return Ok(AppendOutcome {
151                event_id: *event_id,
152                event: existing.clone(),
153                inserted: false,
154            });
155        }
156
157        let next_id = self.latest_id_for_topic(topic)? + 1;
158        let previous = existing_events
159            .last()
160            .map(|(previous_id, previous_event)| (*previous_id, previous_event));
161        let event = prepare_event_after(topic, next_id, previous, event)?;
162        self.append_record_locked(topic, next_id, event)
163    }
164
165    /// Read counterpart of [`Self::append_idempotent_by_header`]. The file
166    /// backend has no header index, so this scans the topic — acceptable for a
167    /// dev/test backend (SQLite is the durable default).
168    pub(super) fn read_idempotent_by_header(
169        &self,
170        topic: &Topic,
171        header: &str,
172        value: &str,
173    ) -> Result<Option<(EventId, LogEvent)>, LogError> {
174        let events = self.read_range_sync(topic, None, usize::MAX)?;
175        Ok(events.into_iter().find(|(_, event)| {
176            event
177                .headers
178                .get(header)
179                .is_some_and(|found| found == value)
180        }))
181    }
182
183    fn append_record_locked(
184        &self,
185        topic: &Topic,
186        event_id: EventId,
187        event: LogEvent,
188    ) -> Result<AppendOutcome, LogError> {
189        let record = FileRecord {
190            id: event_id,
191            event: event.clone(),
192        };
193        let path = self.topic_path(topic);
194        if let Some(parent) = path.parent() {
195            std::fs::create_dir_all(parent)
196                .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
197        }
198        let line = serde_json::to_string(&record)
199            .map_err(|error| LogError::Serde(format!("event log encode error: {error}")))?;
200        use std::io::Write as _;
201        let mut file = std::fs::OpenOptions::new()
202            .create(true)
203            .append(true)
204            .open(&path)
205            .map_err(|error| LogError::Io(format!("event log open error: {error}")))?;
206        writeln!(file, "{line}")
207            .map_err(|error| LogError::Io(format!("event log write error: {error}")))?;
208        self.latest_ids
209            .lock()
210            .expect("file event log latest ids poisoned")
211            .insert(topic.as_str().to_string(), event_id);
212        self.broadcasts
213            .publish(topic, self.queue_depth, (event_id, event.clone()));
214        Ok(AppendOutcome {
215            event_id,
216            event,
217            inserted: true,
218        })
219    }
220}
221
222fn read_file_records(path: &Path) -> Result<Vec<FileRecord>, LogError> {
223    let file = std::fs::File::open(path)
224        .map_err(|error| LogError::Io(format!("event log open error: {error}")))?;
225    let mut reader = std::io::BufReader::new(file);
226    let mut records = Vec::new();
227    let mut line = Vec::new();
228    loop {
229        line.clear();
230        let bytes_read = std::io::BufRead::read_until(&mut reader, b'\n', &mut line)
231            .map_err(|error| LogError::Io(format!("event log read error: {error}")))?;
232        if bytes_read == 0 {
233            break;
234        }
235        if line.iter().all(u8::is_ascii_whitespace) {
236            continue;
237        }
238        let complete_line = line.ends_with(b"\n");
239        match serde_json::from_slice::<FileRecord>(&line) {
240            Ok(record) => records.push(record),
241            Err(_) if !complete_line => break,
242            Err(error) => {
243                return Err(LogError::Serde(format!("event log parse error: {error}")));
244            }
245        }
246    }
247    Ok(records)
248}
249
250impl EventLog for FileEventLog {
251    fn describe(&self) -> EventLogDescription {
252        EventLogDescription {
253            backend: EventLogBackendKind::File,
254            location: Some(self.root.clone()),
255            size_bytes: Some(dir_size_bytes(&self.root)),
256            queue_depth: self.queue_depth,
257        }
258    }
259
260    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
261        let _guard = self
262            .write_lock
263            .lock()
264            .expect("file event log write lock poisoned");
265        let next_id = self.latest_id_for_topic(topic)? + 1;
266        let existing_events = self.read_range_sync(topic, None, usize::MAX)?;
267        let previous = existing_events
268            .last()
269            .map(|(previous_id, previous_event)| (*previous_id, previous_event));
270        let event = prepare_event_after(topic, next_id, previous, event)?;
271        self.append_record_locked(topic, next_id, event)
272            .map(|outcome| outcome.event_id)
273    }
274
275    async fn flush(&self) -> Result<(), LogError> {
276        sync_tree(&self.root)
277    }
278
279    async fn read_range(
280        &self,
281        topic: &Topic,
282        from: Option<EventId>,
283        limit: usize,
284    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
285        self.read_range_sync(topic, from, limit)
286    }
287
288    async fn read_range_bytes(
289        &self,
290        topic: &Topic,
291        from: Option<EventId>,
292        limit: usize,
293    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
294        self.read_range_sync(topic, from, limit)?
295            .into_iter()
296            .map(|(event_id, event)| Ok((event_id, event.try_into()?)))
297            .collect()
298    }
299
300    async fn subscribe(
301        self: Arc<Self>,
302        topic: &Topic,
303        from: Option<EventId>,
304    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
305        let rx = self.broadcasts.subscribe(topic, self.queue_depth);
306        let history = self.read_range_sync(topic, from, usize::MAX)?;
307        Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
308    }
309
310    async fn ack(
311        &self,
312        topic: &Topic,
313        consumer: &ConsumerId,
314        up_to: EventId,
315    ) -> Result<(), LogError> {
316        let path = self.consumer_path(topic, consumer);
317        let payload = serde_json::json!({
318            "topic": topic.as_str(),
319            "consumer_id": consumer.as_str(),
320            "cursor": up_to,
321            "updated_at_ms": now_ms(),
322        });
323        write_json_atomically(&path, &payload)
324    }
325
326    async fn consumer_cursor(
327        &self,
328        topic: &Topic,
329        consumer: &ConsumerId,
330    ) -> Result<Option<EventId>, LogError> {
331        let path = self.consumer_path(topic, consumer);
332        if !path.is_file() {
333            return Ok(None);
334        }
335        let raw = std::fs::read_to_string(&path)
336            .map_err(|error| LogError::Io(format!("event log consumer read error: {error}")))?;
337        let payload: serde_json::Value = serde_json::from_str(&raw)
338            .map_err(|error| LogError::Serde(format!("event log consumer parse error: {error}")))?;
339        let cursor = payload
340            .get("cursor")
341            .and_then(serde_json::Value::as_u64)
342            .ok_or_else(|| {
343                LogError::Serde("event log consumer record missing numeric cursor".to_string())
344            })?;
345        Ok(Some(cursor))
346    }
347
348    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
349        let latest = self.latest_id_for_topic(topic)?;
350        if latest == 0 {
351            Ok(None)
352        } else {
353            Ok(Some(latest))
354        }
355    }
356
357    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
358        let _guard = self
359            .write_lock
360            .lock()
361            .expect("file event log write lock poisoned");
362        let path = self.topic_path(topic);
363        if !path.is_file() {
364            return Ok(CompactReport::default());
365        }
366        let retained = self.read_range_sync(topic, Some(before), usize::MAX)?;
367        let removed = self.read_range_sync(topic, None, usize::MAX)?.len() - retained.len();
368        if retained.is_empty() {
369            let _ = std::fs::remove_file(&path);
370        } else {
371            crate::atomic_io::atomic_write_with(&path, |writer| {
372                use std::io::Write as _;
373                for (event_id, event) in &retained {
374                    let line = serde_json::to_string(&FileRecord {
375                        id: *event_id,
376                        event: event.clone(),
377                    })
378                    .map_err(|error| std::io::Error::other(error.to_string()))?;
379                    writeln!(writer, "{line}")?;
380                }
381                Ok(())
382            })
383            .map_err(|error| LogError::Io(format!("event log compact finalize error: {error}")))?;
384        }
385        let latest = retained.last().map(|(event_id, _)| *event_id);
386        self.latest_ids
387            .lock()
388            .expect("file event log latest ids poisoned")
389            .insert(topic.as_str().to_string(), latest.unwrap_or(0));
390        Ok(CompactReport {
391            removed,
392            remaining: retained.len(),
393            latest,
394            checkpointed: false,
395        })
396    }
397}