Skip to main content

codetether_agent/rlm/oracle/storage/
sync.rs

1//! Bulk sync of pending spool files to remote storage.
2//!
3//! Iterates every `.jsonl` file in the spool directory, attempts
4//! to upload each one, and removes the local copy on success.
5//! Invalid or upload-failed files are retained for the next pass.
6//!
7//! # Examples
8//!
9//! ```ignore
10//! let stats = storage.sync_pending().await?;
11//! println!("uploaded={} failed={}", stats.uploaded, stats.failed);
12//! ```
13
14use super::manager::OracleTraceStorage;
15use super::sync_stats::OracleTraceSyncStats;
16use crate::rlm::oracle::record::OracleTraceRecord;
17use anyhow::{Context, Result};
18
19impl OracleTraceStorage {
20    /// Upload all pending spool records to the remote backend.
21    ///
22    /// Files that fail to parse or upload remain on disk so a
23    /// later invocation can retry. Returns aggregate counters.
24    ///
25    /// # Examples
26    ///
27    /// ```ignore
28    /// let stats = storage.sync_pending().await?;
29    /// assert_eq!(stats.pending_after, 0);
30    /// ```
31    pub async fn sync_pending(&self) -> Result<OracleTraceSyncStats> {
32        tokio::fs::create_dir_all(&self.spool_dir)
33            .await
34            .with_context(|| format!("Create spool dir {}", self.spool_dir.display()))?;
35
36        let Some(remote) = &self.remote else {
37            let n = self.pending_count().await?;
38            return Ok(OracleTraceSyncStats {
39                retained: n,
40                pending_after: n,
41                ..Default::default()
42            });
43        };
44
45        let mut stats = OracleTraceSyncStats::default();
46        let mut dir = tokio::fs::read_dir(&self.spool_dir).await?;
47        while let Some(entry) = dir.next_entry().await? {
48            let path = entry.path();
49            if path.extension().and_then(|e| e.to_str()) != Some("jsonl") {
50                continue;
51            }
52            let data = tokio::fs::read_to_string(&path).await?;
53            let record: OracleTraceRecord = match serde_json::from_str(&data) {
54                Ok(r) => r,
55                Err(e) => {
56                    stats.failed += 1;
57                    stats.retained += 1;
58                    tracing::warn!(path = %path.display(), error = %e, "Invalid spool file");
59                    continue;
60                }
61            };
62            match remote.upload_record(&record).await {
63                Ok(_) => {
64                    stats.uploaded += 1;
65                    if let Err(e) = tokio::fs::remove_file(&path).await {
66                        stats.failed += 1;
67                        stats.retained += 1;
68                        tracing::warn!(path = %path.display(), error = %e, "Cleanup failed");
69                    }
70                }
71                Err(e) => {
72                    stats.failed += 1;
73                    stats.retained += 1;
74                    tracing::warn!(path = %path.display(), error = %e, "Sync failed");
75                }
76            }
77        }
78        stats.pending_after = self.pending_count().await?;
79        Ok(stats)
80    }
81}