firebase_rs_sdk/firestore/remote/datastore/
http.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::thread;
4use std::time::Duration;
5
6use reqwest::Method;
7
8use crate::firestore::api::query::{Bound, FieldFilter, QueryDefinition};
9use crate::firestore::api::{DocumentSnapshot, SnapshotMetadata};
10use crate::firestore::error::{
11    internal_error, invalid_argument, FirestoreError, FirestoreErrorCode, FirestoreResult,
12};
13use crate::firestore::model::{DatabaseId, DocumentKey};
14use crate::firestore::remote::connection::{Connection, ConnectionBuilder, RequestContext};
15use crate::firestore::remote::serializer::JsonProtoSerializer;
16use crate::firestore::value::MapValue;
17use serde_json::{json, Value as JsonValue};
18
19use super::{Datastore, NoopTokenProvider, TokenProviderArc};
20
21#[derive(Clone)]
22pub struct HttpDatastore {
23    connection: Connection,
24    serializer: JsonProtoSerializer,
25    auth_provider: TokenProviderArc,
26    app_check_provider: TokenProviderArc,
27    retry: RetrySettings,
28}
29
30#[derive(Clone)]
31pub struct HttpDatastoreBuilder {
32    database_id: DatabaseId,
33    connection_builder: ConnectionBuilder,
34    auth_provider: TokenProviderArc,
35    app_check_provider: TokenProviderArc,
36    retry: RetrySettings,
37}
38
39#[derive(Clone, Debug)]
40pub struct RetrySettings {
41    pub max_attempts: usize,
42    pub initial_delay: Duration,
43    pub multiplier: f64,
44    pub max_delay: Duration,
45    pub request_timeout: Duration,
46}
47
48impl Default for RetrySettings {
49    fn default() -> Self {
50        Self {
51            max_attempts: 5,
52            initial_delay: Duration::from_millis(100),
53            multiplier: 1.5,
54            max_delay: Duration::from_secs(5),
55            request_timeout: Duration::from_secs(20),
56        }
57    }
58}
59
60impl HttpDatastore {
61    pub fn builder(database_id: DatabaseId) -> HttpDatastoreBuilder {
62        HttpDatastoreBuilder::new(database_id)
63    }
64
65    pub fn from_database_id(database_id: DatabaseId) -> FirestoreResult<Self> {
66        Self::builder(database_id).build()
67    }
68
69    fn new(
70        connection: Connection,
71        serializer: JsonProtoSerializer,
72        auth_provider: TokenProviderArc,
73        app_check_provider: TokenProviderArc,
74        retry: RetrySettings,
75    ) -> Self {
76        Self {
77            connection,
78            serializer,
79            auth_provider,
80            app_check_provider,
81            retry,
82        }
83    }
84
85    fn execute_with_retry<F, T>(&self, mut operation: F) -> FirestoreResult<T>
86    where
87        F: FnMut(&RequestContext) -> FirestoreResult<T>,
88    {
89        let mut attempt = 0usize;
90        loop {
91            let context = self.build_request_context()?;
92            match operation(&context) {
93                Ok(result) => return Ok(result),
94                Err(err) => {
95                    if !self.retry.should_retry(attempt, &err) {
96                        return Err(err);
97                    }
98
99                    if err.code == FirestoreErrorCode::Unauthenticated {
100                        self.auth_provider.invalidate_token();
101                        self.app_check_provider.invalidate_token();
102                    }
103
104                    let delay = self.retry.backoff_delay(attempt);
105                    thread::sleep(delay);
106                    attempt += 1;
107                }
108            }
109        }
110    }
111
112    fn build_request_context(&self) -> FirestoreResult<RequestContext> {
113        let auth_token = self.auth_provider.get_token()?;
114        let app_check_token = self.app_check_provider.get_token()?;
115        Ok(RequestContext {
116            auth_token,
117            app_check_token,
118            request_timeout: Some(self.retry.request_timeout),
119        })
120    }
121}
122
123impl Datastore for HttpDatastore {
124    fn get_document(&self, key: &DocumentKey) -> FirestoreResult<DocumentSnapshot> {
125        let doc_path = format!("documents/{}", key.path().canonical_string());
126        let snapshot = self.execute_with_retry(|context| {
127            self.connection
128                .invoke_json_optional(Method::GET, &doc_path, None, context)
129        })?;
130
131        if let Some(json) = snapshot {
132            let map_value = self
133                .serializer
134                .decode_document_fields(&json)?
135                .unwrap_or_else(|| MapValue::new(std::collections::BTreeMap::new()));
136            Ok(DocumentSnapshot::new(
137                key.clone(),
138                Some(map_value),
139                SnapshotMetadata::new(false, false),
140            ))
141        } else {
142            Ok(DocumentSnapshot::new(
143                key.clone(),
144                None,
145                SnapshotMetadata::new(false, false),
146            ))
147        }
148    }
149
150    fn set_document(&self, key: &DocumentKey, data: MapValue, merge: bool) -> FirestoreResult<()> {
151        if merge {
152            return Err(invalid_argument(
153                "HTTP datastore set with merge is not yet implemented",
154            ));
155        }
156
157        let commit_body = self.serializer.encode_commit_body(key, &data);
158        self.execute_with_retry(|context| {
159            self.connection
160                .invoke_json(
161                    Method::POST,
162                    "documents:commit",
163                    Some(commit_body.clone()),
164                    context,
165                )
166                .map(|_| ())
167        })
168    }
169
170    fn run_query(&self, query: &QueryDefinition) -> FirestoreResult<Vec<DocumentSnapshot>> {
171        let request_path = if query.parent_path().is_empty() {
172            "documents:runQuery".to_string()
173        } else {
174            format!(
175                "documents/{}:runQuery",
176                query.parent_path().canonical_string()
177            )
178        };
179
180        let structured_query = self.build_structured_query(query)?;
181        let body = json!({
182            "structuredQuery": structured_query
183        });
184
185        let response = self.execute_with_retry(|context| {
186            self.connection
187                .invoke_json(Method::POST, &request_path, Some(body.clone()), context)
188        })?;
189
190        let results = response
191            .as_array()
192            .ok_or_else(|| internal_error("Firestore runQuery response must be an array"))?;
193
194        let mut snapshots = Vec::new();
195        for entry in results {
196            let document = match entry.get("document") {
197                Some(value) => value,
198                None => continue,
199            };
200
201            let name = document
202                .get("name")
203                .and_then(|value| value.as_str())
204                .ok_or_else(|| {
205                    internal_error("Firestore runQuery document missing 'name' field")
206                })?;
207            let key = self.parse_document_name(name)?;
208
209            let map_value = self
210                .serializer
211                .decode_document_fields(document)?
212                .unwrap_or_else(|| MapValue::new(BTreeMap::new()));
213
214            snapshots.push(DocumentSnapshot::new(
215                key,
216                Some(map_value),
217                SnapshotMetadata::new(false, false),
218            ));
219        }
220
221        Ok(snapshots)
222    }
223}
224
225impl HttpDatastore {
226    fn parse_document_name(&self, name: &str) -> FirestoreResult<DocumentKey> {
227        let prefix = format!("{}/documents/", self.serializer.database_name());
228        if !name.starts_with(&prefix) {
229            return Err(internal_error(format!(
230                "Unexpected document name '{name}' returned by Firestore"
231            )));
232        }
233
234        let relative = &name[prefix.len()..];
235        DocumentKey::from_string(relative)
236    }
237
238    fn build_structured_query(&self, definition: &QueryDefinition) -> FirestoreResult<JsonValue> {
239        let mut structured = serde_json::Map::new();
240
241        if let Some(fields) = definition.projection() {
242            let field_entries: Vec<_> = fields
243                .iter()
244                .map(|field| json!({ "fieldPath": field.canonical_string() }))
245                .collect();
246            structured.insert("select".to_string(), json!({ "fields": field_entries }));
247        }
248
249        structured.insert(
250            "from".to_string(),
251            json!([{
252                "collectionId": definition.collection_id(),
253                "allDescendants": false
254            }]),
255        );
256
257        if !definition.filters().is_empty() {
258            let filter_json = self.encode_filters(definition.filters());
259            structured.insert("where".to_string(), filter_json);
260        }
261
262        if !definition.request_order_by().is_empty() {
263            let orders: Vec<_> = definition
264                .request_order_by()
265                .iter()
266                .map(|order| {
267                    json!({
268                        "field": { "fieldPath": order.field().canonical_string() },
269                        "direction": order.direction().as_str(),
270                    })
271                })
272                .collect();
273            structured.insert("orderBy".to_string(), JsonValue::Array(orders));
274        }
275
276        if let Some(limit) = definition.limit() {
277            structured.insert("limit".to_string(), json!(limit as i64));
278        }
279
280        if let Some(start) = definition.request_start_at() {
281            structured.insert("startAt".to_string(), self.encode_start_cursor(start));
282        }
283
284        if let Some(end) = definition.request_end_at() {
285            structured.insert("endAt".to_string(), self.encode_end_cursor(end));
286        }
287
288        Ok(JsonValue::Object(structured))
289    }
290
291    fn encode_filters(&self, filters: &[FieldFilter]) -> JsonValue {
292        if filters.len() == 1 {
293            return self.encode_field_filter(&filters[0]);
294        }
295
296        let nested: Vec<_> = filters
297            .iter()
298            .map(|filter| self.encode_field_filter(filter))
299            .collect();
300
301        json!({
302            "compositeFilter": {
303                "op": "AND",
304                "filters": nested
305            }
306        })
307    }
308
309    fn encode_field_filter(&self, filter: &FieldFilter) -> JsonValue {
310        json!({
311            "fieldFilter": {
312                "field": { "fieldPath": filter.field().canonical_string() },
313                "op": filter.operator().as_str(),
314                "value": self.serializer.encode_value(filter.value())
315            }
316        })
317    }
318
319    fn encode_start_cursor(&self, bound: &Bound) -> JsonValue {
320        json!({
321            "values": bound
322                .values()
323                .iter()
324                .map(|value| self.serializer.encode_value(value))
325                .collect::<Vec<_>>(),
326            "before": bound.inclusive(),
327        })
328    }
329
330    fn encode_end_cursor(&self, bound: &Bound) -> JsonValue {
331        json!({
332            "values": bound
333                .values()
334                .iter()
335                .map(|value| self.serializer.encode_value(value))
336                .collect::<Vec<_>>(),
337            "before": !bound.inclusive(),
338        })
339    }
340}
341
342impl HttpDatastoreBuilder {
343    fn new(database_id: DatabaseId) -> Self {
344        let auth_provider: TokenProviderArc = Arc::new(NoopTokenProvider);
345        let app_check_provider: TokenProviderArc = Arc::new(NoopTokenProvider);
346        let connection_builder = Connection::builder(database_id.clone());
347        Self {
348            database_id,
349            connection_builder,
350            auth_provider,
351            app_check_provider,
352            retry: RetrySettings::default(),
353        }
354    }
355
356    pub fn with_auth_provider(mut self, provider: TokenProviderArc) -> Self {
357        self.auth_provider = provider;
358        self
359    }
360
361    pub fn with_app_check_provider(mut self, provider: TokenProviderArc) -> Self {
362        self.app_check_provider = provider;
363        self
364    }
365
366    pub fn with_retry_settings(mut self, settings: RetrySettings) -> Self {
367        self.retry = settings;
368        self
369    }
370
371    pub fn with_connection_builder(mut self, builder: ConnectionBuilder) -> Self {
372        self.connection_builder = builder;
373        self
374    }
375
376    pub fn build(self) -> FirestoreResult<HttpDatastore> {
377        let connection = self.connection_builder.build()?;
378        let serializer = JsonProtoSerializer::new(self.database_id.clone());
379        Ok(HttpDatastore::new(
380            connection,
381            serializer,
382            self.auth_provider,
383            self.app_check_provider,
384            self.retry,
385        ))
386    }
387}
388
389impl RetrySettings {
390    fn should_retry(&self, attempt: usize, error: &FirestoreError) -> bool {
391        if attempt + 1 >= self.max_attempts {
392            return false;
393        }
394
395        matches!(
396            error.code,
397            FirestoreErrorCode::Internal
398                | FirestoreErrorCode::Unavailable
399                | FirestoreErrorCode::DeadlineExceeded
400                | FirestoreErrorCode::ResourceExhausted
401                | FirestoreErrorCode::Unauthenticated
402        )
403    }
404
405    fn backoff_delay(&self, attempt: usize) -> Duration {
406        let factor = self.multiplier.powi(attempt as i32);
407        let delay = self.initial_delay.mul_f64(factor);
408        if delay > self.max_delay {
409            self.max_delay
410        } else {
411            delay
412        }
413    }
414}
415
416#[cfg(test)]
417mod tests {
418    use super::*;
419    use crate::app::{FirebaseApp, FirebaseAppConfig, FirebaseOptions};
420    use crate::component::ComponentContainer;
421    use crate::firestore::api::Firestore;
422    use crate::firestore::error::{internal_error, unauthenticated};
423    use crate::firestore::model::DatabaseId;
424    use crate::test_support::start_mock_server;
425    use httpmock::prelude::*;
426    use serde_json::json;
427    use std::panic;
428
429    #[test]
430    fn retries_unauthenticated_errors() {
431        let settings = RetrySettings {
432            max_attempts: 3,
433            ..Default::default()
434        };
435        let error = unauthenticated("expired");
436        assert!(settings.should_retry(0, &error));
437        assert!(settings.should_retry(1, &error));
438        assert!(!settings.should_retry(2, &error));
439    }
440
441    #[test]
442    fn stops_retrying_after_max_attempts() {
443        let settings = RetrySettings {
444            max_attempts: 1,
445            ..Default::default()
446        };
447        let error = internal_error("boom");
448        assert!(!settings.should_retry(0, &error));
449    }
450
451    #[test]
452    fn run_query_fetches_documents() {
453        let server = match panic::catch_unwind(|| start_mock_server()) {
454            Ok(server) => server,
455            Err(_) => {
456                eprintln!(
457                    "Skipping run_query_fetches_documents: unable to bind httpmock server in this environment."
458                );
459                return;
460            }
461        };
462        let database_id = DatabaseId::new("demo-project", "(default)");
463
464        let response_body = json!([
465            {
466                "document": {
467                    "name": format!(
468                        "projects/{}/databases/{}/documents/cities/LA",
469                        database_id.project_id(),
470                        database_id.database()
471                    ),
472                    "fields": {
473                        "name": { "stringValue": "Los Angeles" }
474                    }
475                }
476            },
477            {
478                "document": {
479                    "name": format!(
480                        "projects/{}/databases/{}/documents/cities/SF",
481                        database_id.project_id(),
482                        database_id.database()
483                    ),
484                    "fields": {
485                        "name": { "stringValue": "San Francisco" }
486                    }
487                }
488            }
489        ]);
490
491        let expected_body = json!({
492            "structuredQuery": {
493                "from": [
494                    {
495                        "collectionId": "cities",
496                        "allDescendants": false
497                    }
498                ],
499                "orderBy": [
500                    {
501                        "field": { "fieldPath": "__name__" },
502                        "direction": "ASCENDING"
503                    }
504                ]
505            }
506        });
507
508        let expected_path = format!(
509            "/v1/projects/{}/databases/{}/documents:runQuery",
510            database_id.project_id(),
511            database_id.database()
512        );
513
514        let run_query_path = expected_path.clone();
515        let expected_body_clone = expected_body.clone();
516        let response_clone = response_body.clone();
517
518        let _mock = server.mock(move |when, then| {
519            when.method(POST)
520                .path(run_query_path.as_str())
521                .json_body(expected_body_clone.clone());
522            then.status(200).json_body(response_clone.clone());
523        });
524
525        let client = reqwest::blocking::Client::builder()
526            .build()
527            .expect("reqwest client");
528
529        let connection_builder = Connection::builder(database_id.clone())
530            .with_client(client)
531            .with_emulator_host(server.address().to_string());
532
533        let datastore = HttpDatastore::builder(database_id.clone())
534            .with_connection_builder(connection_builder)
535            .build()
536            .expect("datastore");
537
538        let options = FirebaseOptions {
539            project_id: Some(database_id.project_id().to_string()),
540            ..Default::default()
541        };
542        let app = FirebaseApp::new(
543            options,
544            FirebaseAppConfig::new("query-test", false),
545            ComponentContainer::new("query-test"),
546        );
547
548        let firestore = Firestore::new(app, database_id.clone());
549        let query = firestore.collection("cities").unwrap().query();
550        let definition = query.definition();
551
552        let snapshots = datastore.run_query(&definition).expect("query");
553        assert_eq!(snapshots.len(), 2);
554        let names: Vec<_> = snapshots.iter().map(|snap| snap.id().to_string()).collect();
555        assert_eq!(names, vec!["LA", "SF"]);
556    }
557}