firebase_rs_sdk/performance/
transport.rs

1use std::env;
2use std::sync::{Arc, RwLock};
3use std::time::Duration;
4
5use serde::Serialize;
6
7use crate::performance::api::Performance;
8use crate::performance::error::{internal_error, PerformanceResult};
9use crate::performance::storage::{
10    SerializableNetworkRequest, SerializableTrace, TraceEnvelope, TraceStoreHandle,
11};
12use crate::platform::runtime;
13use chrono::Utc;
14
/// Upload URL placed in `TransportOptions::default()`.
const DEFAULT_ENDPOINT: &str = "https://firebaselogging.googleapis.com/v0cc/log?format=json_proto3";
/// Number of queued entries drained per flush when `max_batch_size` is unset.
const DEFAULT_BATCH_SIZE: usize = 25;
/// Pause between flush attempts when `flush_interval` is unset.
const DEFAULT_INTERVAL: Duration = Duration::from_secs(10);
/// One-off pause the background task takes before entering its flush loop.
const INITIAL_DELAY: Duration = Duration::from_millis(2_500);
19
/// Knobs controlling how queued performance data is uploaded in batches.
///
/// Every field is optional; the transport controller substitutes its own
/// defaults for the values left as `None`.
#[derive(Clone, Debug)]
pub struct TransportOptions {
    /// Base upload URL. When `None`, the controller skips uploading.
    pub endpoint: Option<String>,
    /// API key appended to the endpoint as a `key=` query parameter. When
    /// `None`, the controller falls back to the API key in the app options.
    pub api_key: Option<String>,
    /// Pause between background flushes; `None` selects the built-in interval.
    pub flush_interval: Option<Duration>,
    /// Upper bound on entries drained per flush; `None` selects the built-in
    /// batch size.
    pub max_batch_size: Option<usize>,
}
28
29impl Default for TransportOptions {
30    fn default() -> Self {
31        Self {
32            endpoint: Some(DEFAULT_ENDPOINT.to_string()),
33            api_key: None,
34            flush_interval: None,
35            max_batch_size: None,
36        }
37    }
38}
39
40pub struct TransportController {
41    performance: Performance,
42    store: TraceStoreHandle,
43    options: Arc<RwLock<TransportOptions>>,
44    client: Arc<HttpTransportClient>,
45}
46
47impl TransportController {
48    pub fn new(
49        performance: Performance,
50        store: TraceStoreHandle,
51        options: Arc<RwLock<TransportOptions>>,
52    ) -> Arc<Self> {
53        let controller = Arc::new(Self {
54            performance,
55            store,
56            options,
57            client: Arc::new(HttpTransportClient::default()),
58        });
59        controller.spawn();
60        controller
61    }
62
63    fn spawn(self: &Arc<Self>) {
64        let this = Arc::clone(self);
65        runtime::spawn_detached(async move {
66            runtime::sleep(INITIAL_DELAY).await;
67            this.run().await;
68        });
69    }
70
71    async fn run(self: Arc<Self>) {
72        loop {
73            runtime::sleep(self.current_interval()).await;
74            if let Err(err) = self.flush_once().await {
75                log::debug!("performance transport flush failed: {err}");
76            }
77        }
78    }
79
80    fn current_interval(&self) -> Duration {
81        self.options
82            .read()
83            .map(|options| options.flush_interval.unwrap_or(DEFAULT_INTERVAL))
84            .unwrap_or(DEFAULT_INTERVAL)
85    }
86
87    fn batch_size(&self) -> usize {
88        self.options
89            .read()
90            .map(|options| options.max_batch_size.unwrap_or(DEFAULT_BATCH_SIZE))
91            .unwrap_or(DEFAULT_BATCH_SIZE)
92    }
93
94    pub async fn flush_once(&self) -> PerformanceResult<()> {
95        if !self.performance.data_collection_enabled() {
96            return Ok(());
97        }
98        let endpoint = match self.current_endpoint() {
99            Some(url) => url,
100            None => return Ok(()),
101        };
102        let batch = self.store.drain(self.batch_size()).await?;
103        if batch.is_empty() {
104            return Ok(());
105        }
106        let payload = self.build_payload(&batch).await?;
107        if let Err(err) = self.client.send(&endpoint, &payload).await {
108            log::debug!("performance transport send failed: {err}");
109            self.requeue(batch).await?;
110        }
111        Ok(())
112    }
113
114    async fn requeue(&self, entries: Vec<TraceEnvelope>) -> PerformanceResult<()> {
115        for entry in entries {
116            self.store.push(entry).await?;
117        }
118        Ok(())
119    }
120
121    pub fn trigger_flush(self: &Arc<Self>) {
122        let controller = Arc::clone(self);
123        runtime::spawn_detached(async move {
124            if let Err(err) = controller.flush_once().await {
125                log::debug!("performance transport ad-hoc flush failed: {err}");
126            }
127        });
128    }
129
130    fn current_endpoint(&self) -> Option<String> {
131        if env::var("FIREBASE_PERF_DISABLE_TRANSPORT").is_ok() {
132            return None;
133        }
134        self.options.read().ok().and_then(|options| {
135            options.endpoint.as_ref().map(|base| {
136                let mut url = base.clone();
137                let key = options
138                    .api_key
139                    .clone()
140                    .or_else(|| self.performance.app().options().api_key.clone());
141                if let Some(key) = key {
142                    if url.contains('?') {
143                        url.push('&');
144                    } else {
145                        url.push('?');
146                    }
147                    url.push_str("key=");
148                    url.push_str(&key);
149                }
150                url
151            })
152        })
153    }
154
155    async fn build_payload(&self, batch: &[TraceEnvelope]) -> PerformanceResult<TransportPayload> {
156        let mut traces = Vec::new();
157        let mut network = Vec::new();
158        for entry in batch {
159            match entry {
160                TraceEnvelope::Trace(trace) => traces.push(SerializableTrace::from(trace)),
161                TraceEnvelope::Network(record) => {
162                    network.push(SerializableNetworkRequest::from(record))
163                }
164            }
165        }
166        Ok(TransportPayload {
167            request_time_ms: format!("{}", Utc::now().timestamp_millis()),
168            app_id: self.performance.app().options().app_id.clone(),
169            project_id: self.performance.app().options().project_id.clone(),
170            installation_id: self.performance.installation_id().await,
171            platform: current_platform(),
172            sdk_version: env!("CARGO_PKG_VERSION").to_string(),
173            traces,
174            network_requests: network,
175        })
176    }
177}
178
/// Names the build target reported in upload payloads: `"wasm"` when the
/// `wasm-web` feature is enabled on a `wasm32` target, `"native"` otherwise.
fn current_platform() -> String {
    let name = if cfg!(all(feature = "wasm-web", target_arch = "wasm32")) {
        "wasm"
    } else {
        "native"
    };
    String::from(name)
}
189
190struct HttpTransportClient {
191    client: reqwest::Client,
192}
193
194impl Default for HttpTransportClient {
195    fn default() -> Self {
196        Self {
197            client: reqwest::Client::new(),
198        }
199    }
200}
201
202impl HttpTransportClient {
203    async fn send(&self, endpoint: &str, payload: &TransportPayload) -> PerformanceResult<()> {
204        let response = self
205            .client
206            .post(endpoint)
207            .json(payload)
208            .send()
209            .await
210            .map_err(|err| internal_error(err.to_string()))?;
211        if !response.status().is_success() {
212            return Err(internal_error(format!(
213                "transport responded with status {}",
214                response.status()
215            )));
216        }
217        Ok(())
218    }
219}
220
221#[derive(Serialize)]
222struct TransportPayload {
223    request_time_ms: String,
224    app_id: Option<String>,
225    project_id: Option<String>,
226    installation_id: Option<String>,
227    platform: String,
228    sdk_version: String,
229    traces: Vec<SerializableTrace>,
230    network_requests: Vec<SerializableNetworkRequest>,
231}