Skip to main content

apiari_common/
ipc.rs

//! Generic JSONL read/write with cursor-based polling.
//!
//! Provides [`JsonlReader`] and [`JsonlWriter`] for line-delimited JSON files.
//! The reader tracks a byte offset so that each call to [`JsonlReader::poll`]
//! returns only the records appended since the previous read.
6
7use serde::Serialize;
8use serde::de::DeserializeOwned;
9use std::fs::{self, OpenOptions};
10use std::io::{self, BufRead, BufReader, Seek, SeekFrom, Write};
11use std::marker::PhantomData;
12use std::path::{Path, PathBuf};
13
/// Reads JSONL records from a file, tracking the byte offset so that
/// each poll only returns lines appended since the previous read.
///
/// Generic over any `T: DeserializeOwned` (the bound lives on the `impl`).
///
/// No `T` is ever stored, so the marker is `PhantomData<fn() -> T>`. Like
/// `PhantomData<T>`, it keeps the reader covariant in `T`. Unlike it, the
/// reader is not made `!Send`/`!Sync` just because `T` is.
#[derive(Debug)]
pub struct JsonlReader<T> {
    /// File being read.
    path: PathBuf,
    /// Byte position at which the next poll starts reading.
    offset: u64,
    _marker: PhantomData<fn() -> T>,
}
24
25impl<T: DeserializeOwned> JsonlReader<T> {
26    /// Create a new reader for the given path, starting at byte offset 0.
27    pub fn new(path: impl Into<PathBuf>) -> Self {
28        Self {
29            path: path.into(),
30            offset: 0,
31            _marker: PhantomData,
32        }
33    }
34
35    /// Create a new reader starting at the given byte offset.
36    ///
37    /// Useful when restoring from persisted state — you can resume reading
38    /// from where you left off without replaying old messages.
39    pub fn with_offset(path: impl Into<PathBuf>, offset: u64) -> Self {
40        Self {
41            path: path.into(),
42            offset,
43            _marker: PhantomData,
44        }
45    }
46
47    /// Return the current byte offset.
48    pub fn offset(&self) -> u64 {
49        self.offset
50    }
51
52    /// Set the byte offset (e.g. when restoring from persisted state).
53    pub fn set_offset(&mut self, offset: u64) {
54        self.offset = offset;
55    }
56
57    /// Skip to the end of the file so that subsequent polls only see new data.
58    ///
59    /// Returns the new offset, or 0 if the file does not exist.
60    pub fn skip_to_end(&mut self) -> io::Result<u64> {
61        match fs::metadata(&self.path) {
62            Ok(meta) => {
63                self.offset = meta.len();
64                Ok(self.offset)
65            }
66            Err(e) if e.kind() == io::ErrorKind::NotFound => {
67                self.offset = 0;
68                Ok(0)
69            }
70            Err(e) => Err(e),
71        }
72    }
73
74    /// Read any new lines appended since the last poll.
75    ///
76    /// Returns a vector of successfully deserialized records. Malformed lines
77    /// are silently skipped (the offset still advances past them).
78    pub fn poll(&mut self) -> io::Result<Vec<T>> {
79        if !self.path.exists() {
80            return Ok(Vec::new());
81        }
82
83        let file = fs::File::open(&self.path)?;
84        let file_len = file.metadata()?.len();
85
86        if file_len <= self.offset {
87            return Ok(Vec::new());
88        }
89
90        let mut reader = BufReader::new(file);
91        reader.seek(SeekFrom::Start(self.offset))?;
92
93        let mut records = Vec::new();
94        let mut line = String::new();
95
96        loop {
97            line.clear();
98            let bytes_read = reader.read_line(&mut line)?;
99            if bytes_read == 0 {
100                break;
101            }
102            self.offset += bytes_read as u64;
103
104            let trimmed = line.trim();
105            if trimmed.is_empty() {
106                continue;
107            }
108
109            if let Ok(record) = serde_json::from_str::<T>(trimmed) {
110                records.push(record);
111            }
112            // Malformed lines are silently skipped.
113        }
114
115        Ok(records)
116    }
117}
118
/// Appends JSONL records to a file, creating parent directories as needed.
///
/// Generic over any `T: Serialize` (the bound lives on the `impl`).
///
/// No `T` is ever stored, so the marker is `PhantomData<fn() -> T>`. It
/// keeps the writer covariant in `T` without making it `!Send`/`!Sync` just
/// because `T` is.
#[derive(Debug)]
pub struct JsonlWriter<T> {
    /// File that records are appended to.
    path: PathBuf,
    _marker: PhantomData<fn() -> T>,
}
127
128impl<T: Serialize> JsonlWriter<T> {
129    /// Create a new writer for the given path.
130    pub fn new(path: impl Into<PathBuf>) -> Self {
131        Self {
132            path: path.into(),
133            _marker: PhantomData,
134        }
135    }
136
137    /// Return the file path.
138    pub fn path(&self) -> &Path {
139        &self.path
140    }
141
142    /// Append a single record as a JSON line.
143    ///
144    /// Creates parent directories and the file itself if they don't exist.
145    pub fn append(&self, record: &T) -> io::Result<()> {
146        if let Some(parent) = self.path.parent() {
147            fs::create_dir_all(parent)?;
148        }
149
150        let mut file = OpenOptions::new()
151            .create(true)
152            .append(true)
153            .open(&self.path)?;
154
155        let json = serde_json::to_string(record)
156            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
157        writeln!(file, "{}", json)?;
158        Ok(())
159    }
160}
161
#[cfg(test)]
mod tests {
    use super::*;
    use serde::{Deserialize, Serialize};

