// harn_vm/event_log/file.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::{Arc, Mutex};
4
5use futures::stream::BoxStream;
6use serde::{Deserialize, Serialize};
7
8use super::util::{
9    dir_size_bytes, now_ms, prepare_event_after, sanitize_filename, stream_from_broadcast,
10    sync_tree, write_json_atomically, BroadcastMap,
11};
12use super::{
13    AppendOutcome, CompactReport, ConsumerId, EventId, EventLog, EventLogBackendKind,
14    EventLogDescription, LogError, LogEvent, LogEventBytes, Topic,
15};
16
17#[derive(Serialize, Deserialize)]
18struct FileRecord {
19    id: EventId,
20    event: LogEvent,
21}
22
23pub struct FileEventLog {
24    root: PathBuf,
25    latest_ids: Mutex<HashMap<String, EventId>>,
26    write_lock: Mutex<()>,
27    pub(super) broadcasts: BroadcastMap,
28    pub(super) queue_depth: usize,
29}
30
31impl FileEventLog {
32    pub fn open(root: PathBuf, queue_depth: usize) -> Result<Self, LogError> {
33        std::fs::create_dir_all(root.join("topics"))
34            .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
35        std::fs::create_dir_all(root.join("consumers"))
36            .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
37        Ok(Self {
38            root,
39            latest_ids: Mutex::new(HashMap::new()),
40            write_lock: Mutex::new(()),
41            broadcasts: BroadcastMap::default(),
42            queue_depth: queue_depth.max(1),
43        })
44    }
45
46    fn topic_path(&self, topic: &Topic) -> PathBuf {
47        self.root
48            .join("topics")
49            .join(format!("{}.jsonl", topic.as_str()))
50    }
51
52    fn consumer_path(&self, topic: &Topic, consumer: &ConsumerId) -> PathBuf {
53        self.root.join("consumers").join(format!(
54            "{}__{}.json",
55            topic.as_str(),
56            sanitize_filename(consumer.as_str())
57        ))
58    }
59
60    fn latest_id_for_topic(&self, topic: &Topic) -> Result<EventId, LogError> {
61        if let Some(event_id) = self
62            .latest_ids
63            .lock()
64            .expect("file event log latest ids poisoned")
65            .get(topic.as_str())
66            .copied()
67        {
68            return Ok(event_id);
69        }
70
71        let mut latest = 0;
72        let path = self.topic_path(topic);
73        if path.is_file() {
74            for record in read_file_records(&path)? {
75                latest = record.id;
76            }
77        }
78        self.latest_ids
79            .lock()
80            .expect("file event log latest ids poisoned")
81            .insert(topic.as_str().to_string(), latest);
82        Ok(latest)
83    }
84
85    fn read_range_sync(
86        &self,
87        topic: &Topic,
88        from: Option<EventId>,
89        limit: usize,
90    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
91        let path = self.topic_path(topic);
92        if !path.is_file() {
93            return Ok(Vec::new());
94        }
95        let from = from.unwrap_or(0);
96        let mut events = Vec::new();
97        for record in read_file_records(&path)? {
98            if record.id > from {
99                events.push((record.id, record.event));
100            }
101            if events.len() >= limit {
102                break;
103            }
104        }
105        Ok(events)
106    }
107
108    pub(super) fn topics(&self) -> Result<Vec<Topic>, LogError> {
109        let topics_dir = self.root.join("topics");
110        if !topics_dir.is_dir() {
111            return Ok(Vec::new());
112        }
113        let mut topics = Vec::new();
114        for entry in std::fs::read_dir(&topics_dir)
115            .map_err(|error| LogError::Io(format!("event log topics read error: {error}")))?
116        {
117            let entry = entry
118                .map_err(|error| LogError::Io(format!("event log topic entry error: {error}")))?;
119            let path = entry.path();
120            if path.extension().and_then(|ext| ext.to_str()) != Some("jsonl") {
121                continue;
122            }
123            let Some(stem) = path.file_stem().and_then(|stem| stem.to_str()) else {
124                continue;
125            };
126            topics.push(Topic::new(stem.to_string())?);
127        }
128        topics.sort_by(|left, right| left.as_str().cmp(right.as_str()));
129        Ok(topics)
130    }
131
132    pub(super) fn append_idempotent_by_header(
133        &self,
134        topic: &Topic,
135        header: &str,
136        value: &str,
137        event: LogEvent,
138    ) -> Result<AppendOutcome, LogError> {
139        let _guard = self
140            .write_lock
141            .lock()
142            .expect("file event log write lock poisoned");
143        let existing_events = self.read_range_sync(topic, None, usize::MAX)?;
144        if let Some((event_id, existing)) = existing_events.iter().find(|(_, event)| {
145            event
146                .headers
147                .get(header)
148                .is_some_and(|found| found == value)
149        }) {
150            return Ok(AppendOutcome {
151                event_id: *event_id,
152                event: existing.clone(),
153                inserted: false,
154            });
155        }
156
157        let next_id = self.latest_id_for_topic(topic)? + 1;
158        let previous = existing_events
159            .last()
160            .map(|(previous_id, previous_event)| (*previous_id, previous_event));
161        let event = prepare_event_after(topic, next_id, previous, event)?;
162        self.append_record_locked(topic, next_id, event)
163    }
164
165    fn append_record_locked(
166        &self,
167        topic: &Topic,
168        event_id: EventId,
169        event: LogEvent,
170    ) -> Result<AppendOutcome, LogError> {
171        let record = FileRecord {
172            id: event_id,
173            event: event.clone(),
174        };
175        let path = self.topic_path(topic);
176        if let Some(parent) = path.parent() {
177            std::fs::create_dir_all(parent)
178                .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
179        }
180        let line = serde_json::to_string(&record)
181            .map_err(|error| LogError::Serde(format!("event log encode error: {error}")))?;
182        use std::io::Write as _;
183        let mut file = std::fs::OpenOptions::new()
184            .create(true)
185            .append(true)
186            .open(&path)
187            .map_err(|error| LogError::Io(format!("event log open error: {error}")))?;
188        writeln!(file, "{line}")
189            .map_err(|error| LogError::Io(format!("event log write error: {error}")))?;
190        self.latest_ids
191            .lock()
192            .expect("file event log latest ids poisoned")
193            .insert(topic.as_str().to_string(), event_id);
194        self.broadcasts
195            .publish(topic, self.queue_depth, (event_id, event.clone()));
196        Ok(AppendOutcome {
197            event_id,
198            event,
199            inserted: true,
200        })
201    }
202}
203
204fn read_file_records(path: &Path) -> Result<Vec<FileRecord>, LogError> {
205    let file = std::fs::File::open(path)
206        .map_err(|error| LogError::Io(format!("event log open error: {error}")))?;
207    let mut reader = std::io::BufReader::new(file);
208    let mut records = Vec::new();
209    let mut line = Vec::new();
210    loop {
211        line.clear();
212        let bytes_read = std::io::BufRead::read_until(&mut reader, b'\n', &mut line)
213            .map_err(|error| LogError::Io(format!("event log read error: {error}")))?;
214        if bytes_read == 0 {
215            break;
216        }
217        if line.iter().all(u8::is_ascii_whitespace) {
218            continue;
219        }
220        let complete_line = line.ends_with(b"\n");
221        match serde_json::from_slice::<FileRecord>(&line) {
222            Ok(record) => records.push(record),
223            Err(_) if !complete_line => break,
224            Err(error) => {
225                return Err(LogError::Serde(format!("event log parse error: {error}")));
226            }
227        }
228    }
229    Ok(records)
230}
231
232impl EventLog for FileEventLog {
233    fn describe(&self) -> EventLogDescription {
234        EventLogDescription {
235            backend: EventLogBackendKind::File,
236            location: Some(self.root.clone()),
237            size_bytes: Some(dir_size_bytes(&self.root)),
238            queue_depth: self.queue_depth,
239        }
240    }
241
242    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
243        let _guard = self
244            .write_lock
245            .lock()
246            .expect("file event log write lock poisoned");
247        let next_id = self.latest_id_for_topic(topic)? + 1;
248        let existing_events = self.read_range_sync(topic, None, usize::MAX)?;
249        let previous = existing_events
250            .last()
251            .map(|(previous_id, previous_event)| (*previous_id, previous_event));
252        let event = prepare_event_after(topic, next_id, previous, event)?;
253        self.append_record_locked(topic, next_id, event)
254            .map(|outcome| outcome.event_id)
255    }
256
257    async fn flush(&self) -> Result<(), LogError> {
258        sync_tree(&self.root)
259    }
260
261    async fn read_range(
262        &self,
263        topic: &Topic,
264        from: Option<EventId>,
265        limit: usize,
266    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
267        self.read_range_sync(topic, from, limit)
268    }
269
270    async fn read_range_bytes(
271        &self,
272        topic: &Topic,
273        from: Option<EventId>,
274        limit: usize,
275    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
276        self.read_range_sync(topic, from, limit)?
277            .into_iter()
278            .map(|(event_id, event)| Ok((event_id, event.try_into()?)))
279            .collect()
280    }
281
282    async fn subscribe(
283        self: Arc<Self>,
284        topic: &Topic,
285        from: Option<EventId>,
286    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
287        let rx = self.broadcasts.subscribe(topic, self.queue_depth);
288        let history = self.read_range_sync(topic, from, usize::MAX)?;
289        Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
290    }
291
292    async fn ack(
293        &self,
294        topic: &Topic,
295        consumer: &ConsumerId,
296        up_to: EventId,
297    ) -> Result<(), LogError> {
298        let path = self.consumer_path(topic, consumer);
299        let payload = serde_json::json!({
300            "topic": topic.as_str(),
301            "consumer_id": consumer.as_str(),
302            "cursor": up_to,
303            "updated_at_ms": now_ms(),
304        });
305        write_json_atomically(&path, &payload)
306    }
307
308    async fn consumer_cursor(
309        &self,
310        topic: &Topic,
311        consumer: &ConsumerId,
312    ) -> Result<Option<EventId>, LogError> {
313        let path = self.consumer_path(topic, consumer);
314        if !path.is_file() {
315            return Ok(None);
316        }
317        let raw = std::fs::read_to_string(&path)
318            .map_err(|error| LogError::Io(format!("event log consumer read error: {error}")))?;
319        let payload: serde_json::Value = serde_json::from_str(&raw)
320            .map_err(|error| LogError::Serde(format!("event log consumer parse error: {error}")))?;
321        let cursor = payload
322            .get("cursor")
323            .and_then(serde_json::Value::as_u64)
324            .ok_or_else(|| {
325                LogError::Serde("event log consumer record missing numeric cursor".to_string())
326            })?;
327        Ok(Some(cursor))
328    }
329
330    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
331        let latest = self.latest_id_for_topic(topic)?;
332        if latest == 0 {
333            Ok(None)
334        } else {
335            Ok(Some(latest))
336        }
337    }
338
339    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
340        let _guard = self
341            .write_lock
342            .lock()
343            .expect("file event log write lock poisoned");
344        let path = self.topic_path(topic);
345        if !path.is_file() {
346            return Ok(CompactReport::default());
347        }
348        let retained = self.read_range_sync(topic, Some(before), usize::MAX)?;
349        let removed = self.read_range_sync(topic, None, usize::MAX)?.len() - retained.len();
350        if retained.is_empty() {
351            let _ = std::fs::remove_file(&path);
352        } else {
353            crate::atomic_io::atomic_write_with(&path, |writer| {
354                use std::io::Write as _;
355                for (event_id, event) in &retained {
356                    let line = serde_json::to_string(&FileRecord {
357                        id: *event_id,
358                        event: event.clone(),
359                    })
360                    .map_err(|error| std::io::Error::other(error.to_string()))?;
361                    writeln!(writer, "{line}")?;
362                }
363                Ok(())
364            })
365            .map_err(|error| LogError::Io(format!("event log compact finalize error: {error}")))?;
366        }
367        let latest = retained.last().map(|(event_id, _)| *event_id);
368        self.latest_ids
369            .lock()
370            .expect("file event log latest ids poisoned")
371            .insert(topic.as_str().to_string(), latest.unwrap_or(0));
372        Ok(CompactReport {
373            removed,
374            remaining: retained.len(),
375            latest,
376            checkpointed: false,
377        })
378    }
379}