Skip to main content

tracevault_cli/commands/
flush.rs

1use crate::api_client::{resolve_credentials, ApiClient};
2use crate::config::TracevaultConfig;
3use std::fs;
4use std::io::{BufRead, BufReader, Write};
5use std::path::Path;
6use tracevault_core::streaming::StreamEventRequest;
7
8pub async fn run_flush(project_root: &Path) -> Result<(), Box<dyn std::error::Error>> {
9    let config = TracevaultConfig::load(project_root).ok_or("config not found")?;
10    let org_slug = config.org_slug.ok_or("org_slug not configured")?;
11    let repo_id = config.repo_id.ok_or("repo_id not configured")?;
12
13    let (server_url, token) = resolve_credentials(project_root);
14    let server_url = server_url.ok_or("server_url not configured")?;
15    let client = ApiClient::new(&server_url, token.as_deref());
16
17    let sessions_dir = project_root.join(".tracevault").join("sessions");
18    if !sessions_dir.exists() {
19        println!("No sessions directory found. Nothing to flush.");
20        return Ok(());
21    }
22
23    let mut total_sent = 0u64;
24    let mut total_failed = 0u64;
25
26    let entries: Vec<_> = fs::read_dir(&sessions_dir)?
27        .filter_map(|e| e.ok())
28        .filter(|e| e.path().is_dir())
29        .collect();
30
31    for entry in entries {
32        let pending_path = entry.path().join("pending.jsonl");
33        if !pending_path.exists() {
34            continue;
35        }
36
37        let events = drain_pending(&pending_path)?;
38        if events.is_empty() {
39            continue;
40        }
41
42        let mut failed_events: Vec<StreamEventRequest> = Vec::new();
43
44        for event in events {
45            match client.stream_event(&org_slug, &repo_id, &event).await {
46                Ok(_) => {
47                    total_sent += 1;
48                }
49                Err(e) => {
50                    eprintln!(
51                        "Warning: failed to send event (session {}): {e}",
52                        event.session_id
53                    );
54                    failed_events.push(event);
55                    total_failed += 1;
56                }
57            }
58        }
59
60        // Re-enqueue failed events
61        if !failed_events.is_empty() {
62            append_pending(&pending_path, &failed_events)?;
63        }
64    }
65
66    println!("Flush complete: {total_sent} sent, {total_failed} failed");
67    Ok(())
68}
69
70/// Read and remove all events from a pending.jsonl file.
71fn drain_pending(path: &Path) -> Result<Vec<StreamEventRequest>, Box<dyn std::error::Error>> {
72    let file = fs::File::open(path)?;
73    let reader = BufReader::new(file);
74    let mut events = Vec::new();
75
76    for line in reader.lines() {
77        let line = line?;
78        let trimmed = line.trim();
79        if trimmed.is_empty() {
80            continue;
81        }
82        match serde_json::from_str::<StreamEventRequest>(trimmed) {
83            Ok(event) => events.push(event),
84            Err(e) => {
85                eprintln!("Warning: skipping malformed pending event: {e}");
86            }
87        }
88    }
89
90    // Truncate the file after reading
91    fs::write(path, "")?;
92
93    Ok(events)
94}
95
96/// Append events back to a pending.jsonl file (for re-enqueuing failures).
97fn append_pending(
98    path: &Path,
99    events: &[StreamEventRequest],
100) -> Result<(), Box<dyn std::error::Error>> {
101    let mut file = fs::OpenOptions::new()
102        .create(true)
103        .append(true)
104        .open(path)?;
105
106    for event in events {
107        let json = serde_json::to_string(event)?;
108        writeln!(file, "{json}")?;
109    }
110
111    Ok(())
112}