1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3
4#[derive(Debug, Clone, Serialize, Deserialize)]
5pub struct SyncOutboxStatus {
6 pub queued: usize,
7 pub failed: usize,
8 pub telemetry: usize,
9 pub server_tool_calls: usize,
10 pub code_index_syncs: usize,
11 pub path: String,
12 pub readable: bool,
13}
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct SyncReport {
17 pub schema_version: u32,
18 pub generated_at: DateTime<Utc>,
19 pub action: String,
20 pub before: SyncOutboxStatus,
21 pub after: Option<SyncOutboxStatus>,
22 pub attempted: usize,
23 pub warnings: Vec<String>,
24}
25
26pub fn run_cli(args: &[String]) -> i32 {
27 let json = args.iter().any(|arg| arg == "--json");
28 let help = args.iter().any(|arg| arg == "--help" || arg == "-h");
29 let action = args
30 .iter()
31 .find(|arg| !arg.starts_with('-'))
32 .map(|arg| arg.as_str())
33 .unwrap_or("status");
34
35 if help {
36 println!("Usage:");
37 println!(" nebu-ctx sync status [--json]");
38 println!(" nebu-ctx sync flush [--json]");
39 return 0;
40 }
41
42 match action {
43 "status" => print_report(build_status_report("status", 0, None), json),
44 "flush" | "replay" => {
45 let before = build_outbox_status();
46 let attempted = crate::core::telemetry_queue::flush_pending();
47 let after = build_outbox_status();
48 print_report(
49 build_status_report("flush", attempted, Some((before, after))),
50 json,
51 )
52 }
53 _ => {
54 eprintln!("Unknown sync action: {action}");
55 eprintln!("Usage: nebu-ctx sync status|flush [--json]");
56 2
57 }
58 }
59}
60
61pub fn build_outbox_status() -> SyncOutboxStatus {
62 let path = outbox_path();
63 match crate::core::sync_outbox::load_entries() {
64 Ok(entries) => SyncOutboxStatus {
65 queued: entries.len(),
66 failed: entries
67 .iter()
68 .filter(|entry| entry.last_error.is_some())
69 .count(),
70 telemetry: entries
71 .iter()
72 .filter(|entry| {
73 entry.kind == crate::core::sync_outbox::OutboxOperationKind::TelemetryIngest
74 })
75 .count(),
76 server_tool_calls: entries
77 .iter()
78 .filter(|entry| {
79 entry.kind == crate::core::sync_outbox::OutboxOperationKind::ServerToolCall
80 })
81 .count(),
82 code_index_syncs: entries
83 .iter()
84 .filter(|entry| {
85 entry.kind == crate::core::sync_outbox::OutboxOperationKind::CodeIndexSync
86 })
87 .count(),
88 path: path.to_string_lossy().to_string(),
89 readable: true,
90 },
91 Err(_) => SyncOutboxStatus {
92 queued: 0,
93 failed: 0,
94 telemetry: 0,
95 server_tool_calls: 0,
96 code_index_syncs: 0,
97 path: path.to_string_lossy().to_string(),
98 readable: false,
99 },
100 }
101}
102
103fn build_status_report(
104 action: &str,
105 attempted: usize,
106 flush_statuses: Option<(SyncOutboxStatus, SyncOutboxStatus)>,
107) -> SyncReport {
108 let before = flush_statuses
109 .as_ref()
110 .map(|(before, _)| before.clone())
111 .unwrap_or_else(build_outbox_status);
112 let after = flush_statuses.map(|(_, after)| after);
113 let mut warnings = Vec::new();
114 if !before.readable || after.as_ref().is_some_and(|status| !status.readable) {
115 warnings.push("sync outbox is not readable".to_string());
116 }
117
118 SyncReport {
119 schema_version: 1,
120 generated_at: Utc::now(),
121 action: action.to_string(),
122 before,
123 after,
124 attempted,
125 warnings,
126 }
127}
128
129fn print_report(report: SyncReport, json: bool) -> i32 {
130 if json {
131 println!(
132 "{}",
133 serde_json::to_string_pretty(&report).unwrap_or_else(|_| "{}".to_string())
134 );
135 } else {
136 print_human(&report);
137 }
138
139 if report.before.readable && report.after.as_ref().is_none_or(|status| status.readable) {
140 0
141 } else {
142 1
143 }
144}
145
146fn print_human(report: &SyncReport) {
147 println!("nebu-ctx sync {}", report.action);
148 println!(
149 " before: {} queued, {} failed ({} telemetry, {} server calls, {} index syncs)",
150 report.before.queued,
151 report.before.failed,
152 report.before.telemetry,
153 report.before.server_tool_calls,
154 report.before.code_index_syncs
155 );
156 if let Some(after) = &report.after {
157 println!(
158 " after: {} queued, {} failed (attempted {})",
159 after.queued, after.failed, report.attempted
160 );
161 }
162 println!(" outbox: {}", report.before.path);
163}
164
165fn outbox_path() -> std::path::PathBuf {
166 crate::core::data_dir::nebu_ctx_data_dir()
167 .map(|dir| dir.join("sync/outbox"))
168 .unwrap_or_else(|_| std::path::PathBuf::from("sync/outbox"))
169}
170
171#[cfg(test)]
172mod tests {
173 use super::*;
174
175 #[test]
176 fn build_outbox_status_counts_kinds() {
177 let _lock = crate::core::data_dir::test_env_lock();
178 let tmp = tempfile::tempdir().unwrap();
179 std::env::set_var("NEBU_CTX_DATA_DIR", tmp.path());
180
181 crate::core::sync_outbox::enqueue(
182 crate::core::sync_outbox::OutboxOperationKind::TelemetryIngest,
183 serde_json::json!({"tool_name":"ctx_read"}),
184 )
185 .unwrap();
186 crate::core::sync_outbox::enqueue(
187 crate::core::sync_outbox::OutboxOperationKind::ServerToolCall,
188 serde_json::json!({"tool_name":"ctx_brain"}),
189 )
190 .unwrap();
191 crate::core::sync_outbox::enqueue(
192 crate::core::sync_outbox::OutboxOperationKind::CodeIndexSync,
193 serde_json::json!({"project_id":"proj_test"}),
194 )
195 .unwrap();
196
197 let status = build_outbox_status();
198 assert!(status.readable);
199 assert_eq!(status.queued, 3);
200 assert_eq!(status.telemetry, 1);
201 assert_eq!(status.server_tool_calls, 1);
202 assert_eq!(status.code_index_syncs, 1);
203 }
204}