1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::{Arc, Mutex};
4
5use futures::stream::BoxStream;
6use serde::{Deserialize, Serialize};
7
8use super::util::{
9 dir_size_bytes, now_ms, prepare_event_after, sanitize_filename, stream_from_broadcast,
10 sync_tree, write_json_atomically, BroadcastMap,
11};
12use super::{
13 AppendOutcome, CompactReport, ConsumerId, EventId, EventLog, EventLogBackendKind,
14 EventLogDescription, LogError, LogEvent, LogEventBytes, Topic,
15};
16
/// One line of a topic's `.jsonl` file: an event paired with its id.
///
/// serde serializes struct fields in declaration order, so reordering these
/// fields changes the bytes written to disk (reading is order-independent).
#[derive(Serialize, Deserialize)]
struct FileRecord {
    /// Event id assigned to this record when it was appended to its topic.
    id: EventId,
    /// The stored event.
    event: LogEvent,
}
22
23pub struct FileEventLog {
24 root: PathBuf,
25 latest_ids: Mutex<HashMap<String, EventId>>,
26 write_lock: Mutex<()>,
27 pub(super) broadcasts: BroadcastMap,
28 pub(super) queue_depth: usize,
29}
30
31impl FileEventLog {
32 pub fn open(root: PathBuf, queue_depth: usize) -> Result<Self, LogError> {
33 std::fs::create_dir_all(root.join("topics"))
34 .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
35 std::fs::create_dir_all(root.join("consumers"))
36 .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
37 Ok(Self {
38 root,
39 latest_ids: Mutex::new(HashMap::new()),
40 write_lock: Mutex::new(()),
41 broadcasts: BroadcastMap::default(),
42 queue_depth: queue_depth.max(1),
43 })
44 }
45
46 fn topic_path(&self, topic: &Topic) -> PathBuf {
47 self.root
48 .join("topics")
49 .join(format!("{}.jsonl", topic.as_str()))
50 }
51
52 fn consumer_path(&self, topic: &Topic, consumer: &ConsumerId) -> PathBuf {
53 self.root.join("consumers").join(format!(
54 "{}__{}.json",
55 topic.as_str(),
56 sanitize_filename(consumer.as_str())
57 ))
58 }
59
60 fn latest_id_for_topic(&self, topic: &Topic) -> Result<EventId, LogError> {
61 if let Some(event_id) = self
62 .latest_ids
63 .lock()
64 .expect("file event log latest ids poisoned")
65 .get(topic.as_str())
66 .copied()
67 {
68 return Ok(event_id);
69 }
70
71 let mut latest = 0;
72 let path = self.topic_path(topic);
73 if path.is_file() {
74 for record in read_file_records(&path)? {
75 latest = record.id;
76 }
77 }
78 self.latest_ids
79 .lock()
80 .expect("file event log latest ids poisoned")
81 .insert(topic.as_str().to_string(), latest);
82 Ok(latest)
83 }
84
85 fn read_range_sync(
86 &self,
87 topic: &Topic,
88 from: Option<EventId>,
89 limit: usize,
90 ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
91 let path = self.topic_path(topic);
92 if !path.is_file() {
93 return Ok(Vec::new());
94 }
95 let from = from.unwrap_or(0);
96 let mut events = Vec::new();
97 for record in read_file_records(&path)? {
98 if record.id > from {
99 events.push((record.id, record.event));
100 }
101 if events.len() >= limit {
102 break;
103 }
104 }
105 Ok(events)
106 }
107
108 pub(super) fn topics(&self) -> Result<Vec<Topic>, LogError> {
109 let topics_dir = self.root.join("topics");
110 if !topics_dir.is_dir() {
111 return Ok(Vec::new());
112 }
113 let mut topics = Vec::new();
114 for entry in std::fs::read_dir(&topics_dir)
115 .map_err(|error| LogError::Io(format!("event log topics read error: {error}")))?
116 {
117 let entry = entry
118 .map_err(|error| LogError::Io(format!("event log topic entry error: {error}")))?;
119 let path = entry.path();
120 if path.extension().and_then(|ext| ext.to_str()) != Some("jsonl") {
121 continue;
122 }
123 let Some(stem) = path.file_stem().and_then(|stem| stem.to_str()) else {
124 continue;
125 };
126 topics.push(Topic::new(stem.to_string())?);
127 }
128 topics.sort_by(|left, right| left.as_str().cmp(right.as_str()));
129 Ok(topics)
130 }
131
132 pub(super) fn append_idempotent_by_header(
133 &self,
134 topic: &Topic,
135 header: &str,
136 value: &str,
137 event: LogEvent,
138 ) -> Result<AppendOutcome, LogError> {
139 let _guard = self
140 .write_lock
141 .lock()
142 .expect("file event log write lock poisoned");
143 let existing_events = self.read_range_sync(topic, None, usize::MAX)?;
144 if let Some((event_id, existing)) = existing_events.iter().find(|(_, event)| {
145 event
146 .headers
147 .get(header)
148 .is_some_and(|found| found == value)
149 }) {
150 return Ok(AppendOutcome {
151 event_id: *event_id,
152 event: existing.clone(),
153 inserted: false,
154 });
155 }
156
157 let next_id = self.latest_id_for_topic(topic)? + 1;
158 let previous = existing_events
159 .last()
160 .map(|(previous_id, previous_event)| (*previous_id, previous_event));
161 let event = prepare_event_after(topic, next_id, previous, event)?;
162 self.append_record_locked(topic, next_id, event)
163 }
164
165 fn append_record_locked(
166 &self,
167 topic: &Topic,
168 event_id: EventId,
169 event: LogEvent,
170 ) -> Result<AppendOutcome, LogError> {
171 let record = FileRecord {
172 id: event_id,
173 event: event.clone(),
174 };
175 let path = self.topic_path(topic);
176 if let Some(parent) = path.parent() {
177 std::fs::create_dir_all(parent)
178 .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
179 }
180 let line = serde_json::to_string(&record)
181 .map_err(|error| LogError::Serde(format!("event log encode error: {error}")))?;
182 use std::io::Write as _;
183 let mut file = std::fs::OpenOptions::new()
184 .create(true)
185 .append(true)
186 .open(&path)
187 .map_err(|error| LogError::Io(format!("event log open error: {error}")))?;
188 writeln!(file, "{line}")
189 .map_err(|error| LogError::Io(format!("event log write error: {error}")))?;
190 self.latest_ids
191 .lock()
192 .expect("file event log latest ids poisoned")
193 .insert(topic.as_str().to_string(), event_id);
194 self.broadcasts
195 .publish(topic, self.queue_depth, (event_id, event.clone()));
196 Ok(AppendOutcome {
197 event_id,
198 event,
199 inserted: true,
200 })
201 }
202}
203
204fn read_file_records(path: &Path) -> Result<Vec<FileRecord>, LogError> {
205 let file = std::fs::File::open(path)
206 .map_err(|error| LogError::Io(format!("event log open error: {error}")))?;
207 let mut reader = std::io::BufReader::new(file);
208 let mut records = Vec::new();
209 let mut line = Vec::new();
210 loop {
211 line.clear();
212 let bytes_read = std::io::BufRead::read_until(&mut reader, b'\n', &mut line)
213 .map_err(|error| LogError::Io(format!("event log read error: {error}")))?;
214 if bytes_read == 0 {
215 break;
216 }
217 if line.iter().all(u8::is_ascii_whitespace) {
218 continue;
219 }
220 let complete_line = line.ends_with(b"\n");
221 match serde_json::from_slice::<FileRecord>(&line) {
222 Ok(record) => records.push(record),
223 Err(_) if !complete_line => break,
224 Err(error) => {
225 return Err(LogError::Serde(format!("event log parse error: {error}")));
226 }
227 }
228 }
229 Ok(records)
230}
231
232impl EventLog for FileEventLog {
233 fn describe(&self) -> EventLogDescription {
234 EventLogDescription {
235 backend: EventLogBackendKind::File,
236 location: Some(self.root.clone()),
237 size_bytes: Some(dir_size_bytes(&self.root)),
238 queue_depth: self.queue_depth,
239 }
240 }
241
242 async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
243 let _guard = self
244 .write_lock
245 .lock()
246 .expect("file event log write lock poisoned");
247 let next_id = self.latest_id_for_topic(topic)? + 1;
248 let existing_events = self.read_range_sync(topic, None, usize::MAX)?;
249 let previous = existing_events
250 .last()
251 .map(|(previous_id, previous_event)| (*previous_id, previous_event));
252 let event = prepare_event_after(topic, next_id, previous, event)?;
253 self.append_record_locked(topic, next_id, event)
254 .map(|outcome| outcome.event_id)
255 }
256
257 async fn flush(&self) -> Result<(), LogError> {
258 sync_tree(&self.root)
259 }
260
261 async fn read_range(
262 &self,
263 topic: &Topic,
264 from: Option<EventId>,
265 limit: usize,
266 ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
267 self.read_range_sync(topic, from, limit)
268 }
269
270 async fn read_range_bytes(
271 &self,
272 topic: &Topic,
273 from: Option<EventId>,
274 limit: usize,
275 ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
276 self.read_range_sync(topic, from, limit)?
277 .into_iter()
278 .map(|(event_id, event)| Ok((event_id, event.try_into()?)))
279 .collect()
280 }
281
282 async fn subscribe(
283 self: Arc<Self>,
284 topic: &Topic,
285 from: Option<EventId>,
286 ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
287 let rx = self.broadcasts.subscribe(topic, self.queue_depth);
288 let history = self.read_range_sync(topic, from, usize::MAX)?;
289 Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
290 }
291
292 async fn ack(
293 &self,
294 topic: &Topic,
295 consumer: &ConsumerId,
296 up_to: EventId,
297 ) -> Result<(), LogError> {
298 let path = self.consumer_path(topic, consumer);
299 let payload = serde_json::json!({
300 "topic": topic.as_str(),
301 "consumer_id": consumer.as_str(),
302 "cursor": up_to,
303 "updated_at_ms": now_ms(),
304 });
305 write_json_atomically(&path, &payload)
306 }
307
308 async fn consumer_cursor(
309 &self,
310 topic: &Topic,
311 consumer: &ConsumerId,
312 ) -> Result<Option<EventId>, LogError> {
313 let path = self.consumer_path(topic, consumer);
314 if !path.is_file() {
315 return Ok(None);
316 }
317 let raw = std::fs::read_to_string(&path)
318 .map_err(|error| LogError::Io(format!("event log consumer read error: {error}")))?;
319 let payload: serde_json::Value = serde_json::from_str(&raw)
320 .map_err(|error| LogError::Serde(format!("event log consumer parse error: {error}")))?;
321 let cursor = payload
322 .get("cursor")
323 .and_then(serde_json::Value::as_u64)
324 .ok_or_else(|| {
325 LogError::Serde("event log consumer record missing numeric cursor".to_string())
326 })?;
327 Ok(Some(cursor))
328 }
329
330 async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
331 let latest = self.latest_id_for_topic(topic)?;
332 if latest == 0 {
333 Ok(None)
334 } else {
335 Ok(Some(latest))
336 }
337 }
338
339 async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
340 let _guard = self
341 .write_lock
342 .lock()
343 .expect("file event log write lock poisoned");
344 let path = self.topic_path(topic);
345 if !path.is_file() {
346 return Ok(CompactReport::default());
347 }
348 let retained = self.read_range_sync(topic, Some(before), usize::MAX)?;
349 let removed = self.read_range_sync(topic, None, usize::MAX)?.len() - retained.len();
350 if retained.is_empty() {
351 let _ = std::fs::remove_file(&path);
352 } else {
353 crate::atomic_io::atomic_write_with(&path, |writer| {
354 use std::io::Write as _;
355 for (event_id, event) in &retained {
356 let line = serde_json::to_string(&FileRecord {
357 id: *event_id,
358 event: event.clone(),
359 })
360 .map_err(|error| std::io::Error::other(error.to_string()))?;
361 writeln!(writer, "{line}")?;
362 }
363 Ok(())
364 })
365 .map_err(|error| LogError::Io(format!("event log compact finalize error: {error}")))?;
366 }
367 let latest = retained.last().map(|(event_id, _)| *event_id);
368 self.latest_ids
369 .lock()
370 .expect("file event log latest ids poisoned")
371 .insert(topic.as_str().to_string(), latest.unwrap_or(0));
372 Ok(CompactReport {
373 removed,
374 remaining: retained.len(),
375 latest,
376 checkpointed: false,
377 })
378 }
379}