Skip to main content

kaizen/shell/
remote_pull.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Best-effort provider pull into local `remote_*` cache before read commands (when `DataSource` requests it).
3
4use crate::core::config::Config;
5use crate::core::data_source::DataSource;
6use crate::provider::{PullWindow, from_config as provider_from_config};
7use crate::store::Store;
8use crate::store::remote_cache::{RemoteCacheStore, RemotePullState};
9use anyhow::Result;
10use std::path::Path;
11use std::time::{SystemTime, UNIX_EPOCH};
12
13fn query_provider_label(cfg: &Config) -> String {
14    match cfg.telemetry.query.provider {
15        crate::core::config::QueryAuthority::None => "none".into(),
16        crate::core::config::QueryAuthority::Posthog => "posthog".into(),
17        crate::core::config::QueryAuthority::Datadog => "datadog".into(),
18    }
19}
20
21/// When `source` is not local, refresh remote cache if TTL expired or `force_refresh` (CLI `--refresh` with provider/mixed).
22pub fn maybe_telemetry_pull(
23    _workspace: &Path,
24    store: &Store,
25    cfg: &Config,
26    source: DataSource,
27    force_refresh: bool,
28) -> Result<()> {
29    if source == DataSource::Local {
30        return Ok(());
31    }
32    let Some(p) = provider_from_config(&cfg.telemetry) else {
33        tracing::debug!("telemetry: no query provider; skip pull");
34        return Ok(());
35    };
36    let state = store.get_pull_state()?;
37    let now_ms = SystemTime::now()
38        .duration_since(UNIX_EPOCH)
39        .unwrap_or_default()
40        .as_millis() as i64;
41    let ttl_ms = (cfg.telemetry.query.cache_ttl_seconds as i64).saturating_mul(1000);
42    let fresh = state
43        .last_success_ms
44        .map(|t| now_ms - t < ttl_ms)
45        .unwrap_or(false);
46    if !force_refresh && fresh {
47        return Ok(());
48    }
49    let page = p.pull(PullWindow { days: 7 }, None)?;
50    if !cfg.sync.team_id.trim().is_empty()
51        && let Some(ctx) = crate::sync::ingest_ctx(cfg, _workspace.to_path_buf())
52        && let Some(wh) = crate::sync::smart::workspace_hash_for(&ctx)
53    {
54        match crate::provider::import_pull_page_to_remote(store, &cfg.sync.team_id, &wh, &page) {
55            Ok(n) if n > 0 => tracing::debug!(n, "remote_events: imported from provider pull"),
56            _ => {}
57        }
58    }
59    store.set_pull_state(&RemotePullState {
60        query_provider: query_provider_label(cfg),
61        cursor_json: String::new(),
62        last_success_ms: Some(now_ms),
63    })?;
64    Ok(())
65}