firebase_rs_sdk/performance/
transport.rs1use std::env;
2use std::sync::{Arc, RwLock};
3use std::time::Duration;
4
5use serde::Serialize;
6
7use crate::performance::api::Performance;
8use crate::performance::error::{internal_error, PerformanceResult};
9use crate::performance::storage::{
10 SerializableNetworkRequest, SerializableTrace, TraceEnvelope, TraceStoreHandle,
11};
12use crate::platform::runtime;
13use chrono::Utc;
14
/// Endpoint placed in `TransportOptions::default()`.
const DEFAULT_ENDPOINT: &str =
    "https://firebaselogging.googleapis.com/v0cc/log?format=json_proto3";
/// Batch size used when `TransportOptions::max_batch_size` is `None`.
const DEFAULT_BATCH_SIZE: usize = 25;
/// Pause between periodic flushes when `TransportOptions::flush_interval` is `None`.
const DEFAULT_INTERVAL: Duration = Duration::from_secs(10);
/// One-off wait performed by the background task before it enters its flush loop.
const INITIAL_DELAY: Duration = Duration::from_millis(2_500);
19
20#[derive(Clone, Debug)]
22pub struct TransportOptions {
23 pub endpoint: Option<String>,
24 pub api_key: Option<String>,
25 pub flush_interval: Option<Duration>,
26 pub max_batch_size: Option<usize>,
27}
28
29impl Default for TransportOptions {
30 fn default() -> Self {
31 Self {
32 endpoint: Some(DEFAULT_ENDPOINT.to_string()),
33 api_key: None,
34 flush_interval: None,
35 max_batch_size: None,
36 }
37 }
38}
39
40pub struct TransportController {
41 performance: Performance,
42 store: TraceStoreHandle,
43 options: Arc<RwLock<TransportOptions>>,
44 client: Arc<HttpTransportClient>,
45}
46
47impl TransportController {
48 pub fn new(
49 performance: Performance,
50 store: TraceStoreHandle,
51 options: Arc<RwLock<TransportOptions>>,
52 ) -> Arc<Self> {
53 let controller = Arc::new(Self {
54 performance,
55 store,
56 options,
57 client: Arc::new(HttpTransportClient::default()),
58 });
59 controller.spawn();
60 controller
61 }
62
63 fn spawn(self: &Arc<Self>) {
64 let this = Arc::clone(self);
65 runtime::spawn_detached(async move {
66 runtime::sleep(INITIAL_DELAY).await;
67 this.run().await;
68 });
69 }
70
71 async fn run(self: Arc<Self>) {
72 loop {
73 runtime::sleep(self.current_interval()).await;
74 if let Err(err) = self.flush_once().await {
75 log::debug!("performance transport flush failed: {err}");
76 }
77 }
78 }
79
80 fn current_interval(&self) -> Duration {
81 self.options
82 .read()
83 .map(|options| options.flush_interval.unwrap_or(DEFAULT_INTERVAL))
84 .unwrap_or(DEFAULT_INTERVAL)
85 }
86
87 fn batch_size(&self) -> usize {
88 self.options
89 .read()
90 .map(|options| options.max_batch_size.unwrap_or(DEFAULT_BATCH_SIZE))
91 .unwrap_or(DEFAULT_BATCH_SIZE)
92 }
93
94 pub async fn flush_once(&self) -> PerformanceResult<()> {
95 if !self.performance.data_collection_enabled() {
96 return Ok(());
97 }
98 let endpoint = match self.current_endpoint() {
99 Some(url) => url,
100 None => return Ok(()),
101 };
102 let batch = self.store.drain(self.batch_size()).await?;
103 if batch.is_empty() {
104 return Ok(());
105 }
106 let payload = self.build_payload(&batch).await?;
107 if let Err(err) = self.client.send(&endpoint, &payload).await {
108 log::debug!("performance transport send failed: {err}");
109 self.requeue(batch).await?;
110 }
111 Ok(())
112 }
113
114 async fn requeue(&self, entries: Vec<TraceEnvelope>) -> PerformanceResult<()> {
115 for entry in entries {
116 self.store.push(entry).await?;
117 }
118 Ok(())
119 }
120
121 pub fn trigger_flush(self: &Arc<Self>) {
122 let controller = Arc::clone(self);
123 runtime::spawn_detached(async move {
124 if let Err(err) = controller.flush_once().await {
125 log::debug!("performance transport ad-hoc flush failed: {err}");
126 }
127 });
128 }
129
130 fn current_endpoint(&self) -> Option<String> {
131 if env::var("FIREBASE_PERF_DISABLE_TRANSPORT").is_ok() {
132 return None;
133 }
134 self.options.read().ok().and_then(|options| {
135 options.endpoint.as_ref().map(|base| {
136 let mut url = base.clone();
137 let key = options
138 .api_key
139 .clone()
140 .or_else(|| self.performance.app().options().api_key.clone());
141 if let Some(key) = key {
142 if url.contains('?') {
143 url.push('&');
144 } else {
145 url.push('?');
146 }
147 url.push_str("key=");
148 url.push_str(&key);
149 }
150 url
151 })
152 })
153 }
154
155 async fn build_payload(&self, batch: &[TraceEnvelope]) -> PerformanceResult<TransportPayload> {
156 let mut traces = Vec::new();
157 let mut network = Vec::new();
158 for entry in batch {
159 match entry {
160 TraceEnvelope::Trace(trace) => traces.push(SerializableTrace::from(trace)),
161 TraceEnvelope::Network(record) => {
162 network.push(SerializableNetworkRequest::from(record))
163 }
164 }
165 }
166 Ok(TransportPayload {
167 request_time_ms: format!("{}", Utc::now().timestamp_millis()),
168 app_id: self.performance.app().options().app_id.clone(),
169 project_id: self.performance.app().options().project_id.clone(),
170 installation_id: self.performance.installation_id().await,
171 platform: current_platform(),
172 sdk_version: env!("CARGO_PKG_VERSION").to_string(),
173 traces,
174 network_requests: network,
175 })
176 }
177}
178
/// Platform label reported in upload payloads: `"wasm"` when built with the `wasm-web`
/// feature for a `wasm32` target, `"native"` for every other build.
fn current_platform() -> String {
    let label = if cfg!(all(feature = "wasm-web", target_arch = "wasm32")) {
        "wasm"
    } else {
        "native"
    };
    String::from(label)
}
189
190struct HttpTransportClient {
191 client: reqwest::Client,
192}
193
194impl Default for HttpTransportClient {
195 fn default() -> Self {
196 Self {
197 client: reqwest::Client::new(),
198 }
199 }
200}
201
202impl HttpTransportClient {
203 async fn send(&self, endpoint: &str, payload: &TransportPayload) -> PerformanceResult<()> {
204 let response = self
205 .client
206 .post(endpoint)
207 .json(payload)
208 .send()
209 .await
210 .map_err(|err| internal_error(err.to_string()))?;
211 if !response.status().is_success() {
212 return Err(internal_error(format!(
213 "transport responded with status {}",
214 response.status()
215 )));
216 }
217 Ok(())
218 }
219}
220
221#[derive(Serialize)]
222struct TransportPayload {
223 request_time_ms: String,
224 app_id: Option<String>,
225 project_id: Option<String>,
226 installation_id: Option<String>,
227 platform: String,
228 sdk_version: String,
229 traces: Vec<SerializableTrace>,
230 network_requests: Vec<SerializableNetworkRequest>,
231}