    #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
    struct TestMsg {
        id: u32,
        text: String,
    }

    /// Scratch directory owned by a single test.
    ///
    /// The process id is part of the name, so concurrent `cargo test` runs
    /// on the same machine do not clobber each other's files. The directory
    /// is removed on drop, so it is cleaned up even when an assertion panics.
    struct ScratchDir(PathBuf);

    impl ScratchDir {
        fn new(name: &str) -> Self {
            let dir = std::env::temp_dir()
                .join(format!("apiari-ipc-test-{}-{}", name, std::process::id()));
            let _ = fs::remove_dir_all(&dir);
            fs::create_dir_all(&dir).unwrap();
            Self(dir)
        }

        fn file(&self, name: &str) -> PathBuf {
            self.0.join(name)
        }
    }

    impl Drop for ScratchDir {
        fn drop(&mut self) {
            let _ = fs::remove_dir_all(&self.0);
        }
    }

    fn msg(id: u32, text: &str) -> TestMsg {
        TestMsg {
            id,
            text: text.into(),
        }
    }

    #[test]
    fn test_write_and_read() {
        let dir = ScratchDir::new("write-read");
        let path = dir.file("test.jsonl");

        let writer = JsonlWriter::<TestMsg>::new(&path);
        let mut reader = JsonlReader::<TestMsg>::new(&path);

        // Write two records
        writer.append(&msg(1, "hello")).unwrap();
        writer.append(&msg(2, "world")).unwrap();

        // Poll should return both
        let records = reader.poll().unwrap();
        assert_eq!(records.len(), 2);
        assert_eq!(records[0].id, 1);
        assert_eq!(records[1].id, 2);

        // Poll again with no new data
        let records = reader.poll().unwrap();
        assert!(records.is_empty());

        // Write a third record
        writer.append(&msg(3, "!")).unwrap();

        // Poll should return only the new record
        let records = reader.poll().unwrap();
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].id, 3);
    }

    #[test]
    fn test_reader_nonexistent_file() {
        let dir = ScratchDir::new("nonexistent");
        let path = dir.file("missing.jsonl");

        let mut reader = JsonlReader::<TestMsg>::new(&path);
        let records = reader.poll().unwrap();
        assert!(records.is_empty());
        assert_eq!(reader.offset(), 0);
    }

    #[test]
    fn test_skip_to_end() {
        let dir = ScratchDir::new("skip");
        let path = dir.file("test.jsonl");

        let writer = JsonlWriter::<TestMsg>::new(&path);
        writer.append(&msg(1, "old")).unwrap();

        let mut reader = JsonlReader::<TestMsg>::new(&path);
        reader.skip_to_end().unwrap();

        // Should not see the old record
        let records = reader.poll().unwrap();
        assert!(records.is_empty());

        // New record should be visible
        writer.append(&msg(2, "new")).unwrap();
        let records = reader.poll().unwrap();
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].id, 2);
    }

    #[test]
    fn test_skip_to_end_nonexistent_file() {
        let dir = ScratchDir::new("skip-missing");
        let path = dir.file("missing.jsonl");

        let mut reader = JsonlReader::<TestMsg>::with_offset(&path, 42);
        assert_eq!(reader.skip_to_end().unwrap(), 0);
        assert_eq!(reader.offset(), 0);
    }

    #[test]
    fn test_malformed_lines_skipped() {
        let dir = ScratchDir::new("malformed");
        let path = dir.file("test.jsonl");

        // Write a valid record, a malformed line, and another valid record
        let mut file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(&path)
            .unwrap();
        writeln!(file, r#"{{"id":1,"text":"good"}}"#).unwrap();
        writeln!(file, "not valid json").unwrap();
        writeln!(file, r#"{{"id":2,"text":"also good"}}"#).unwrap();

        let mut reader = JsonlReader::<TestMsg>::new(&path);
        let records = reader.poll().unwrap();
        assert_eq!(records.len(), 2);
        assert_eq!(records[0].id, 1);
        assert_eq!(records[1].id, 2);
    }

    #[test]
    fn test_with_offset() {
        let dir = ScratchDir::new("with-offset");
        let path = dir.file("test.jsonl");

        let writer = JsonlWriter::<TestMsg>::new(&path);
        writer.append(&msg(1, "first")).unwrap();

        // Read first to get the offset
        let mut reader = JsonlReader::<TestMsg>::new(&path);
        let _ = reader.poll().unwrap();
        let saved_offset = reader.offset();

        // Write another record
        writer.append(&msg(2, "second")).unwrap();

        // Create a new reader from the saved offset
        let mut reader2 = JsonlReader::<TestMsg>::with_offset(&path, saved_offset);
        let records = reader2.poll().unwrap();
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].id, 2);
    }

    #[test]
    fn test_set_offset_replays_from_start() {
        let dir = ScratchDir::new("set-offset");
        let path = dir.file("test.jsonl");

        let writer = JsonlWriter::<TestMsg>::new(&path);
        writer.append(&msg(1, "a")).unwrap();
        writer.append(&msg(2, "b")).unwrap();

        let mut reader = JsonlReader::<TestMsg>::new(&path);
        assert_eq!(reader.poll().unwrap().len(), 2);

        reader.set_offset(0);
        assert_eq!(reader.offset(), 0);
        let records = reader.poll().unwrap();
        assert_eq!(records, vec![msg(1, "a"), msg(2, "b")]);
    }

    #[test]
    fn test_writer_creates_parent_dirs() {
        let dir = ScratchDir::new("parent-dirs");
        let path = dir.0.join("nested").join("deeper").join("test.jsonl");

        let writer = JsonlWriter::<TestMsg>::new(&path);
        assert_eq!(writer.path(), path.as_path());
        writer.append(&msg(7, "deep")).unwrap();
        assert!(path.exists());

        let mut reader = JsonlReader::<TestMsg>::new(&path);
        let records = reader.poll().unwrap();
        assert_eq!(records, vec![msg(7, "deep")]);
    }
}