codetether_agent/rlm/oracle/storage/
sync.rs1use super::manager::OracleTraceStorage;
15use super::sync_stats::OracleTraceSyncStats;
16use crate::rlm::oracle::record::OracleTraceRecord;
17use anyhow::{Context, Result};
18
19impl OracleTraceStorage {
20 pub async fn sync_pending(&self) -> Result<OracleTraceSyncStats> {
32 tokio::fs::create_dir_all(&self.spool_dir)
33 .await
34 .with_context(|| format!("Create spool dir {}", self.spool_dir.display()))?;
35
36 let Some(remote) = &self.remote else {
37 let n = self.pending_count().await?;
38 return Ok(OracleTraceSyncStats {
39 retained: n,
40 pending_after: n,
41 ..Default::default()
42 });
43 };
44
45 let mut stats = OracleTraceSyncStats::default();
46 let mut dir = tokio::fs::read_dir(&self.spool_dir).await?;
47 while let Some(entry) = dir.next_entry().await? {
48 let path = entry.path();
49 if path.extension().and_then(|e| e.to_str()) != Some("jsonl") {
50 continue;
51 }
52 let data = tokio::fs::read_to_string(&path).await?;
53 let record: OracleTraceRecord = match serde_json::from_str(&data) {
54 Ok(r) => r,
55 Err(e) => {
56 stats.failed += 1;
57 stats.retained += 1;
58 tracing::warn!(path = %path.display(), error = %e, "Invalid spool file");
59 continue;
60 }
61 };
62 match remote.upload_record(&record).await {
63 Ok(_) => {
64 stats.uploaded += 1;
65 if let Err(e) = tokio::fs::remove_file(&path).await {
66 stats.failed += 1;
67 stats.retained += 1;
68 tracing::warn!(path = %path.display(), error = %e, "Cleanup failed");
69 }
70 }
71 Err(e) => {
72 stats.failed += 1;
73 stats.retained += 1;
74 tracing::warn!(path = %path.display(), error = %e, "Sync failed");
75 }
76 }
77 }
78 stats.pending_after = self.pending_count().await?;
79 Ok(stats)
80 }
81}