1use serde::Serialize;
8use serde::de::DeserializeOwned;
9use std::fs::{self, OpenOptions};
10use std::io::{self, BufRead, BufReader, Seek, SeekFrom, Write};
11use std::marker::PhantomData;
12use std::path::{Path, PathBuf};
13
14#[derive(Debug)]
19pub struct JsonlReader<T> {
20 path: PathBuf,
21 offset: u64,
22 _marker: PhantomData<T>,
23}
24
25impl<T: DeserializeOwned> JsonlReader<T> {
26 pub fn new(path: impl Into<PathBuf>) -> Self {
28 Self {
29 path: path.into(),
30 offset: 0,
31 _marker: PhantomData,
32 }
33 }
34
35 pub fn with_offset(path: impl Into<PathBuf>, offset: u64) -> Self {
40 Self {
41 path: path.into(),
42 offset,
43 _marker: PhantomData,
44 }
45 }
46
47 pub fn offset(&self) -> u64 {
49 self.offset
50 }
51
52 pub fn set_offset(&mut self, offset: u64) {
54 self.offset = offset;
55 }
56
57 pub fn skip_to_end(&mut self) -> io::Result<u64> {
61 match fs::metadata(&self.path) {
62 Ok(meta) => {
63 self.offset = meta.len();
64 Ok(self.offset)
65 }
66 Err(e) if e.kind() == io::ErrorKind::NotFound => {
67 self.offset = 0;
68 Ok(0)
69 }
70 Err(e) => Err(e),
71 }
72 }
73
74 pub fn poll(&mut self) -> io::Result<Vec<T>> {
79 if !self.path.exists() {
80 return Ok(Vec::new());
81 }
82
83 let file = fs::File::open(&self.path)?;
84 let file_len = file.metadata()?.len();
85
86 if file_len <= self.offset {
87 return Ok(Vec::new());
88 }
89
90 let mut reader = BufReader::new(file);
91 reader.seek(SeekFrom::Start(self.offset))?;
92
93 let mut records = Vec::new();
94 let mut line = String::new();
95
96 loop {
97 line.clear();
98 let bytes_read = reader.read_line(&mut line)?;
99 if bytes_read == 0 {
100 break;
101 }
102 self.offset += bytes_read as u64;
103
104 let trimmed = line.trim();
105 if trimmed.is_empty() {
106 continue;
107 }
108
109 if let Ok(record) = serde_json::from_str::<T>(trimmed) {
110 records.push(record);
111 }
112 }
114
115 Ok(records)
116 }
117}
118
119#[derive(Debug)]
123pub struct JsonlWriter<T> {
124 path: PathBuf,
125 _marker: PhantomData<T>,
126}
127
128impl<T: Serialize> JsonlWriter<T> {
129 pub fn new(path: impl Into<PathBuf>) -> Self {
131 Self {
132 path: path.into(),
133 _marker: PhantomData,
134 }
135 }
136
137 pub fn path(&self) -> &Path {
139 &self.path
140 }
141
142 pub fn append(&self, record: &T) -> io::Result<()> {
146 if let Some(parent) = self.path.parent() {
147 fs::create_dir_all(parent)?;
148 }
149
150 let mut file = OpenOptions::new()
151 .create(true)
152 .append(true)
153 .open(&self.path)?;
154
155 let json = serde_json::to_string(record)
156 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
157 writeln!(file, "{}", json)?;
158 Ok(())
159 }
160}
161
162#[cfg(test)]
163mod tests {
164 use super::*;
165 use serde::{Deserialize, Serialize};
166
167 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
168 struct TestMsg {
169 id: u32,
170 text: String,
171 }
172
173 #[test]
174 fn test_write_and_read() {
175 let dir = std::env::temp_dir().join("apiari-ipc-test-write-read");
176 let _ = fs::remove_dir_all(&dir);
177 fs::create_dir_all(&dir).unwrap();
178 let path = dir.join("test.jsonl");
179
180 let writer = JsonlWriter::<TestMsg>::new(&path);
181 let mut reader = JsonlReader::<TestMsg>::new(&path);
182
183 writer
185 .append(&TestMsg {
186 id: 1,
187 text: "hello".into(),
188 })
189 .unwrap();
190 writer
191 .append(&TestMsg {
192 id: 2,
193 text: "world".into(),
194 })
195 .unwrap();
196
197 let records = reader.poll().unwrap();
199 assert_eq!(records.len(), 2);
200 assert_eq!(records[0].id, 1);
201 assert_eq!(records[1].id, 2);
202
203 let records = reader.poll().unwrap();
205 assert!(records.is_empty());
206
207 writer
209 .append(&TestMsg {
210 id: 3,
211 text: "!".into(),
212 })
213 .unwrap();
214
215 let records = reader.poll().unwrap();
217 assert_eq!(records.len(), 1);
218 assert_eq!(records[0].id, 3);
219
220 let _ = fs::remove_dir_all(&dir);
221 }
222
223 #[test]
224 fn test_reader_nonexistent_file() {
225 let path = std::env::temp_dir().join("apiari-ipc-test-nonexistent.jsonl");
226 let _ = fs::remove_file(&path);
227
228 let mut reader = JsonlReader::<TestMsg>::new(&path);
229 let records = reader.poll().unwrap();
230 assert!(records.is_empty());
231 }
232
233 #[test]
234 fn test_skip_to_end() {
235 let dir = std::env::temp_dir().join("apiari-ipc-test-skip");
236 let _ = fs::remove_dir_all(&dir);
237 fs::create_dir_all(&dir).unwrap();
238 let path = dir.join("test.jsonl");
239
240 let writer = JsonlWriter::<TestMsg>::new(&path);
241 writer
242 .append(&TestMsg {
243 id: 1,
244 text: "old".into(),
245 })
246 .unwrap();
247
248 let mut reader = JsonlReader::<TestMsg>::new(&path);
249 reader.skip_to_end().unwrap();
250
251 let records = reader.poll().unwrap();
253 assert!(records.is_empty());
254
255 writer
257 .append(&TestMsg {
258 id: 2,
259 text: "new".into(),
260 })
261 .unwrap();
262 let records = reader.poll().unwrap();
263 assert_eq!(records.len(), 1);
264 assert_eq!(records[0].id, 2);
265
266 let _ = fs::remove_dir_all(&dir);
267 }
268
269 #[test]
270 fn test_malformed_lines_skipped() {
271 let dir = std::env::temp_dir().join("apiari-ipc-test-malformed");
272 let _ = fs::remove_dir_all(&dir);
273 fs::create_dir_all(&dir).unwrap();
274 let path = dir.join("test.jsonl");
275
276 let mut file = OpenOptions::new()
278 .create(true)
279 .append(true)
280 .open(&path)
281 .unwrap();
282 writeln!(file, r#"{{"id":1,"text":"good"}}"#).unwrap();
283 writeln!(file, "not valid json").unwrap();
284 writeln!(file, r#"{{"id":2,"text":"also good"}}"#).unwrap();
285
286 let mut reader = JsonlReader::<TestMsg>::new(&path);
287 let records = reader.poll().unwrap();
288 assert_eq!(records.len(), 2);
289 assert_eq!(records[0].id, 1);
290 assert_eq!(records[1].id, 2);
291
292 let _ = fs::remove_dir_all(&dir);
293 }
294
295 #[test]
296 fn test_with_offset() {
297 let dir = std::env::temp_dir().join("apiari-ipc-test-with-offset");
298 let _ = fs::remove_dir_all(&dir);
299 fs::create_dir_all(&dir).unwrap();
300 let path = dir.join("test.jsonl");
301
302 let writer = JsonlWriter::<TestMsg>::new(&path);
303 writer
304 .append(&TestMsg {
305 id: 1,
306 text: "first".into(),
307 })
308 .unwrap();
309
310 let mut reader = JsonlReader::<TestMsg>::new(&path);
312 let _ = reader.poll().unwrap();
313 let saved_offset = reader.offset();
314
315 writer
317 .append(&TestMsg {
318 id: 2,
319 text: "second".into(),
320 })
321 .unwrap();
322
323 let mut reader2 = JsonlReader::<TestMsg>::with_offset(&path, saved_offset);
325 let records = reader2.poll().unwrap();
326 assert_eq!(records.len(), 1);
327 assert_eq!(records[0].id, 2);
328
329 let _ = fs::remove_dir_all(&dir);
330 }
331}