Skip to main content

hive_router/pipeline/
usage_reporting.rs

1use std::{
2    sync::Arc,
3    time::{Duration, SystemTime, UNIX_EPOCH},
4};
5
6use async_trait::async_trait;
7use graphql_tools::parser::schema::Document;
8use hive_console_sdk::agent::usage_agent::{AgentError, UsageAgentExt};
9use hive_console_sdk::agent::usage_agent::{ExecutionReport, UsageAgent};
10use hive_router_config::telemetry::hive::{
11    is_slug_target_ref, is_uuid_target_ref, HiveTelemetryConfig,
12};
13use hive_router_config::usage_reporting::UsageReportingConfig;
14use hive_router_internal::telemetry::resolve_value_or_expression;
15use hive_router_plan_executor::execution::{
16    client_request_details::ClientRequestDetails, plan::PlanExecutionOutput,
17};
18
19use rand::Rng;
20use tokio_util::sync::CancellationToken;
21
22use crate::{
23    background_tasks::{BackgroundTask, BackgroundTasksManager},
24    consts::ROUTER_VERSION,
25};
26
27#[derive(Debug, thiserror::Error)]
28pub enum UsageReportingError {
29    #[error("Usage Reporting - Access token is missing. Please provide it via 'HIVE_ACCESS_TOKEN' environment variable or under 'telemetry.hive.token' in the configuration.")]
30    MissingAccessToken,
31    #[error("Failed to initialize usage agent: {0}")]
32    AgentCreationError(#[from] AgentError),
33    #[error("Usage Reporting - Configuration error: {0}")]
34    ConfigurationError(String),
35}
36
37pub fn init_hive_usage_agent(
38    bg_tasks_manager: &mut BackgroundTasksManager,
39    hive_config: &HiveTelemetryConfig,
40) -> Result<UsageAgent, UsageReportingError> {
41    let usage_config = &hive_config.usage_reporting;
42    let user_agent = format!("hive-router/{}", ROUTER_VERSION);
43    let access_token = match &hive_config.token {
44        Some(t) => resolve_value_or_expression(t, "Hive Telemetry token")
45            .map_err(|e| UsageReportingError::ConfigurationError(e.to_string()))?,
46        None => return Err(UsageReportingError::MissingAccessToken),
47    };
48
49    let target = match &hive_config.target {
50        Some(t) => Some(
51            resolve_value_or_expression(t, "Hive Telemetry target")
52                .map_err(|e| UsageReportingError::ConfigurationError(e.to_string()))?,
53        ),
54        None => None,
55    };
56
57    if let Some(target) = &target {
58        if !is_uuid_target_ref(target) && !is_slug_target_ref(target) {
59            return Err(UsageReportingError::ConfigurationError(format!(
60                "Invalid Hive Telemetry target format: '{}'. It must be either in slug format '$organizationSlug/$projectSlug/$targetSlug' or UUID format 'a0f4c605-6541-4350-8cfe-b31f21a4bf80'",
61                target
62            )));
63        }
64    }
65
66    let mut agent_builder = UsageAgent::builder()
67        .user_agent(user_agent)
68        .endpoint(usage_config.endpoint.clone())
69        .token(access_token)
70        .buffer_size(usage_config.buffer_size)
71        .connect_timeout(usage_config.connect_timeout)
72        .request_timeout(usage_config.request_timeout)
73        .accept_invalid_certs(usage_config.accept_invalid_certs)
74        .flush_interval(usage_config.flush_interval);
75
76    if let Some(target_id) = target {
77        agent_builder = agent_builder.target_id(target_id);
78    }
79
80    let agent = agent_builder.build()?;
81
82    bg_tasks_manager.register_task(agent.clone());
83    Ok(agent)
84}
85
86// TODO: simplfy args
87#[allow(clippy::too_many_arguments)]
88#[inline]
89pub async fn collect_usage_report<'a>(
90    schema: Arc<Document<'static, String>>,
91    duration: Duration,
92    client_name: Option<&str>,
93    client_version: Option<&str>,
94    client_request_details: &ClientRequestDetails<'a>,
95    hive_usage_agent: &UsageAgent,
96    usage_config: &UsageReportingConfig,
97    execution_result: &PlanExecutionOutput,
98) {
99    let sample_rate = usage_config.sample_rate.as_f64();
100    if sample_rate < 1.0 && !rand::rng().random_bool(sample_rate) {
101        return;
102    }
103    if client_request_details
104        .operation
105        .name
106        .is_some_and(|op_name| usage_config.exclude.iter().any(|s| s == op_name))
107    {
108        return;
109    }
110    let timestamp = SystemTime::now()
111        .duration_since(UNIX_EPOCH)
112        .unwrap()
113        .as_millis() as u64;
114    let execution_report = ExecutionReport {
115        schema,
116        client_name: client_name.map(|name| name.to_string()),
117        client_version: client_version.map(|version| version.to_string()),
118        timestamp,
119        duration,
120        ok: execution_result.error_count == 0,
121        errors: execution_result.error_count,
122        operation_body: client_request_details.operation.query.to_owned(),
123        operation_name: client_request_details
124            .operation
125            .name
126            .map(|op_name| op_name.to_owned()),
127        persisted_document_hash: None,
128    };
129
130    if let Err(err) = hive_usage_agent.add_report(execution_report).await {
131        tracing::error!("Failed to send usage report: {}", err);
132    }
133}
134
135#[async_trait]
136impl BackgroundTask for UsageAgent {
137    fn id(&self) -> &str {
138        "hive_console_usage_report_task"
139    }
140
141    async fn run(&self, token: CancellationToken) {
142        self.start_flush_interval(&token).await
143    }
144}