tracevault_cli/commands/
flush.rs1use crate::api_client::{resolve_credentials, ApiClient};
2use crate::config::TracevaultConfig;
3use std::fs;
4use std::io::{BufRead, BufReader, Write};
5use std::path::Path;
6use tracevault_core::streaming::StreamEventRequest;
7
8pub async fn run_flush(project_root: &Path) -> Result<(), Box<dyn std::error::Error>> {
9 let config = TracevaultConfig::load(project_root).ok_or("config not found")?;
10 let org_slug = config.org_slug.ok_or("org_slug not configured")?;
11 let repo_id = config.repo_id.ok_or("repo_id not configured")?;
12
13 let (server_url, token) = resolve_credentials(project_root);
14 let server_url = server_url.ok_or("server_url not configured")?;
15 let client = ApiClient::new(&server_url, token.as_deref());
16
17 let sessions_dir = project_root.join(".tracevault").join("sessions");
18 if !sessions_dir.exists() {
19 println!("No sessions directory found. Nothing to flush.");
20 return Ok(());
21 }
22
23 let mut total_sent = 0u64;
24 let mut total_failed = 0u64;
25
26 let entries: Vec<_> = fs::read_dir(&sessions_dir)?
27 .filter_map(|e| e.ok())
28 .filter(|e| e.path().is_dir())
29 .collect();
30
31 for entry in entries {
32 let pending_path = entry.path().join("pending.jsonl");
33 if !pending_path.exists() {
34 continue;
35 }
36
37 let events = drain_pending(&pending_path)?;
38 if events.is_empty() {
39 continue;
40 }
41
42 let mut failed_events: Vec<StreamEventRequest> = Vec::new();
43
44 for event in events {
45 match client.stream_event(&org_slug, &repo_id, &event).await {
46 Ok(_) => {
47 total_sent += 1;
48 }
49 Err(e) => {
50 eprintln!(
51 "Warning: failed to send event (session {}): {e}",
52 event.session_id
53 );
54 failed_events.push(event);
55 total_failed += 1;
56 }
57 }
58 }
59
60 if !failed_events.is_empty() {
62 append_pending(&pending_path, &failed_events)?;
63 }
64 }
65
66 println!("Flush complete: {total_sent} sent, {total_failed} failed");
67 Ok(())
68}
69
70fn drain_pending(path: &Path) -> Result<Vec<StreamEventRequest>, Box<dyn std::error::Error>> {
72 let file = fs::File::open(path)?;
73 let reader = BufReader::new(file);
74 let mut events = Vec::new();
75
76 for line in reader.lines() {
77 let line = line?;
78 let trimmed = line.trim();
79 if trimmed.is_empty() {
80 continue;
81 }
82 match serde_json::from_str::<StreamEventRequest>(trimmed) {
83 Ok(event) => events.push(event),
84 Err(e) => {
85 eprintln!("Warning: skipping malformed pending event: {e}");
86 }
87 }
88 }
89
90 fs::write(path, "")?;
92
93 Ok(events)
94}
95
96fn append_pending(
98 path: &Path,
99 events: &[StreamEventRequest],
100) -> Result<(), Box<dyn std::error::Error>> {
101 let mut file = fs::OpenOptions::new()
102 .create(true)
103 .append(true)
104 .open(path)?;
105
106 for event in events {
107 let json = serde_json::to_string(event)?;
108 writeln!(file, "{json}")?;
109 }
110
111 Ok(())
112}