Skip to main content

kaizen/shell/
sync.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! `kaizen sync run` and `kaizen sync status`.
3
4use crate::core::config::{self, try_team_salt};
5use crate::shell::cli::workspace_path;
6use crate::store::Store;
7use crate::sync::FlushExporters;
8use crate::sync::flush_outbox_once;
9use crate::telemetry;
10use anyhow::{Context, Result};
11use std::path::Path;
12use std::thread;
13use std::time::Duration;
14
15/// No stdout (only tracing), matching `kaizen sync run`.
16pub fn sync_run_text(workspace: Option<&Path>, once: bool) -> Result<String> {
17    let ws = workspace_path(workspace)?;
18    let cfg = config::load(&ws)?;
19    if cfg.sync.endpoint.is_empty() {
20        tracing::info!("sync disabled (sync.endpoint empty)");
21        return Ok(String::new());
22    }
23    let salt = try_team_salt(&cfg.sync)
24        .context("sync requires team_salt_hex (64 hex chars), usually in ~/.kaizen/config.toml")?;
25    let db_path = crate::core::workspace::db_path(&ws)?;
26    let store = Store::open(&db_path)?;
27    let interval = cfg.sync.flush_interval_ms.max(100);
28    let registry = telemetry::load_exporters(&cfg.telemetry, &ws);
29    let flush = FlushExporters {
30        telemetry: &cfg.telemetry,
31        registry: if registry.is_empty() {
32            None
33        } else {
34            Some(&registry)
35        },
36    };
37
38    loop {
39        match flush_outbox_once(&store, &ws, &cfg.sync, &salt, &flush) {
40            Ok(stats) => {
41                if stats.batches > 0 {
42                    tracing::info!(
43                        batches = stats.batches,
44                        events = stats.events_sent,
45                        "sync flush ok"
46                    );
47                }
48            }
49            Err(e) => tracing::error!("sync flush failed: {e:#}"),
50        }
51        if once {
52            break;
53        }
54        thread::sleep(Duration::from_millis(interval));
55    }
56    Ok(String::new())
57}
58
59/// Foreground sync loop; `--once` performs a single flush (tests / cron).
60pub fn cmd_sync_run(workspace: Option<&Path>, once: bool) -> Result<()> {
61    sync_run_text(workspace, once)?;
62    Ok(())
63}
64
65/// Same stdout as `kaizen sync status`.
66pub fn sync_status_text(workspace: Option<&Path>) -> Result<String> {
67    let ws = workspace_path(workspace)?;
68    let db_path = crate::core::workspace::db_path(&ws)?;
69    use std::fmt::Write;
70    let mut s = String::new();
71    if !db_path.exists() {
72        writeln!(&mut s, "no database at {}", db_path.display()).unwrap();
73        return Ok(s);
74    }
75    let store = Store::open(&db_path)?;
76    let st = store.sync_status()?;
77    writeln!(&mut s, "outbox pending: {}", st.pending_outbox).unwrap();
78    match st.last_success_ms {
79        Some(ms) => writeln!(&mut s, "last flush ok: {ms} ms since epoch").unwrap(),
80        None => writeln!(&mut s, "last flush ok: (never)").unwrap(),
81    }
82    writeln!(&mut s, "consecutive failures: {}", st.consecutive_failures).unwrap();
83    if let Some(e) = &st.last_error {
84        writeln!(&mut s, "last error: {e}").unwrap();
85    }
86    let cfg = config::load(&ws)?;
87    if cfg.sync.endpoint.is_empty() {
88        writeln!(&mut s, "sync endpoint: (disabled)").unwrap();
89    } else {
90        writeln!(&mut s, "sync endpoint: {}", cfg.sync.endpoint).unwrap();
91    }
92    Ok(s)
93}
94
95pub fn cmd_sync_status(workspace: Option<&Path>) -> Result<()> {
96    print!("{}", sync_status_text(workspace)?);
97    Ok(())
98}