atomcode_telemetry/queue/
mod.rs1pub mod roll;
4
5use crate::event::Record;
6use anyhow::{Context, Result};
7use std::fs::{self, File, OpenOptions};
8use std::io::{BufWriter, ErrorKind, Write};
9use std::path::{Path, PathBuf};
10use uuid::Uuid;
11
12const READY_EXT: &str = "ndjson";
13const PARTIAL_EXT: &str = "partial";
14const SENDING_MARKER: &str = ".sending-";
15
16pub struct Queue {
17 dir: PathBuf,
18 current: Option<Segment>,
19 pub dropped: u64,
21}
22
/// One segment file that is being written by a `Queue`.
pub struct Segment {
    /// Where the file lives while it is being written (a `.partial` file when
    /// created by `Queue`).
    pub path: PathBuf,
    /// Where the file is moved once the segment is finished.
    ready_path: PathBuf,
    /// Buffered handle that NDJSON lines are appended through.
    writer: BufWriter<File>,
    /// Number of records appended so far.
    events: u32,
    /// Number of bytes appended so far, trailing newlines included.
    bytes: u64,
}
30
31impl Segment {
32 fn new(path: PathBuf, ready_path: PathBuf) -> Result<Self> {
33 let f = OpenOptions::new()
34 .create_new(true)
35 .append(true)
36 .open(&path)
37 .with_context(|| format!("creating segment {}", path.display()))?;
38 Ok(Self {
39 path,
40 ready_path,
41 writer: BufWriter::new(f),
42 events: 0,
43 bytes: 0,
44 })
45 }
46
47 fn append(&mut self, r: &Record) -> Result<()> {
48 let line = serde_json::to_string(r)?;
49 self.writer.write_all(line.as_bytes())?;
50 self.writer.write_all(b"\n")?;
51 self.events += 1;
52 self.bytes += line.len() as u64 + 1;
53 Ok(())
54 }
55
56 fn fsync(&mut self) -> Result<()> {
57 self.writer.flush()?;
58 self.writer.get_ref().sync_all()?;
59 Ok(())
60 }
61
62 fn finish(mut self) -> Result<PathBuf> {
63 self.fsync()?;
64 let partial_path = self.path.clone();
65 let ready_path = self.ready_path.clone();
66 drop(self.writer);
67 fs::rename(&partial_path, &ready_path).with_context(|| {
68 format!(
69 "rolling segment {} -> {}",
70 partial_path.display(),
71 ready_path.display()
72 )
73 })?;
74 Ok(ready_path)
75 }
76}
77
78impl Queue {
79 pub fn open(dir: PathBuf) -> Result<Self> {
80 fs::create_dir_all(&dir).with_context(|| format!("mkdir {}", dir.display()))?;
81
82 recover_stale_files(&dir)?;
89
90 Ok(Self {
91 dir,
92 current: None,
93 dropped: 0,
94 })
95 }
96
97 pub fn append(&mut self, r: &Record) -> Result<bool> {
99 if self.current.is_none() {
100 self.start_new_segment()?;
101 }
102 let seg = self.current.as_mut().unwrap();
103 seg.append(r)?;
104
105 if roll::should_roll(seg.events, seg.bytes) {
106 self.roll()?;
107 return Ok(true);
108 }
109 Ok(false)
110 }
111
112 pub fn force_roll(&mut self) -> Result<Option<PathBuf>> {
115 if let Some(seg) = self.current.take() {
116 if seg.events == 0 {
117 let path = seg.path.clone();
119 drop(seg.writer);
120 let _ = fs::remove_file(path);
121 return Ok(None);
122 }
123 let p = seg.finish()?;
124 self.enforce_cap()?;
125 return Ok(Some(p));
126 }
127 Ok(None)
128 }
129
130 pub fn ready_segments_sorted(&self) -> Result<Vec<PathBuf>> {
132 self.segments_with_extension(READY_EXT)
133 }
134
135 pub fn segments_sorted(&self) -> Result<Vec<PathBuf>> {
136 self.ready_segments_sorted()
137 }
138
139 pub fn claim_oldest_segment(&self) -> Result<Option<PathBuf>> {
140 for ready in self.ready_segments_sorted()? {
141 let Some(name) = ready.file_name().and_then(|s| s.to_str()) else {
142 continue;
143 };
144 let claimed = ready.with_file_name(format!(
145 "{}{}{}-{}",
146 name,
147 SENDING_MARKER,
148 std::process::id(),
149 Uuid::new_v4()
150 ));
151 match fs::rename(&ready, &claimed) {
152 Ok(()) => return Ok(Some(claimed)),
153 Err(e) if e.kind() == ErrorKind::NotFound => continue,
154 Err(e) => {
155 return Err(e).with_context(|| {
156 format!(
157 "claiming segment {} -> {}",
158 ready.display(),
159 claimed.display()
160 )
161 });
162 }
163 }
164 }
165 Ok(None)
166 }
167
168 pub fn complete_claim(&self, path: &Path) -> Result<()> {
169 self.delete(path)
170 }
171
172 pub fn restore_claim(&self, path: &Path) -> Result<Option<PathBuf>> {
173 if !path.exists() {
174 return Ok(None);
175 }
176 let Some(ready) = ready_path_for_claim(path) else {
177 return Ok(None);
178 };
179 fs::rename(path, &ready)
180 .with_context(|| format!("restoring claimed segment {}", path.display()))?;
181 Ok(Some(ready))
182 }
183
184 fn segments_with_extension(&self, ext: &str) -> Result<Vec<PathBuf>> {
185 let mut v: Vec<PathBuf> = fs::read_dir(&self.dir)?
186 .filter_map(|e| e.ok().map(|e| e.path()))
187 .filter(|p| p.extension().and_then(|s| s.to_str()) == Some(ext))
188 .collect();
189 v.sort();
190 Ok(v)
191 }
192
193 pub fn delete(&self, path: &Path) -> Result<()> {
194 fs::remove_file(path)?;
195 Ok(())
196 }
197
198 pub fn stats(&self) -> Result<QueueStats> {
199 let segs = self.segments_sorted()?;
200 let mut total_bytes = 0u64;
201 let mut total_events = 0u64;
202 for p in &segs {
203 let meta = fs::metadata(p)?;
204 total_bytes += meta.len();
205 let contents = fs::read_to_string(p).unwrap_or_default();
207 total_events += contents.lines().filter(|l| !l.is_empty()).count() as u64;
208 }
209 Ok(QueueStats {
210 segment_count: segs.len(),
211 total_bytes,
212 total_events,
213 oldest: segs.first().cloned(),
214 })
215 }
216
217 fn start_new_segment(&mut self) -> Result<()> {
218 let ts = chrono::Utc::now().format("%Y%m%d-%H%M%S");
219 let id = Uuid::new_v4();
220 let path = self.dir.join(format!("{}-{}.{}", ts, id, PARTIAL_EXT));
221 let ready_path = self.dir.join(format!("{}-{}.{}", ts, id, READY_EXT));
222 self.current = Some(Segment::new(path, ready_path)?);
223 Ok(())
224 }
225
226 fn roll(&mut self) -> Result<()> {
227 self.force_roll()?;
228 Ok(())
229 }
230
231 fn enforce_cap(&mut self) -> Result<()> {
233 loop {
234 let segs = self.segments_sorted()?;
235 let total_bytes: u64 = segs
236 .iter()
237 .filter_map(|p| fs::metadata(p).ok().map(|m| m.len()))
238 .sum();
239 if !roll::over_cap(segs.len(), total_bytes) {
240 break;
241 }
242 if let Some(oldest) = segs.first() {
243 if let Ok(contents) = fs::read_to_string(oldest) {
245 self.dropped += contents.lines().filter(|l| !l.is_empty()).count() as u64;
246 }
247 fs::remove_file(oldest)?;
248 } else {
249 break;
250 }
251 }
252 Ok(())
253 }
254}
255
/// Snapshot of the ready segments of a `Queue`, as produced by `Queue::stats`.
#[derive(Clone, Debug)]
pub struct QueueStats {
    /// Number of ready segments.
    pub segment_count: usize,
    /// Combined on-disk size of those segments, in bytes.
    pub total_bytes: u64,
    /// Combined number of non-empty lines (events) in those segments.
    pub total_events: u64,
    /// The segment whose path sorts first, if any.
    pub oldest: Option<PathBuf>,
}
263
264fn ready_path_for_claim(path: &Path) -> Option<PathBuf> {
265 let name = path.file_name()?.to_str()?;
266 let marker_start = name.rfind(SENDING_MARKER)?;
267 let ready_name = &name[..marker_start];
268 Some(path.with_file_name(ready_name))
269}
270
271fn recover_stale_files(dir: &Path) -> Result<()> {
277 let entries: Vec<PathBuf> = fs::read_dir(dir)
278 .with_context(|| format!("reading queue dir {}", dir.display()))?
279 .filter_map(|e| e.ok().map(|e| e.path()))
280 .collect();
281
282 for path in entries {
283 let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
284 continue;
285 };
286
287 if let Some(marker_start) = name.find(SENDING_MARKER) {
291 let ready_name = &name[..marker_start];
292 let ready_path = path.with_file_name(ready_name);
293 match fs::rename(&path, &ready_path) {
294 Ok(()) => {
295 tracing::info!(
296 "recovered stale .sending segment -> {}",
297 ready_path.display()
298 );
299 }
300 Err(e) => {
301 tracing::warn!(
302 ?e,
303 "failed to recover stale .sending segment {}",
304 path.display()
305 );
306 }
307 }
308 continue;
309 }
310
311 if name.ends_with(PARTIAL_EXT) {
314 if let Ok(meta) = fs::metadata(&path) {
315 if meta.len() == 0 {
316 match fs::remove_file(&path) {
317 Ok(()) => {
318 tracing::info!("removed empty .partial segment {}", path.display());
319 }
320 Err(e) => {
321 tracing::warn!(
322 ?e,
323 "failed to remove empty .partial segment {}",
324 path.display()
325 );
326 }
327 }
328 }
329 }
330 }
331 }
332
333 Ok(())
334}
335
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::*;
    use tempfile::TempDir;

    /// Builds a minimal record whose serialized form contains the `open_atomcode` event.
    fn rec() -> Record {
        Record {
            envelope: Envelope {
                device_id: Uuid::nil(),
                launch_id: Uuid::nil(),
                account_id: None,
                session_id: Uuid::nil(),
                turn_id: None,
                ts: 0,
                schema_version: 1,
                app_version: "x".into(),
                os: "linux".into(),
                arch: "x86_64".into(),
                locale: "en".into(),
                provider: None,
                provider_host: None,
                model: None,
                repo_origin: None,
                mode: None,
            },
            event: Event::OpenAtomcode,
        }
    }

    #[test]
    fn append_rolls_after_500() {
        let d = TempDir::new().unwrap();
        let mut q = Queue::open(d.path().to_path_buf()).unwrap();
        for _ in 0..499 {
            assert!(!q.append(&rec()).unwrap());
        }
        assert!(q.append(&rec()).unwrap(), "500th append should roll");
        let segs = q.segments_sorted().unwrap();
        assert_eq!(segs.len(), 1);
    }

    #[test]
    fn active_partial_segment_is_not_ready() {
        let d = TempDir::new().unwrap();
        let mut q = Queue::open(d.path().to_path_buf()).unwrap();
        q.append(&rec()).unwrap();

        assert!(
            q.ready_segments_sorted().unwrap().is_empty(),
            "active .partial segment must not be visible to senders"
        );
        let partials: Vec<_> = fs::read_dir(d.path())
            .unwrap()
            .filter_map(|e| e.ok().map(|e| e.path()))
            .filter(|p| p.extension().and_then(|s| s.to_str()) == Some(PARTIAL_EXT))
            .collect();
        assert_eq!(partials.len(), 1);
    }

    #[test]
    fn force_roll_empty_is_noop() {
        let d = TempDir::new().unwrap();
        let mut q = Queue::open(d.path().to_path_buf()).unwrap();
        assert!(q.force_roll().unwrap().is_none());
    }

    #[test]
    fn force_roll_closes_current_and_deletes_empty() {
        let d = TempDir::new().unwrap();
        let mut q = Queue::open(d.path().to_path_buf()).unwrap();
        q.append(&rec()).unwrap();
        let p = q.force_roll().unwrap().unwrap();
        assert!(p.exists(), "rolled segment should remain");
        assert_eq!(p.extension().and_then(|s| s.to_str()), Some(READY_EXT));
        let c = fs::read_to_string(&p).unwrap();
        assert!(
            c.contains(r#""event_id":"open_atomcode""#),
            "rolled segment should contain appended event"
        );
        assert!(
            q.force_roll().unwrap().is_none(),
            "no current segment after roll"
        );
    }

    #[test]
    fn claim_oldest_segment_is_exclusive() {
        let d = TempDir::new().unwrap();
        let mut q1 = Queue::open(d.path().to_path_buf()).unwrap();
        q1.append(&rec()).unwrap();
        q1.force_roll().unwrap();

        let q2 = Queue::open(d.path().to_path_buf()).unwrap();
        let claimed = q1.claim_oldest_segment().unwrap();
        assert!(claimed.is_some(), "first claimant should get the segment");
        assert!(
            q2.claim_oldest_segment().unwrap().is_none(),
            "second claimant should not see the claimed segment"
        );
    }

    #[test]
    fn restore_claim_makes_segment_ready_again() {
        let d = TempDir::new().unwrap();
        let mut q = Queue::open(d.path().to_path_buf()).unwrap();
        q.append(&rec()).unwrap();
        q.force_roll().unwrap();

        let claimed = q.claim_oldest_segment().unwrap().unwrap();
        assert!(q.ready_segments_sorted().unwrap().is_empty());

        let restored = q.restore_claim(&claimed).unwrap().unwrap();
        assert!(restored.exists());
        assert_eq!(q.ready_segments_sorted().unwrap(), vec![restored]);
    }

    #[test]
    fn restore_claim_of_missing_claim_is_none() {
        let d = TempDir::new().unwrap();
        let q = Queue::open(d.path().to_path_buf()).unwrap();
        let missing = d
            .path()
            .join(format!("gone.{}{}1-abc", READY_EXT, SENDING_MARKER));
        assert!(
            q.restore_claim(&missing).unwrap().is_none(),
            "restoring a claim that no longer exists should be a no-op"
        );
    }

    #[test]
    fn complete_claim_deletes_claimed_segment() {
        let d = TempDir::new().unwrap();
        let mut q = Queue::open(d.path().to_path_buf()).unwrap();
        q.append(&rec()).unwrap();
        q.force_roll().unwrap();

        let claimed = q.claim_oldest_segment().unwrap().unwrap();
        q.complete_claim(&claimed).unwrap();
        assert!(!claimed.exists(), "completed claim should be deleted");
        assert!(q.ready_segments_sorted().unwrap().is_empty());
        assert!(q.claim_oldest_segment().unwrap().is_none());
    }

    #[test]
    fn stats_reports_ready_segments() {
        let d = TempDir::new().unwrap();
        let mut q = Queue::open(d.path().to_path_buf()).unwrap();
        for _ in 0..3 {
            q.append(&rec()).unwrap();
        }
        let rolled = q.force_roll().unwrap().unwrap();

        let stats = q.stats().unwrap();
        assert_eq!(stats.segment_count, 1);
        assert_eq!(stats.total_events, 3);
        assert_eq!(stats.total_bytes, fs::metadata(&rolled).unwrap().len());
        assert_eq!(stats.oldest, Some(rolled));
    }

    #[test]
    fn open_recovers_stale_sending_files() {
        let d = TempDir::new().unwrap();

        let mut q = Queue::open(d.path().to_path_buf()).unwrap();
        q.append(&rec()).unwrap();
        let rolled = q.force_roll().unwrap().unwrap();
        let claimed = rolled.with_file_name(format!(
            "{}{}12345-abcdef",
            rolled.file_name().unwrap().to_str().unwrap(),
            SENDING_MARKER
        ));
        fs::rename(&rolled, &claimed).unwrap();

        assert!(q.ready_segments_sorted().unwrap().is_empty());

        let q2 = Queue::open(d.path().to_path_buf()).unwrap();
        let ready = q2.ready_segments_sorted().unwrap();
        assert_eq!(ready.len(), 1, "stale .sending file should be recovered as .ndjson");
        assert!(
            !claimed.exists(),
            "original .sending file should have been renamed away"
        );

        let contents = fs::read_to_string(&ready[0]).unwrap();
        assert!(
            contents.contains(r#""event_id":"open_atomcode""#),
            "recovered segment should contain original event data"
        );
    }

    #[test]
    fn open_removes_empty_partial_files() {
        let d = TempDir::new().unwrap();

        let empty_partial = d.path().join("20260512-000000-deadbeef.partial");
        fs::File::create(&empty_partial).unwrap();
        assert_eq!(fs::metadata(&empty_partial).unwrap().len(), 0);

        let nonempty_partial = d.path().join("20260512-000001-alivecafe.partial");
        fs::write(&nonempty_partial, b"some data\n").unwrap();

        let _q = Queue::open(d.path().to_path_buf()).unwrap();

        assert!(
            !empty_partial.exists(),
            "empty .partial file should be removed on Queue::open"
        );
        assert!(
            nonempty_partial.exists(),
            "non-empty .partial file should be kept"
        );
    }
}