kaizen/shell/
remote_pull.rs1use crate::core::config::Config;
5use crate::core::data_source::DataSource;
6use crate::provider::{PullWindow, from_config as provider_from_config};
7use crate::store::Store;
8use crate::store::remote_cache::{RemoteCacheStore, RemotePullState};
9use anyhow::Result;
10use std::path::Path;
11use std::time::{SystemTime, UNIX_EPOCH};
12
13fn query_provider_label(cfg: &Config) -> String {
14 match cfg.telemetry.query.provider {
15 crate::core::config::QueryAuthority::None => "none".into(),
16 crate::core::config::QueryAuthority::Posthog => "posthog".into(),
17 crate::core::config::QueryAuthority::Datadog => "datadog".into(),
18 }
19}
20
21pub fn maybe_telemetry_pull(
23 _workspace: &Path,
24 store: &Store,
25 cfg: &Config,
26 source: DataSource,
27 force_refresh: bool,
28) -> Result<()> {
29 if source == DataSource::Local {
30 return Ok(());
31 }
32 let Some(p) = provider_from_config(&cfg.telemetry) else {
33 tracing::debug!("telemetry: no query provider; skip pull");
34 return Ok(());
35 };
36 let state = store.get_pull_state()?;
37 let now_ms = SystemTime::now()
38 .duration_since(UNIX_EPOCH)
39 .unwrap_or_default()
40 .as_millis() as i64;
41 let ttl_ms = (cfg.telemetry.query.cache_ttl_seconds as i64).saturating_mul(1000);
42 let fresh = state
43 .last_success_ms
44 .map(|t| now_ms - t < ttl_ms)
45 .unwrap_or(false);
46 if !force_refresh && fresh {
47 return Ok(());
48 }
49 let page = p.pull(PullWindow { days: 7 }, None)?;
50 if !cfg.sync.team_id.trim().is_empty()
51 && let Some(ctx) = crate::sync::ingest_ctx(cfg, _workspace.to_path_buf())
52 && let Some(wh) = crate::sync::smart::workspace_hash_for(&ctx)
53 {
54 match crate::provider::import_pull_page_to_remote(store, &cfg.sync.team_id, &wh, &page) {
55 Ok(n) if n > 0 => tracing::debug!(n, "remote_events: imported from provider pull"),
56 _ => {}
57 }
58 }
59 store.set_pull_state(&RemotePullState {
60 query_provider: query_provider_label(cfg),
61 cursor_json: String::new(),
62 last_success_ms: Some(now_ms),
63 })?;
64 Ok(())
65}