firebase_rs_sdk/firestore/remote/datastore/
http.rs1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::thread;
4use std::time::Duration;
5
6use reqwest::Method;
7
8use crate::firestore::api::query::{Bound, FieldFilter, QueryDefinition};
9use crate::firestore::api::{DocumentSnapshot, SnapshotMetadata};
10use crate::firestore::error::{
11 internal_error, invalid_argument, FirestoreError, FirestoreErrorCode, FirestoreResult,
12};
13use crate::firestore::model::{DatabaseId, DocumentKey};
14use crate::firestore::remote::connection::{Connection, ConnectionBuilder, RequestContext};
15use crate::firestore::remote::serializer::JsonProtoSerializer;
16use crate::firestore::value::MapValue;
17use serde_json::{json, Value as JsonValue};
18
19use super::{Datastore, NoopTokenProvider, TokenProviderArc};
20
21#[derive(Clone)]
22pub struct HttpDatastore {
23 connection: Connection,
24 serializer: JsonProtoSerializer,
25 auth_provider: TokenProviderArc,
26 app_check_provider: TokenProviderArc,
27 retry: RetrySettings,
28}
29
30#[derive(Clone)]
31pub struct HttpDatastoreBuilder {
32 database_id: DatabaseId,
33 connection_builder: ConnectionBuilder,
34 auth_provider: TokenProviderArc,
35 app_check_provider: TokenProviderArc,
36 retry: RetrySettings,
37}
38
39#[derive(Clone, Debug)]
40pub struct RetrySettings {
41 pub max_attempts: usize,
42 pub initial_delay: Duration,
43 pub multiplier: f64,
44 pub max_delay: Duration,
45 pub request_timeout: Duration,
46}
47
48impl Default for RetrySettings {
49 fn default() -> Self {
50 Self {
51 max_attempts: 5,
52 initial_delay: Duration::from_millis(100),
53 multiplier: 1.5,
54 max_delay: Duration::from_secs(5),
55 request_timeout: Duration::from_secs(20),
56 }
57 }
58}
59
60impl HttpDatastore {
61 pub fn builder(database_id: DatabaseId) -> HttpDatastoreBuilder {
62 HttpDatastoreBuilder::new(database_id)
63 }
64
65 pub fn from_database_id(database_id: DatabaseId) -> FirestoreResult<Self> {
66 Self::builder(database_id).build()
67 }
68
69 fn new(
70 connection: Connection,
71 serializer: JsonProtoSerializer,
72 auth_provider: TokenProviderArc,
73 app_check_provider: TokenProviderArc,
74 retry: RetrySettings,
75 ) -> Self {
76 Self {
77 connection,
78 serializer,
79 auth_provider,
80 app_check_provider,
81 retry,
82 }
83 }
84
85 fn execute_with_retry<F, T>(&self, mut operation: F) -> FirestoreResult<T>
86 where
87 F: FnMut(&RequestContext) -> FirestoreResult<T>,
88 {
89 let mut attempt = 0usize;
90 loop {
91 let context = self.build_request_context()?;
92 match operation(&context) {
93 Ok(result) => return Ok(result),
94 Err(err) => {
95 if !self.retry.should_retry(attempt, &err) {
96 return Err(err);
97 }
98
99 if err.code == FirestoreErrorCode::Unauthenticated {
100 self.auth_provider.invalidate_token();
101 self.app_check_provider.invalidate_token();
102 }
103
104 let delay = self.retry.backoff_delay(attempt);
105 thread::sleep(delay);
106 attempt += 1;
107 }
108 }
109 }
110 }
111
112 fn build_request_context(&self) -> FirestoreResult<RequestContext> {
113 let auth_token = self.auth_provider.get_token()?;
114 let app_check_token = self.app_check_provider.get_token()?;
115 Ok(RequestContext {
116 auth_token,
117 app_check_token,
118 request_timeout: Some(self.retry.request_timeout),
119 })
120 }
121}
122
123impl Datastore for HttpDatastore {
124 fn get_document(&self, key: &DocumentKey) -> FirestoreResult<DocumentSnapshot> {
125 let doc_path = format!("documents/{}", key.path().canonical_string());
126 let snapshot = self.execute_with_retry(|context| {
127 self.connection
128 .invoke_json_optional(Method::GET, &doc_path, None, context)
129 })?;
130
131 if let Some(json) = snapshot {
132 let map_value = self
133 .serializer
134 .decode_document_fields(&json)?
135 .unwrap_or_else(|| MapValue::new(std::collections::BTreeMap::new()));
136 Ok(DocumentSnapshot::new(
137 key.clone(),
138 Some(map_value),
139 SnapshotMetadata::new(false, false),
140 ))
141 } else {
142 Ok(DocumentSnapshot::new(
143 key.clone(),
144 None,
145 SnapshotMetadata::new(false, false),
146 ))
147 }
148 }
149
150 fn set_document(&self, key: &DocumentKey, data: MapValue, merge: bool) -> FirestoreResult<()> {
151 if merge {
152 return Err(invalid_argument(
153 "HTTP datastore set with merge is not yet implemented",
154 ));
155 }
156
157 let commit_body = self.serializer.encode_commit_body(key, &data);
158 self.execute_with_retry(|context| {
159 self.connection
160 .invoke_json(
161 Method::POST,
162 "documents:commit",
163 Some(commit_body.clone()),
164 context,
165 )
166 .map(|_| ())
167 })
168 }
169
170 fn run_query(&self, query: &QueryDefinition) -> FirestoreResult<Vec<DocumentSnapshot>> {
171 let request_path = if query.parent_path().is_empty() {
172 "documents:runQuery".to_string()
173 } else {
174 format!(
175 "documents/{}:runQuery",
176 query.parent_path().canonical_string()
177 )
178 };
179
180 let structured_query = self.build_structured_query(query)?;
181 let body = json!({
182 "structuredQuery": structured_query
183 });
184
185 let response = self.execute_with_retry(|context| {
186 self.connection
187 .invoke_json(Method::POST, &request_path, Some(body.clone()), context)
188 })?;
189
190 let results = response
191 .as_array()
192 .ok_or_else(|| internal_error("Firestore runQuery response must be an array"))?;
193
194 let mut snapshots = Vec::new();
195 for entry in results {
196 let document = match entry.get("document") {
197 Some(value) => value,
198 None => continue,
199 };
200
201 let name = document
202 .get("name")
203 .and_then(|value| value.as_str())
204 .ok_or_else(|| {
205 internal_error("Firestore runQuery document missing 'name' field")
206 })?;
207 let key = self.parse_document_name(name)?;
208
209 let map_value = self
210 .serializer
211 .decode_document_fields(document)?
212 .unwrap_or_else(|| MapValue::new(BTreeMap::new()));
213
214 snapshots.push(DocumentSnapshot::new(
215 key,
216 Some(map_value),
217 SnapshotMetadata::new(false, false),
218 ));
219 }
220
221 Ok(snapshots)
222 }
223}
224
225impl HttpDatastore {
226 fn parse_document_name(&self, name: &str) -> FirestoreResult<DocumentKey> {
227 let prefix = format!("{}/documents/", self.serializer.database_name());
228 if !name.starts_with(&prefix) {
229 return Err(internal_error(format!(
230 "Unexpected document name '{name}' returned by Firestore"
231 )));
232 }
233
234 let relative = &name[prefix.len()..];
235 DocumentKey::from_string(relative)
236 }
237
238 fn build_structured_query(&self, definition: &QueryDefinition) -> FirestoreResult<JsonValue> {
239 let mut structured = serde_json::Map::new();
240
241 if let Some(fields) = definition.projection() {
242 let field_entries: Vec<_> = fields
243 .iter()
244 .map(|field| json!({ "fieldPath": field.canonical_string() }))
245 .collect();
246 structured.insert("select".to_string(), json!({ "fields": field_entries }));
247 }
248
249 structured.insert(
250 "from".to_string(),
251 json!([{
252 "collectionId": definition.collection_id(),
253 "allDescendants": false
254 }]),
255 );
256
257 if !definition.filters().is_empty() {
258 let filter_json = self.encode_filters(definition.filters());
259 structured.insert("where".to_string(), filter_json);
260 }
261
262 if !definition.request_order_by().is_empty() {
263 let orders: Vec<_> = definition
264 .request_order_by()
265 .iter()
266 .map(|order| {
267 json!({
268 "field": { "fieldPath": order.field().canonical_string() },
269 "direction": order.direction().as_str(),
270 })
271 })
272 .collect();
273 structured.insert("orderBy".to_string(), JsonValue::Array(orders));
274 }
275
276 if let Some(limit) = definition.limit() {
277 structured.insert("limit".to_string(), json!(limit as i64));
278 }
279
280 if let Some(start) = definition.request_start_at() {
281 structured.insert("startAt".to_string(), self.encode_start_cursor(start));
282 }
283
284 if let Some(end) = definition.request_end_at() {
285 structured.insert("endAt".to_string(), self.encode_end_cursor(end));
286 }
287
288 Ok(JsonValue::Object(structured))
289 }
290
291 fn encode_filters(&self, filters: &[FieldFilter]) -> JsonValue {
292 if filters.len() == 1 {
293 return self.encode_field_filter(&filters[0]);
294 }
295
296 let nested: Vec<_> = filters
297 .iter()
298 .map(|filter| self.encode_field_filter(filter))
299 .collect();
300
301 json!({
302 "compositeFilter": {
303 "op": "AND",
304 "filters": nested
305 }
306 })
307 }
308
309 fn encode_field_filter(&self, filter: &FieldFilter) -> JsonValue {
310 json!({
311 "fieldFilter": {
312 "field": { "fieldPath": filter.field().canonical_string() },
313 "op": filter.operator().as_str(),
314 "value": self.serializer.encode_value(filter.value())
315 }
316 })
317 }
318
319 fn encode_start_cursor(&self, bound: &Bound) -> JsonValue {
320 json!({
321 "values": bound
322 .values()
323 .iter()
324 .map(|value| self.serializer.encode_value(value))
325 .collect::<Vec<_>>(),
326 "before": bound.inclusive(),
327 })
328 }
329
330 fn encode_end_cursor(&self, bound: &Bound) -> JsonValue {
331 json!({
332 "values": bound
333 .values()
334 .iter()
335 .map(|value| self.serializer.encode_value(value))
336 .collect::<Vec<_>>(),
337 "before": !bound.inclusive(),
338 })
339 }
340}
341
342impl HttpDatastoreBuilder {
343 fn new(database_id: DatabaseId) -> Self {
344 let auth_provider: TokenProviderArc = Arc::new(NoopTokenProvider);
345 let app_check_provider: TokenProviderArc = Arc::new(NoopTokenProvider);
346 let connection_builder = Connection::builder(database_id.clone());
347 Self {
348 database_id,
349 connection_builder,
350 auth_provider,
351 app_check_provider,
352 retry: RetrySettings::default(),
353 }
354 }
355
356 pub fn with_auth_provider(mut self, provider: TokenProviderArc) -> Self {
357 self.auth_provider = provider;
358 self
359 }
360
361 pub fn with_app_check_provider(mut self, provider: TokenProviderArc) -> Self {
362 self.app_check_provider = provider;
363 self
364 }
365
366 pub fn with_retry_settings(mut self, settings: RetrySettings) -> Self {
367 self.retry = settings;
368 self
369 }
370
371 pub fn with_connection_builder(mut self, builder: ConnectionBuilder) -> Self {
372 self.connection_builder = builder;
373 self
374 }
375
376 pub fn build(self) -> FirestoreResult<HttpDatastore> {
377 let connection = self.connection_builder.build()?;
378 let serializer = JsonProtoSerializer::new(self.database_id.clone());
379 Ok(HttpDatastore::new(
380 connection,
381 serializer,
382 self.auth_provider,
383 self.app_check_provider,
384 self.retry,
385 ))
386 }
387}
388
389impl RetrySettings {
390 fn should_retry(&self, attempt: usize, error: &FirestoreError) -> bool {
391 if attempt + 1 >= self.max_attempts {
392 return false;
393 }
394
395 matches!(
396 error.code,
397 FirestoreErrorCode::Internal
398 | FirestoreErrorCode::Unavailable
399 | FirestoreErrorCode::DeadlineExceeded
400 | FirestoreErrorCode::ResourceExhausted
401 | FirestoreErrorCode::Unauthenticated
402 )
403 }
404
405 fn backoff_delay(&self, attempt: usize) -> Duration {
406 let factor = self.multiplier.powi(attempt as i32);
407 let delay = self.initial_delay.mul_f64(factor);
408 if delay > self.max_delay {
409 self.max_delay
410 } else {
411 delay
412 }
413 }
414}
415
416#[cfg(test)]
417mod tests {
418 use super::*;
419 use crate::app::{FirebaseApp, FirebaseAppConfig, FirebaseOptions};
420 use crate::component::ComponentContainer;
421 use crate::firestore::api::Firestore;
422 use crate::firestore::error::{internal_error, unauthenticated};
423 use crate::firestore::model::DatabaseId;
424 use crate::test_support::start_mock_server;
425 use httpmock::prelude::*;
426 use serde_json::json;
427 use std::panic;
428
429 #[test]
430 fn retries_unauthenticated_errors() {
431 let settings = RetrySettings {
432 max_attempts: 3,
433 ..Default::default()
434 };
435 let error = unauthenticated("expired");
436 assert!(settings.should_retry(0, &error));
437 assert!(settings.should_retry(1, &error));
438 assert!(!settings.should_retry(2, &error));
439 }
440
441 #[test]
442 fn stops_retrying_after_max_attempts() {
443 let settings = RetrySettings {
444 max_attempts: 1,
445 ..Default::default()
446 };
447 let error = internal_error("boom");
448 assert!(!settings.should_retry(0, &error));
449 }
450
451 #[test]
452 fn run_query_fetches_documents() {
453 let server = match panic::catch_unwind(|| start_mock_server()) {
454 Ok(server) => server,
455 Err(_) => {
456 eprintln!(
457 "Skipping run_query_fetches_documents: unable to bind httpmock server in this environment."
458 );
459 return;
460 }
461 };
462 let database_id = DatabaseId::new("demo-project", "(default)");
463
464 let response_body = json!([
465 {
466 "document": {
467 "name": format!(
468 "projects/{}/databases/{}/documents/cities/LA",
469 database_id.project_id(),
470 database_id.database()
471 ),
472 "fields": {
473 "name": { "stringValue": "Los Angeles" }
474 }
475 }
476 },
477 {
478 "document": {
479 "name": format!(
480 "projects/{}/databases/{}/documents/cities/SF",
481 database_id.project_id(),
482 database_id.database()
483 ),
484 "fields": {
485 "name": { "stringValue": "San Francisco" }
486 }
487 }
488 }
489 ]);
490
491 let expected_body = json!({
492 "structuredQuery": {
493 "from": [
494 {
495 "collectionId": "cities",
496 "allDescendants": false
497 }
498 ],
499 "orderBy": [
500 {
501 "field": { "fieldPath": "__name__" },
502 "direction": "ASCENDING"
503 }
504 ]
505 }
506 });
507
508 let expected_path = format!(
509 "/v1/projects/{}/databases/{}/documents:runQuery",
510 database_id.project_id(),
511 database_id.database()
512 );
513
514 let run_query_path = expected_path.clone();
515 let expected_body_clone = expected_body.clone();
516 let response_clone = response_body.clone();
517
518 let _mock = server.mock(move |when, then| {
519 when.method(POST)
520 .path(run_query_path.as_str())
521 .json_body(expected_body_clone.clone());
522 then.status(200).json_body(response_clone.clone());
523 });
524
525 let client = reqwest::blocking::Client::builder()
526 .build()
527 .expect("reqwest client");
528
529 let connection_builder = Connection::builder(database_id.clone())
530 .with_client(client)
531 .with_emulator_host(server.address().to_string());
532
533 let datastore = HttpDatastore::builder(database_id.clone())
534 .with_connection_builder(connection_builder)
535 .build()
536 .expect("datastore");
537
538 let options = FirebaseOptions {
539 project_id: Some(database_id.project_id().to_string()),
540 ..Default::default()
541 };
542 let app = FirebaseApp::new(
543 options,
544 FirebaseAppConfig::new("query-test", false),
545 ComponentContainer::new("query-test"),
546 );
547
548 let firestore = Firestore::new(app, database_id.clone());
549 let query = firestore.collection("cities").unwrap().query();
550 let definition = query.definition();
551
552 let snapshots = datastore.run_query(&definition).expect("query");
553 assert_eq!(snapshots.len(), 2);
554 let names: Vec<_> = snapshots.iter().map(|snap| snap.id().to_string()).collect();
555 assert_eq!(names, vec!["LA", "SF"]);
556 }
557}