use super::manager::OracleTraceStorage;
use super::sync_stats::OracleTraceSyncStats;
use crate::rlm::oracle::record::OracleTraceRecord;
use anyhow::{Context, Result};
impl OracleTraceStorage {
pub async fn sync_pending(&self) -> Result<OracleTraceSyncStats> {
tokio::fs::create_dir_all(&self.spool_dir)
.await
.with_context(|| format!("Create spool dir {}", self.spool_dir.display()))?;
let Some(remote) = &self.remote else {
let n = self.pending_count().await?;
return Ok(OracleTraceSyncStats {
retained: n,
pending_after: n,
..Default::default()
});
};
let mut stats = OracleTraceSyncStats::default();
let mut dir = tokio::fs::read_dir(&self.spool_dir).await?;
while let Some(entry) = dir.next_entry().await? {
let path = entry.path();
if path.extension().and_then(|e| e.to_str()) != Some("jsonl") {
continue;
}
let data = tokio::fs::read_to_string(&path).await?;
let record: OracleTraceRecord = match serde_json::from_str(&data) {
Ok(r) => r,
Err(e) => {
stats.failed += 1;
stats.retained += 1;
tracing::warn!(path = %path.display(), error = %e, "Invalid spool file");
continue;
}
};
match remote.upload_record(&record).await {
Ok(_) => {
stats.uploaded += 1;
if let Err(e) = tokio::fs::remove_file(&path).await {
stats.failed += 1;
stats.retained += 1;
tracing::warn!(path = %path.display(), error = %e, "Cleanup failed");
}
}
Err(e) => {
stats.failed += 1;
stats.retained += 1;
tracing::warn!(path = %path.display(), error = %e, "Sync failed");
}
}
}
stats.pending_after = self.pending_count().await?;
Ok(stats)
}
}