feldera_cloud1_client/
telemetry.rs

1//! Telemetry events format exchanged between platform and telemetry service.
2
3use reqwest::StatusCode;
4use serde::{Deserialize, Serialize};
5use thiserror::Error as ThisError;
6
7use crate::source_error;
8
9/// Request to register a telemetry event.
10/// Shared type between client and server.
11#[derive(Serialize, Deserialize, Debug)]
12pub struct RegisterTelemetryEventRequest {
13    pub account_id: String,
14    pub license_key: String,
15    pub event: TelemetryEvent,
16}
17
18#[derive(Serialize, Deserialize, Debug, Clone)]
19pub enum TelemetryEvent {
20    /// Pipeline provisioning succeeded.
21    PipelineProvisioned { id_hash: String },
22    /// Pipeline shutdown finished successfully.
23    PipelineShutdownFinished { id_hash: String },
24    /// Statistics update about a deployed pipeline.
25    PipelineStatistics {
26        id_hash: String,
27        statistics: serde_json::Value,
28    },
29}
30
31/// Enumeration of the errors that can occur during registering a telemetry event.
32#[derive(ThisError, Clone, Debug, PartialEq)]
33pub enum RegisterTelemetryEventError {
34    #[error("failed to serialize request to JSON due to {error}")]
35    SerializeRequestToJsonFailed { error: String },
36    #[error("failed to send request due to {error}")]
37    SendRequestFailed { error: String },
38    #[error("HTTP status code ({0}) received back with the response is unexpected")]
39    UnexpectedResponseStatusCode(StatusCode),
40}
41
42/// Registers a telemetry event at the API endpoint.
43pub async fn register_telemetry_event(
44    // HTTP client
45    client: &reqwest::Client,
46    // Cloud API endpoint
47    cloud_api_endpoint: &str,
48    // Identifier of the account to which the license belongs
49    account_id: &str,
50    // License key
51    license_key: &str,
52    // Event that occurred
53    event: TelemetryEvent,
54) -> Result<(), RegisterTelemetryEventError> {
55    let endpoint = format!("{cloud_api_endpoint}/telemetry/event");
56
57    match serde_json::to_value(RegisterTelemetryEventRequest {
58        account_id: account_id.to_string(),
59        license_key: license_key.to_string(),
60        event,
61    }) {
62        Ok(request) => {
63            let result = client.post(&endpoint).json(&request).send().await;
64            match result {
65                Ok(response) => {
66                    let status_code = response.status();
67                    if status_code == StatusCode::OK {
68                        Ok(())
69                    } else {
70                        Err(RegisterTelemetryEventError::UnexpectedResponseStatusCode(
71                            status_code,
72                        ))
73                    }
74                }
75                Err(e) => {
76                    let source_err = source_error(&e);
77                    let error = format!("{e}, source: {source_err}");
78                    Err(RegisterTelemetryEventError::SendRequestFailed { error })
79                }
80            }
81        }
82        Err(e) => Err(RegisterTelemetryEventError::SerializeRequestToJsonFailed {
83            error: e.to_string(),
84        }),
85    }
86}