hive_router/pipeline/
usage_reporting.rs1use std::{
2 sync::Arc,
3 time::{Duration, SystemTime, UNIX_EPOCH},
4};
5
6use async_trait::async_trait;
7use graphql_tools::parser::schema::Document;
8use hive_console_sdk::agent::usage_agent::{AgentError, UsageAgentExt};
9use hive_console_sdk::agent::usage_agent::{ExecutionReport, UsageAgent};
10use hive_router_config::telemetry::hive::{
11 is_slug_target_ref, is_uuid_target_ref, HiveTelemetryConfig,
12};
13use hive_router_config::usage_reporting::UsageReportingConfig;
14use hive_router_internal::telemetry::resolve_value_or_expression;
15use hive_router_plan_executor::execution::{
16 client_request_details::ClientRequestDetails, plan::PlanExecutionOutput,
17};
18
19use rand::Rng;
20use tokio_util::sync::CancellationToken;
21
22use crate::{
23 background_tasks::{BackgroundTask, BackgroundTasksManager},
24 consts::ROUTER_VERSION,
25};
26
27#[derive(Debug, thiserror::Error)]
28pub enum UsageReportingError {
29 #[error("Usage Reporting - Access token is missing. Please provide it via 'HIVE_ACCESS_TOKEN' environment variable or under 'telemetry.hive.token' in the configuration.")]
30 MissingAccessToken,
31 #[error("Failed to initialize usage agent: {0}")]
32 AgentCreationError(#[from] AgentError),
33 #[error("Usage Reporting - Configuration error: {0}")]
34 ConfigurationError(String),
35}
36
37pub fn init_hive_usage_agent(
38 bg_tasks_manager: &mut BackgroundTasksManager,
39 hive_config: &HiveTelemetryConfig,
40) -> Result<UsageAgent, UsageReportingError> {
41 let usage_config = &hive_config.usage_reporting;
42 let user_agent = format!("hive-router/{}", ROUTER_VERSION);
43 let access_token = match &hive_config.token {
44 Some(t) => resolve_value_or_expression(t, "Hive Telemetry token")
45 .map_err(|e| UsageReportingError::ConfigurationError(e.to_string()))?,
46 None => return Err(UsageReportingError::MissingAccessToken),
47 };
48
49 let target = match &hive_config.target {
50 Some(t) => Some(
51 resolve_value_or_expression(t, "Hive Telemetry target")
52 .map_err(|e| UsageReportingError::ConfigurationError(e.to_string()))?,
53 ),
54 None => None,
55 };
56
57 if let Some(target) = &target {
58 if !is_uuid_target_ref(target) && !is_slug_target_ref(target) {
59 return Err(UsageReportingError::ConfigurationError(format!(
60 "Invalid Hive Telemetry target format: '{}'. It must be either in slug format '$organizationSlug/$projectSlug/$targetSlug' or UUID format 'a0f4c605-6541-4350-8cfe-b31f21a4bf80'",
61 target
62 )));
63 }
64 }
65
66 let mut agent_builder = UsageAgent::builder()
67 .user_agent(user_agent)
68 .endpoint(usage_config.endpoint.clone())
69 .token(access_token)
70 .buffer_size(usage_config.buffer_size)
71 .connect_timeout(usage_config.connect_timeout)
72 .request_timeout(usage_config.request_timeout)
73 .accept_invalid_certs(usage_config.accept_invalid_certs)
74 .flush_interval(usage_config.flush_interval);
75
76 if let Some(target_id) = target {
77 agent_builder = agent_builder.target_id(target_id);
78 }
79
80 let agent = agent_builder.build()?;
81
82 bg_tasks_manager.register_task(agent.clone());
83 Ok(agent)
84}
85
86#[allow(clippy::too_many_arguments)]
88#[inline]
89pub async fn collect_usage_report<'a>(
90 schema: Arc<Document<'static, String>>,
91 duration: Duration,
92 client_name: Option<&str>,
93 client_version: Option<&str>,
94 client_request_details: &ClientRequestDetails<'a>,
95 hive_usage_agent: &UsageAgent,
96 usage_config: &UsageReportingConfig,
97 execution_result: &PlanExecutionOutput,
98) {
99 let sample_rate = usage_config.sample_rate.as_f64();
100 if sample_rate < 1.0 && !rand::rng().random_bool(sample_rate) {
101 return;
102 }
103 if client_request_details
104 .operation
105 .name
106 .is_some_and(|op_name| usage_config.exclude.iter().any(|s| s == op_name))
107 {
108 return;
109 }
110 let timestamp = SystemTime::now()
111 .duration_since(UNIX_EPOCH)
112 .unwrap()
113 .as_millis() as u64;
114 let execution_report = ExecutionReport {
115 schema,
116 client_name: client_name.map(|name| name.to_string()),
117 client_version: client_version.map(|version| version.to_string()),
118 timestamp,
119 duration,
120 ok: execution_result.error_count == 0,
121 errors: execution_result.error_count,
122 operation_body: client_request_details.operation.query.to_owned(),
123 operation_name: client_request_details
124 .operation
125 .name
126 .map(|op_name| op_name.to_owned()),
127 persisted_document_hash: None,
128 };
129
130 if let Err(err) = hive_usage_agent.add_report(execution_report).await {
131 tracing::error!("Failed to send usage report: {}", err);
132 }
133}
134
135#[async_trait]
136impl BackgroundTask for UsageAgent {
137 fn id(&self) -> &str {
138 "hive_console_usage_report_task"
139 }
140
141 async fn run(&self, token: CancellationToken) {
142 self.start_flush_interval(&token).await
143 }
144}