1use crate::core::config::{self, try_team_salt};
5use crate::shell::cli::workspace_path;
6use crate::store::Store;
7use crate::sync::FlushExporters;
8use crate::sync::flush_outbox_once;
9use crate::telemetry;
10use anyhow::{Context, Result};
11use std::path::Path;
12use std::thread;
13use std::time::Duration;
14
15pub fn sync_run_text(workspace: Option<&Path>, once: bool) -> Result<String> {
17 let ws = workspace_path(workspace)?;
18 let cfg = config::load(&ws)?;
19 if cfg.sync.endpoint.is_empty() {
20 tracing::info!("sync disabled (sync.endpoint empty)");
21 return Ok(String::new());
22 }
23 let salt = try_team_salt(&cfg.sync)
24 .context("sync requires team_salt_hex (64 hex chars), usually in ~/.kaizen/config.toml")?;
25 let db_path = crate::core::workspace::db_path(&ws)?;
26 let store = Store::open(&db_path)?;
27 let interval = cfg.sync.flush_interval_ms.max(100);
28 let registry = telemetry::load_exporters(&cfg.telemetry, &ws);
29 let flush = FlushExporters {
30 telemetry: &cfg.telemetry,
31 registry: if registry.is_empty() {
32 None
33 } else {
34 Some(®istry)
35 },
36 };
37
38 loop {
39 match flush_outbox_once(&store, &ws, &cfg.sync, &salt, &flush) {
40 Ok(stats) => {
41 if stats.batches > 0 {
42 tracing::info!(
43 batches = stats.batches,
44 events = stats.events_sent,
45 "sync flush ok"
46 );
47 }
48 }
49 Err(e) => tracing::error!("sync flush failed: {e:#}"),
50 }
51 if once {
52 break;
53 }
54 thread::sleep(Duration::from_millis(interval));
55 }
56 Ok(String::new())
57}
58
59pub fn cmd_sync_run(workspace: Option<&Path>, once: bool) -> Result<()> {
61 sync_run_text(workspace, once)?;
62 Ok(())
63}
64
65pub fn sync_status_text(workspace: Option<&Path>) -> Result<String> {
67 let ws = workspace_path(workspace)?;
68 let db_path = crate::core::workspace::db_path(&ws)?;
69 use std::fmt::Write;
70 let mut s = String::new();
71 if !db_path.exists() {
72 writeln!(&mut s, "no database at {}", db_path.display()).unwrap();
73 return Ok(s);
74 }
75 let store = Store::open(&db_path)?;
76 let st = store.sync_status()?;
77 writeln!(&mut s, "outbox pending: {}", st.pending_outbox).unwrap();
78 match st.last_success_ms {
79 Some(ms) => writeln!(&mut s, "last flush ok: {ms} ms since epoch").unwrap(),
80 None => writeln!(&mut s, "last flush ok: (never)").unwrap(),
81 }
82 writeln!(&mut s, "consecutive failures: {}", st.consecutive_failures).unwrap();
83 if let Some(e) = &st.last_error {
84 writeln!(&mut s, "last error: {e}").unwrap();
85 }
86 let cfg = config::load(&ws)?;
87 if cfg.sync.endpoint.is_empty() {
88 writeln!(&mut s, "sync endpoint: (disabled)").unwrap();
89 } else {
90 writeln!(&mut s, "sync endpoint: {}", cfg.sync.endpoint).unwrap();
91 }
92 Ok(s)
93}
94
95pub fn cmd_sync_status(workspace: Option<&Path>) -> Result<()> {
96 print!("{}", sync_status_text(workspace)?);
97 Ok(())
98}