Skip to main content

nebu_ctx/
sync_cli.rs

1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3
4#[derive(Debug, Clone, Serialize, Deserialize)]
5pub struct SyncOutboxStatus {
6    pub queued: usize,
7    pub failed: usize,
8    pub telemetry: usize,
9    pub server_tool_calls: usize,
10    pub code_index_syncs: usize,
11    pub path: String,
12    pub readable: bool,
13}
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct SyncReport {
17    pub schema_version: u32,
18    pub generated_at: DateTime<Utc>,
19    pub action: String,
20    pub before: SyncOutboxStatus,
21    pub after: Option<SyncOutboxStatus>,
22    pub attempted: usize,
23    pub warnings: Vec<String>,
24}
25
26pub fn run_cli(args: &[String]) -> i32 {
27    let json = args.iter().any(|arg| arg == "--json");
28    let help = args.iter().any(|arg| arg == "--help" || arg == "-h");
29    let action = args
30        .iter()
31        .find(|arg| !arg.starts_with('-'))
32        .map(|arg| arg.as_str())
33        .unwrap_or("status");
34
35    if help {
36        println!("Usage:");
37        println!("  nebu-ctx sync status [--json]");
38        println!("  nebu-ctx sync flush [--json]");
39        return 0;
40    }
41
42    match action {
43        "status" => print_report(build_status_report("status", 0, None), json),
44        "flush" | "replay" => {
45            let before = build_outbox_status();
46            let attempted = crate::core::telemetry_queue::flush_pending();
47            let after = build_outbox_status();
48            print_report(
49                build_status_report("flush", attempted, Some((before, after))),
50                json,
51            )
52        }
53        _ => {
54            eprintln!("Unknown sync action: {action}");
55            eprintln!("Usage: nebu-ctx sync status|flush [--json]");
56            2
57        }
58    }
59}
60
61pub fn build_outbox_status() -> SyncOutboxStatus {
62    let path = outbox_path();
63    match crate::core::sync_outbox::load_entries() {
64        Ok(entries) => SyncOutboxStatus {
65            queued: entries.len(),
66            failed: entries
67                .iter()
68                .filter(|entry| entry.last_error.is_some())
69                .count(),
70            telemetry: entries
71                .iter()
72                .filter(|entry| {
73                    entry.kind == crate::core::sync_outbox::OutboxOperationKind::TelemetryIngest
74                })
75                .count(),
76            server_tool_calls: entries
77                .iter()
78                .filter(|entry| {
79                    entry.kind == crate::core::sync_outbox::OutboxOperationKind::ServerToolCall
80                })
81                .count(),
82            code_index_syncs: entries
83                .iter()
84                .filter(|entry| {
85                    entry.kind == crate::core::sync_outbox::OutboxOperationKind::CodeIndexSync
86                })
87                .count(),
88            path: path.to_string_lossy().to_string(),
89            readable: true,
90        },
91        Err(_) => SyncOutboxStatus {
92            queued: 0,
93            failed: 0,
94            telemetry: 0,
95            server_tool_calls: 0,
96            code_index_syncs: 0,
97            path: path.to_string_lossy().to_string(),
98            readable: false,
99        },
100    }
101}
102
103fn build_status_report(
104    action: &str,
105    attempted: usize,
106    flush_statuses: Option<(SyncOutboxStatus, SyncOutboxStatus)>,
107) -> SyncReport {
108    let before = flush_statuses
109        .as_ref()
110        .map(|(before, _)| before.clone())
111        .unwrap_or_else(build_outbox_status);
112    let after = flush_statuses.map(|(_, after)| after);
113    let mut warnings = Vec::new();
114    if !before.readable || after.as_ref().is_some_and(|status| !status.readable) {
115        warnings.push("sync outbox is not readable".to_string());
116    }
117
118    SyncReport {
119        schema_version: 1,
120        generated_at: Utc::now(),
121        action: action.to_string(),
122        before,
123        after,
124        attempted,
125        warnings,
126    }
127}
128
129fn print_report(report: SyncReport, json: bool) -> i32 {
130    if json {
131        println!(
132            "{}",
133            serde_json::to_string_pretty(&report).unwrap_or_else(|_| "{}".to_string())
134        );
135    } else {
136        print_human(&report);
137    }
138
139    if report.before.readable && report.after.as_ref().is_none_or(|status| status.readable) {
140        0
141    } else {
142        1
143    }
144}
145
146fn print_human(report: &SyncReport) {
147    println!("nebu-ctx sync {}", report.action);
148    println!(
149        "  before: {} queued, {} failed ({} telemetry, {} server calls, {} index syncs)",
150        report.before.queued,
151        report.before.failed,
152        report.before.telemetry,
153        report.before.server_tool_calls,
154        report.before.code_index_syncs
155    );
156    if let Some(after) = &report.after {
157        println!(
158            "  after:  {} queued, {} failed (attempted {})",
159            after.queued, after.failed, report.attempted
160        );
161    }
162    println!("  outbox: {}", report.before.path);
163}
164
165fn outbox_path() -> std::path::PathBuf {
166    crate::core::data_dir::nebu_ctx_data_dir()
167        .map(|dir| dir.join("sync/outbox"))
168        .unwrap_or_else(|_| std::path::PathBuf::from("sync/outbox"))
169}
170
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::sync_outbox::{enqueue, OutboxOperationKind};

    /// Enqueues one entry of each operation kind into a fresh data directory
    /// and checks that the status reports one of each and three in total.
    #[test]
    fn build_outbox_status_counts_kinds() {
        // Hold the shared test lock while the data-dir env var is overridden.
        let _lock = crate::core::data_dir::test_env_lock();
        let tmp = tempfile::tempdir().unwrap();
        std::env::set_var("NEBU_CTX_DATA_DIR", tmp.path());

        let fixtures = [
            (
                OutboxOperationKind::TelemetryIngest,
                serde_json::json!({"tool_name":"ctx_read"}),
            ),
            (
                OutboxOperationKind::ServerToolCall,
                serde_json::json!({"tool_name":"ctx_brain"}),
            ),
            (
                OutboxOperationKind::CodeIndexSync,
                serde_json::json!({"project_id":"proj_test"}),
            ),
        ];
        for (kind, payload) in fixtures {
            enqueue(kind, payload).unwrap();
        }

        let status = build_outbox_status();
        assert!(status.readable);
        assert_eq!(status.queued, 3);
        assert_eq!(status.telemetry, 1);
        assert_eq!(status.server_tool_calls, 1);
        assert_eq!(status.code_index_syncs, 1);
    }
}