Skip to main content

atomcode_telemetry/queue/
mod.rs

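//! Append-only NDJSON segment queue on disk.
//!
//! Events are appended to a `.partial` segment file. When a segment is rolled
//! it is flushed, fsynced and renamed to `.ndjson`, which makes it visible to
//! senders. A sender claims a ready segment by renaming it with a
//! `.sending-<pid>-<uuid>` suffix, and either deletes it when done or renames
//! it back to `.ndjson` so it can be retried.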
2
3pub mod roll;
4
5use crate::event::Record;
6use anyhow::{Context, Result};
7use std::fs::{self, File, OpenOptions};
8use std::io::{BufWriter, ErrorKind, Write};
9use std::path::{Path, PathBuf};
10use uuid::Uuid;
11
/// Extension of a closed, immutable segment that senders may pick up.
const READY_EXT: &str = "ndjson";
/// Extension of a segment that is still being written.
const PARTIAL_EXT: &str = "partial";
/// Infix placed after a ready segment's file name while a sender holds it;
/// the process id and a UUID follow the marker.
const SENDING_MARKER: &str = ".sending-";
15
16pub struct Queue {
17    dir: PathBuf,
18    current: Option<Segment>,
19    /// Cumulative dropped count (in-memory or on-disk FIFO eviction).
20    pub dropped: u64,
21}
22
23pub struct Segment {
24    pub path: PathBuf,
25    ready_path: PathBuf,
26    writer: BufWriter<File>,
27    events: u32,
28    bytes: u64,
29}
30
31impl Segment {
32    fn new(path: PathBuf, ready_path: PathBuf) -> Result<Self> {
33        let f = OpenOptions::new()
34            .create_new(true)
35            .append(true)
36            .open(&path)
37            .with_context(|| format!("creating segment {}", path.display()))?;
38        Ok(Self {
39            path,
40            ready_path,
41            writer: BufWriter::new(f),
42            events: 0,
43            bytes: 0,
44        })
45    }
46
47    fn append(&mut self, r: &Record) -> Result<()> {
48        let line = serde_json::to_string(r)?;
49        self.writer.write_all(line.as_bytes())?;
50        self.writer.write_all(b"\n")?;
51        self.events += 1;
52        self.bytes += line.len() as u64 + 1;
53        Ok(())
54    }
55
56    fn fsync(&mut self) -> Result<()> {
57        self.writer.flush()?;
58        self.writer.get_ref().sync_all()?;
59        Ok(())
60    }
61
62    fn finish(mut self) -> Result<PathBuf> {
63        self.fsync()?;
64        let partial_path = self.path.clone();
65        let ready_path = self.ready_path.clone();
66        drop(self.writer);
67        fs::rename(&partial_path, &ready_path).with_context(|| {
68            format!(
69                "rolling segment {} -> {}",
70                partial_path.display(),
71                ready_path.display()
72            )
73        })?;
74        Ok(ready_path)
75    }
76}
77
78impl Queue {
79    pub fn open(dir: PathBuf) -> Result<Self> {
80        fs::create_dir_all(&dir).with_context(|| format!("mkdir {}", dir.display()))?;
81
82        // Recover from previous crash / kill:
83        //   1. .sending-* files are claimed segments whose HTTP POST never
84        //      completed (process died mid-send).  Rename them back to
85        //      .ndjson so the new process can retry.
86        //   2. Empty .partial files are segments created but never written
87        //      to before the process exited.  Safe to delete.
88        recover_stale_files(&dir)?;
89
90        Ok(Self {
91            dir,
92            current: None,
93            dropped: 0,
94        })
95    }
96
97    /// Append and return true if a roll happened (current segment was closed).
98    pub fn append(&mut self, r: &Record) -> Result<bool> {
99        if self.current.is_none() {
100            self.start_new_segment()?;
101        }
102        let seg = self.current.as_mut().unwrap();
103        seg.append(r)?;
104
105        if roll::should_roll(seg.events, seg.bytes) {
106            self.roll()?;
107            return Ok(true);
108        }
109        Ok(false)
110    }
111
112    /// Force roll even if segment isn't full (used on tick flush).
113    /// Returns `Ok(None)` if nothing to roll.
114    pub fn force_roll(&mut self) -> Result<Option<PathBuf>> {
115        if let Some(seg) = self.current.take() {
116            if seg.events == 0 {
117                // empty: drop the file
118                let path = seg.path.clone();
119                drop(seg.writer);
120                let _ = fs::remove_file(path);
121                return Ok(None);
122            }
123            let p = seg.finish()?;
124            self.enforce_cap()?;
125            return Ok(Some(p));
126        }
127        Ok(None)
128    }
129
130    /// Closed, immutable segments ready to be sent.
131    pub fn ready_segments_sorted(&self) -> Result<Vec<PathBuf>> {
132        self.segments_with_extension(READY_EXT)
133    }
134
135    pub fn segments_sorted(&self) -> Result<Vec<PathBuf>> {
136        self.ready_segments_sorted()
137    }
138
139    pub fn claim_oldest_segment(&self) -> Result<Option<PathBuf>> {
140        for ready in self.ready_segments_sorted()? {
141            let Some(name) = ready.file_name().and_then(|s| s.to_str()) else {
142                continue;
143            };
144            let claimed = ready.with_file_name(format!(
145                "{}{}{}-{}",
146                name,
147                SENDING_MARKER,
148                std::process::id(),
149                Uuid::new_v4()
150            ));
151            match fs::rename(&ready, &claimed) {
152                Ok(()) => return Ok(Some(claimed)),
153                Err(e) if e.kind() == ErrorKind::NotFound => continue,
154                Err(e) => {
155                    return Err(e).with_context(|| {
156                        format!(
157                            "claiming segment {} -> {}",
158                            ready.display(),
159                            claimed.display()
160                        )
161                    });
162                }
163            }
164        }
165        Ok(None)
166    }
167
168    pub fn complete_claim(&self, path: &Path) -> Result<()> {
169        self.delete(path)
170    }
171
172    pub fn restore_claim(&self, path: &Path) -> Result<Option<PathBuf>> {
173        if !path.exists() {
174            return Ok(None);
175        }
176        let Some(ready) = ready_path_for_claim(path) else {
177            return Ok(None);
178        };
179        fs::rename(path, &ready)
180            .with_context(|| format!("restoring claimed segment {}", path.display()))?;
181        Ok(Some(ready))
182    }
183
184    fn segments_with_extension(&self, ext: &str) -> Result<Vec<PathBuf>> {
185        let mut v: Vec<PathBuf> = fs::read_dir(&self.dir)?
186            .filter_map(|e| e.ok().map(|e| e.path()))
187            .filter(|p| p.extension().and_then(|s| s.to_str()) == Some(ext))
188            .collect();
189        v.sort();
190        Ok(v)
191    }
192
193    pub fn delete(&self, path: &Path) -> Result<()> {
194        fs::remove_file(path)?;
195        Ok(())
196    }
197
198    pub fn stats(&self) -> Result<QueueStats> {
199        let segs = self.segments_sorted()?;
200        let mut total_bytes = 0u64;
201        let mut total_events = 0u64;
202        for p in &segs {
203            let meta = fs::metadata(p)?;
204            total_bytes += meta.len();
205            // Line-count approx via file size / avg_line is avoided here for accuracy:
206            let contents = fs::read_to_string(p).unwrap_or_default();
207            total_events += contents.lines().filter(|l| !l.is_empty()).count() as u64;
208        }
209        Ok(QueueStats {
210            segment_count: segs.len(),
211            total_bytes,
212            total_events,
213            oldest: segs.first().cloned(),
214        })
215    }
216
217    fn start_new_segment(&mut self) -> Result<()> {
218        let ts = chrono::Utc::now().format("%Y%m%d-%H%M%S");
219        let id = Uuid::new_v4();
220        let path = self.dir.join(format!("{}-{}.{}", ts, id, PARTIAL_EXT));
221        let ready_path = self.dir.join(format!("{}-{}.{}", ts, id, READY_EXT));
222        self.current = Some(Segment::new(path, ready_path)?);
223        Ok(())
224    }
225
226    fn roll(&mut self) -> Result<()> {
227        self.force_roll()?;
228        Ok(())
229    }
230
231    /// Delete oldest segments if over cap; bumps `dropped` by lines evicted.
232    fn enforce_cap(&mut self) -> Result<()> {
233        loop {
234            let segs = self.segments_sorted()?;
235            let total_bytes: u64 = segs
236                .iter()
237                .filter_map(|p| fs::metadata(p).ok().map(|m| m.len()))
238                .sum();
239            if !roll::over_cap(segs.len(), total_bytes) {
240                break;
241            }
242            if let Some(oldest) = segs.first() {
243                // Count lines before delete to update dropped.
244                if let Ok(contents) = fs::read_to_string(oldest) {
245                    self.dropped += contents.lines().filter(|l| !l.is_empty()).count() as u64;
246                }
247                fs::remove_file(oldest)?;
248            } else {
249                break;
250            }
251        }
252        Ok(())
253    }
254}
255
/// Snapshot of the ready segments of a queue.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct QueueStats {
    /// Number of ready segments.
    pub segment_count: usize,
    /// Combined size of the ready segments, in bytes.
    pub total_bytes: u64,
    /// Combined number of non-empty lines in the ready segments.
    pub total_events: u64,
    /// First ready segment in sorted order, if any.
    pub oldest: Option<PathBuf>,
}
263
264fn ready_path_for_claim(path: &Path) -> Option<PathBuf> {
265    let name = path.file_name()?.to_str()?;
266    let marker_start = name.rfind(SENDING_MARKER)?;
267    let ready_name = &name[..marker_start];
268    Some(path.with_file_name(ready_name))
269}
270
271/// Scan the queue directory for stale artifacts left by a previous process
272/// that exited before completing its send or cleanup:
273///
274/// - `.sending-*` files → rename back to `.ndjson` so they can be re-sent.
275/// - Empty `.partial` files → delete (they contain no events).
276fn recover_stale_files(dir: &Path) -> Result<()> {
277    let entries: Vec<PathBuf> = fs::read_dir(dir)
278        .with_context(|| format!("reading queue dir {}", dir.display()))?
279        .filter_map(|e| e.ok().map(|e| e.path()))
280        .collect();
281
282    for path in entries {
283        let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
284            continue;
285        };
286
287        // Recover .sending-* files: these were claimed for HTTP POST but the
288        // process died before the request completed or restore_claim ran.
289        // Rename back to the original .ndjson so the sender retries them.
290        if let Some(marker_start) = name.find(SENDING_MARKER) {
291            let ready_name = &name[..marker_start];
292            let ready_path = path.with_file_name(ready_name);
293            match fs::rename(&path, &ready_path) {
294                Ok(()) => {
295                    tracing::info!(
296                        "recovered stale .sending segment -> {}",
297                        ready_path.display()
298                    );
299                }
300                Err(e) => {
301                    tracing::warn!(
302                        ?e,
303                        "failed to recover stale .sending segment {}",
304                        path.display()
305                    );
306                }
307            }
308            continue;
309        }
310
311        // Clean up empty .partial files: these were created but never
312        // received any events before the process exited.
313        if name.ends_with(PARTIAL_EXT) {
314            if let Ok(meta) = fs::metadata(&path) {
315                if meta.len() == 0 {
316                    match fs::remove_file(&path) {
317                        Ok(()) => {
318                            tracing::info!("removed empty .partial segment {}", path.display());
319                        }
320                        Err(e) => {
321                            tracing::warn!(
322                                ?e,
323                                "failed to remove empty .partial segment {}",
324                                path.display()
325                            );
326                        }
327                    }
328                }
329            }
330        }
331    }
332
333    Ok(())
334}
335
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::*;
    use tempfile::TempDir;

    /// Minimal record used by every test: nil ids, fixed strings, no optional
    /// fields, and the `OpenAtomcode` event.
    fn rec() -> Record {
        let envelope = Envelope {
            device_id: Uuid::nil(),
            launch_id: Uuid::nil(),
            account_id: None,
            session_id: Uuid::nil(),
            turn_id: None,
            ts: 0,
            schema_version: 1,
            app_version: "x".into(),
            os: "linux".into(),
            arch: "x86_64".into(),
            locale: "en".into(),
            provider: None,
            provider_host: None,
            model: None,
            repo_origin: None,
            mode: None,
        };
        Record {
            envelope,
            event: Event::OpenAtomcode,
        }
    }

    /// Opens a queue rooted at the temporary directory.
    fn open_queue(d: &TempDir) -> Queue {
        Queue::open(d.path().to_path_buf()).unwrap()
    }

    /// Lists the files directly inside `dir` whose extension equals `ext`.
    fn files_with_ext(dir: &Path, ext: &str) -> Vec<PathBuf> {
        fs::read_dir(dir)
            .unwrap()
            .filter_map(|e| e.ok().map(|e| e.path()))
            .filter(|p| p.extension().and_then(|s| s.to_str()) == Some(ext))
            .collect()
    }

    // Relies on the roll threshold being 500 events; that threshold lives in
    // the `roll` module.
    #[test]
    fn append_rolls_after_500() {
        let d = TempDir::new().unwrap();
        let mut q = open_queue(&d);
        for _ in 0..499 {
            let rolled_early = q.append(&rec()).unwrap();
            assert!(!rolled_early);
        }
        let rolled = q.append(&rec()).unwrap();
        assert!(rolled, "500th append should roll");
        assert_eq!(q.segments_sorted().unwrap().len(), 1);
    }

    #[test]
    fn active_partial_segment_is_not_ready() {
        let d = TempDir::new().unwrap();
        let mut q = open_queue(&d);
        q.append(&rec()).unwrap();

        let ready = q.ready_segments_sorted().unwrap();
        assert!(
            ready.is_empty(),
            "active .partial segment must not be visible to senders"
        );
        assert_eq!(files_with_ext(d.path(), PARTIAL_EXT).len(), 1);
    }

    #[test]
    fn force_roll_empty_is_noop() {
        let d = TempDir::new().unwrap();
        let mut q = open_queue(&d);
        let rolled = q.force_roll().unwrap();
        assert!(rolled.is_none());
    }

    #[test]
    fn force_roll_closes_current_and_deletes_empty() {
        let d = TempDir::new().unwrap();
        let mut q = open_queue(&d);
        q.append(&rec()).unwrap();

        let rolled = q.force_roll().unwrap().unwrap();
        assert!(rolled.exists(), "rolled segment should remain");
        assert_eq!(
            rolled.extension().and_then(|s| s.to_str()),
            Some(READY_EXT)
        );

        let contents = fs::read_to_string(&rolled).unwrap();
        assert!(
            contents.contains(r#""event_id":"open_atomcode""#),
            "rolled segment should contain appended event"
        );

        // A second roll has nothing left to close.
        assert!(
            q.force_roll().unwrap().is_none(),
            "no current segment after roll"
        );
    }

    #[test]
    fn claim_oldest_segment_is_exclusive() {
        let d = TempDir::new().unwrap();
        let mut q1 = open_queue(&d);
        q1.append(&rec()).unwrap();
        q1.force_roll().unwrap();

        // Second handle on the same directory, opened before the claim.
        let q2 = open_queue(&d);

        let first = q1.claim_oldest_segment().unwrap();
        assert!(first.is_some(), "first claimant should get the segment");

        let second = q2.claim_oldest_segment().unwrap();
        assert!(
            second.is_none(),
            "second claimant should not see the claimed segment"
        );
    }

    #[test]
    fn restore_claim_makes_segment_ready_again() {
        let d = TempDir::new().unwrap();
        let mut q = open_queue(&d);
        q.append(&rec()).unwrap();
        q.force_roll().unwrap();

        let claimed = q.claim_oldest_segment().unwrap().unwrap();
        assert!(q.ready_segments_sorted().unwrap().is_empty());

        let restored = q.restore_claim(&claimed).unwrap().unwrap();
        assert!(restored.exists());
        assert_eq!(q.ready_segments_sorted().unwrap(), vec![restored]);
    }

    #[test]
    fn open_recovers_stale_sending_files() {
        let d = TempDir::new().unwrap();

        // Stand in for a previous process: roll one segment, then rename it
        // to a .sending-* name by hand, as if the process had crashed while
        // the HTTP POST was still in flight.
        let mut q = open_queue(&d);
        q.append(&rec()).unwrap();
        let rolled = q.force_roll().unwrap().unwrap();
        let rolled_name = rolled.file_name().unwrap().to_str().unwrap();
        let claimed = rolled.with_file_name(format!(
            "{}{}12345-abcdef",
            rolled_name,
            SENDING_MARKER
        ));
        fs::rename(&rolled, &claimed).unwrap();

        // While claimed, the segment is hidden from ready_segments.
        assert!(q.ready_segments_sorted().unwrap().is_empty());

        // Opening the queue again runs recover_stale_files, which restores it.
        let q2 = open_queue(&d);
        let ready = q2.ready_segments_sorted().unwrap();
        assert_eq!(ready.len(), 1, "stale .sending file should be recovered as .ndjson");
        assert!(
            !claimed.exists(),
            "original .sending file should have been renamed away"
        );

        // The event written before the simulated crash is still there.
        let contents = fs::read_to_string(&ready[0]).unwrap();
        assert!(
            contents.contains(r#""event_id":"open_atomcode""#),
            "recovered segment should contain original event data"
        );
    }

    #[test]
    fn open_removes_empty_partial_files() {
        let d = TempDir::new().unwrap();

        // An empty .partial, as a crash right after segment creation leaves.
        let empty_partial = d.path().join("20260512-000000-deadbeef.partial");
        fs::File::create(&empty_partial).unwrap();
        assert_eq!(fs::metadata(&empty_partial).unwrap().len(), 0);

        // A .partial that holds data and therefore must survive.
        let nonempty_partial = d.path().join("20260512-000001-alivecafe.partial");
        fs::write(&nonempty_partial, b"some data\n").unwrap();

        let _q = open_queue(&d);

        assert!(
            !empty_partial.exists(),
            "empty .partial file should be removed on Queue::open"
        );
        assert!(
            nonempty_partial.exists(),
            "non-empty .partial file should be kept"
        );
    }
}