Skip to main content

kaizen/provider/
mod.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Query-back from telemetry providers (PostHog, Datadog). OTLP has no pull in v1.
3
4use anyhow::Result;
5use serde_json::Value;
6use std::time::Duration;
7
8#[cfg(feature = "telemetry-datadog")]
9pub(crate) mod datadog;
10#[cfg(feature = "telemetry-posthog")]
11pub(crate) mod posthog;
12mod pull_import;
13
14/// Trailing time window for a pull (coarse; provider maps to its API).
15#[derive(Debug, Clone, Copy)]
16pub struct PullWindow {
17    pub days: u32,
18}
19
20impl Default for PullWindow {
21    fn default() -> Self {
22        Self { days: 7 }
23    }
24}
25
26/// One page of remote rows; cursor is opaque to Kaizen.
27#[derive(Debug, Clone, Default)]
28pub struct PullPage {
29    pub next_cursor: Option<String>,
30    pub items: Vec<Value>,
31}
32
33pub use pull_import::import_pull_page_to_remote;
34
35/// Abstraction for PostHog / Datadog query APIs. OTLP is export-only, not a query authority.
36pub trait TelemetryQueryProvider: Send + Sync {
37    fn health(&self) -> Result<()>;
38    /// Provider-reported label for debugging (e.g. `posthog-2024-01`).
39    fn schema_version(&self) -> &str;
40    fn pull(&self, window: PullWindow, cursor: Option<&str>) -> Result<PullPage>;
41}
42
43// Submodules `posthog` / `datadog` are feature-gated; keep timeout here for a single value.
44#[allow(dead_code)]
45const HTTP_TIMEOUT: Duration = Duration::from_secs(30);
46
47/// Build a `TelemetryQueryProvider` for the configured query authority, or `None` when pull
48/// is off. Resolves credentials from the matching `[[telemetry.exporters]]` row first, then
49/// env. This way `kaizen telemetry configure --type=datadog` is enough to drive `pull` —
50/// users do not have to *also* `export DD_API_KEY` in their shell.
51pub fn from_config(
52    cfg: &crate::core::config::TelemetryConfig,
53) -> Option<std::sync::Arc<dyn TelemetryQueryProvider>> {
54    use crate::core::config::QueryAuthority;
55    match cfg.query.provider {
56        QueryAuthority::None => None,
57        #[cfg(feature = "telemetry-posthog")]
58        QueryAuthority::Posthog => posthog_provider(cfg),
59        #[cfg(not(feature = "telemetry-posthog"))]
60        QueryAuthority::Posthog => {
61            tracing::warn!(
62                "telemetry query provider is posthog but `telemetry-posthog` feature is off"
63            );
64            None
65        }
66        #[cfg(feature = "telemetry-datadog")]
67        QueryAuthority::Datadog => datadog_provider(cfg),
68        #[cfg(not(feature = "telemetry-datadog"))]
69        QueryAuthority::Datadog => {
70            tracing::warn!(
71                "telemetry query provider is datadog but `telemetry-datadog` feature is off"
72            );
73            None
74        }
75    }
76}
77
78#[cfg(feature = "telemetry-datadog")]
79fn datadog_provider(
80    cfg: &crate::core::config::TelemetryConfig,
81) -> Option<std::sync::Arc<dyn TelemetryQueryProvider>> {
82    use crate::core::config::ExporterConfig;
83    let row = cfg
84        .exporters
85        .iter()
86        .find(|e| matches!(e, ExporterConfig::Datadog { .. }));
87    let resolved = row
88        .and_then(crate::telemetry::DatadogResolved::from_config)
89        .or_else(crate::telemetry::DatadogResolved::from_env_only)?;
90    Some(std::sync::Arc::new(datadog::DatadogQueryClient::new(&resolved)) as _)
91}
92
93#[cfg(feature = "telemetry-posthog")]
94fn posthog_provider(
95    cfg: &crate::core::config::TelemetryConfig,
96) -> Option<std::sync::Arc<dyn TelemetryQueryProvider>> {
97    use crate::core::config::ExporterConfig;
98    let row = cfg
99        .exporters
100        .iter()
101        .find(|e| matches!(e, ExporterConfig::PostHog { .. }));
102    let resolved = row
103        .and_then(crate::telemetry::PostHogResolved::from_config)
104        .or_else(crate::telemetry::PostHogResolved::from_env_only)?;
105    Some(std::sync::Arc::new(posthog::PostHogQueryClient::new(&resolved)) as _)
106}
107
108#[allow(dead_code)]
109pub(crate) fn http_timeout() -> Duration {
110    HTTP_TIMEOUT
111}
112
113#[cfg(test)]
114mod tests {
115    use super::*;
116
117    struct Nop;
118    impl TelemetryQueryProvider for Nop {
119        fn health(&self) -> Result<()> {
120            Ok(())
121        }
122        fn schema_version(&self) -> &'static str {
123            "test"
124        }
125        fn pull(&self, _window: PullWindow, _cursor: Option<&str>) -> Result<PullPage> {
126            Ok(PullPage::default())
127        }
128    }
129
130    #[test]
131    fn nop_pull_empty() {
132        let p = Nop;
133        assert_eq!(p.schema_version(), "test");
134        let page = p.pull(PullWindow { days: 1 }, None).unwrap();
135        assert!(page.items.is_empty());
136    }
137}