1use super::listen::{listen_request, ListenStream};
2use super::models::{
3 ArrayValue, CollectionSelector, Document, DocumentsTarget, FieldOperator, ListenRequest,
4 ListCollectionIdsRequest, ListCollectionIdsResponse, ListDocumentsResponse, MapValue,
5 QueryTarget, StructuredQuery, Target, TargetType, Value, ValueType,
6};
7use super::query::Query;
8use super::snapshot::{DocumentSnapshot, WriteResult};
9use super::FirestoreError;
10use reqwest::header;
11use reqwest_middleware::ClientWithMiddleware;
12use serde::de::Error;
13use serde::ser::Error as SerError;
14use serde::Serialize;
15use serde_json::map::Map;
16use serde_json::Value as SerdeValue;
17use std::collections::HashMap;
18
19pub(crate) fn convert_fields_to_serde_value(
21 fields: HashMap<String, Value>,
22) -> Result<SerdeValue, FirestoreError> {
23 let mut map = Map::new();
24 for (key, value) in fields {
25 map.insert(key, convert_value_to_serde_value(value)?);
26 }
27 Ok(SerdeValue::Object(map))
28}
29
30pub(crate) fn convert_value_to_serde_value(value: Value) -> Result<SerdeValue, FirestoreError> {
31 use serde_json::json;
32 Ok(match value.value_type {
33 ValueType::StringValue(s) => SerdeValue::String(s),
34 ValueType::IntegerValue(s) => {
35 let i: i64 = s.parse().map_err(|e| {
36 <serde_json::Error as Error>::custom(format!(
37 "Failed to parse integer string '{}': {}",
38 s, e
39 ))
40 })?;
41 SerdeValue::Number(i.into())
42 }
43 ValueType::DoubleValue(d) => SerdeValue::Number(
44 serde_json::Number::from_f64(d).ok_or_else(|| {
45 <serde_json::Error as Error>::custom(format!("Invalid f64 value: {}", d))
46 })?,
47 ),
48 ValueType::BooleanValue(b) => SerdeValue::Bool(b),
49 ValueType::MapValue(map_value) => convert_fields_to_serde_value(map_value.fields)?,
50 ValueType::ArrayValue(array_value) => {
51 let values = array_value
52 .values
53 .into_iter()
54 .map(convert_value_to_serde_value)
55 .collect::<Result<Vec<_>, _>>()?;
56 SerdeValue::Array(values)
57 }
58 ValueType::NullValue(_) => SerdeValue::Null,
59 ValueType::TimestampValue(s) => SerdeValue::String(s),
60 ValueType::GeoPointValue(gp) => {
61 json!({ "latitude": gp.latitude, "longitude": gp.longitude })
62 }
63 ValueType::BytesValue(s) => SerdeValue::String(s),
64 ValueType::ReferenceValue(s) => SerdeValue::String(s),
65 })
66}
67
68pub(crate) fn convert_serializable_to_fields<T: Serialize>(
70 value: &T,
71) -> Result<HashMap<String, Value>, FirestoreError> {
72 let serde_value = serde_json::to_value(value)?;
73 if let SerdeValue::Object(map) = serde_value {
74 let mut fields = HashMap::new();
75 for (k, v) in map {
76 fields.insert(k, convert_serde_value_to_firestore_value(v)?);
77 }
78 Ok(fields)
79 } else {
80 Err(FirestoreError::SerializationError(SerError::custom(
81 "Can only set objects as documents",
82 )))
83 }
84}
85
86pub(crate) fn convert_serde_value_to_firestore_value(
87 value: SerdeValue,
88) -> Result<Value, FirestoreError> {
89 let value_type = match value {
90 SerdeValue::Null => ValueType::NullValue(()),
91 SerdeValue::Bool(b) => ValueType::BooleanValue(b),
92 SerdeValue::Number(n) => {
93 if let Some(i) = n.as_i64() {
94 ValueType::IntegerValue(i.to_string())
95 } else if let Some(f) = n.as_f64() {
96 ValueType::DoubleValue(f)
97 } else {
98 return Err(FirestoreError::SerializationError(SerError::custom(
99 format!("Unsupported number type: {}", n)
100 )));
101 }
102 }
103 SerdeValue::String(s) => ValueType::StringValue(s),
104 SerdeValue::Array(a) => {
105 let values = a
106 .into_iter()
107 .map(convert_serde_value_to_firestore_value)
108 .collect::<Result<Vec<_>, _>>()?;
109 ValueType::ArrayValue(ArrayValue { values })
110 }
111 SerdeValue::Object(o) => {
112 let mut fields = HashMap::new();
113 for (k, v) in o {
114 fields.insert(k, convert_serde_value_to_firestore_value(v)?);
115 }
116 ValueType::MapValue(MapValue { fields })
117 }
118 };
119 Ok(Value { value_type })
120}
121
/// Returns the part of `path` that precedes the first `/documents` occurrence.
///
/// When `path` does not contain `/documents`, the whole path is returned
/// unchanged.
pub(crate) fn extract_database_path(path: &str) -> String {
    // `split` always yields at least one piece, so the first piece is the
    // prefix we want; the `unwrap_or` fallback only exists to avoid a panic path.
    path.split("/documents").next().unwrap_or(path).to_string()
}
133
/// Splits a collection path into its parent resource path and collection id.
///
/// Everything before the first `projects/` is discarded. The remainder is cut
/// at its last `/`: the left part is the parent, the right part the collection
/// id. Returns `None` when `path` does not contain `projects/`.
fn extract_parent_and_collection(path: &str) -> Option<(String, String)> {
    let resource_path = path.find("projects/").map(|offset| &path[offset..])?;
    let (parent, collection_id) = resource_path.rsplit_once('/')?;
    Some((parent.to_owned(), collection_id.to_owned()))
}
149
/// A handle to a single document.
///
/// Pairs a borrowed HTTP client with the document's path; the methods on this
/// type hand that path to the client as the request URL.
#[derive(Clone, Debug)]
pub struct DocumentReference<'a> {
    /// HTTP client used for every request made through this reference.
    pub(crate) client: &'a ClientWithMiddleware,
    /// Path of the document, used as the request URL.
    pub(crate) path: String,
}
156
157impl<'a> DocumentReference<'a> {
158 pub async fn get(&self) -> Result<DocumentSnapshot<'a>, FirestoreError> {
164 let response = self.client.get(&self.path).send().await?;
165
166 let id = self.path.split('/').last().unwrap_or_default().to_string();
168
169 if response.status() == reqwest::StatusCode::NOT_FOUND {
170 return Ok(DocumentSnapshot {
171 id,
172 reference: self.clone(),
173 document: None,
174 read_time: None, });
176 }
177
178 if !response.status().is_success() {
179 let status = response.status();
180 let text = response.text().await.unwrap_or_default();
181 return Err(FirestoreError::ApiError(format!(
182 "Get document failed {}: {}",
183 status, text
184 )));
185 }
186
187 let doc: Document = response.json().await?;
188 let read_time = Some(chrono::Utc::now().to_rfc3339()); Ok(DocumentSnapshot {
191 id,
192 reference: self.clone(),
193 document: Some(doc),
194 read_time,
195 })
196 }
197
198 pub fn collection(&self, collection_id: &str) -> CollectionReference<'a> {
200 CollectionReference {
201 client: self.client,
202 path: format!("{}/{}", self.path, collection_id),
203 }
204 }
205
206 pub async fn list_collections(&self) -> Result<Vec<CollectionReference<'a>>, FirestoreError> {
208 let url = format!("{}:listCollectionIds", self.path);
209 let mut collections = Vec::new();
210 let mut next_page_token = None;
211
212 loop {
213 let request = ListCollectionIdsRequest {
214 page_size: Some(100),
215 page_token: next_page_token.take(),
216 };
217
218 let response = self
219 .client
220 .post(&url)
221 .header(header::CONTENT_TYPE, "application/json")
222 .body(serde_json::to_vec(&request)?)
223 .send()
224 .await?;
225
226 if !response.status().is_success() {
227 let status = response.status();
228 let text = response.text().await.unwrap_or_default();
229 return Err(FirestoreError::ApiError(format!(
230 "List collections failed {}: {}",
231 status, text
232 )));
233 }
234
235 let result: ListCollectionIdsResponse = response.json().await?;
236 for id in result.collection_ids {
237 collections.push(self.collection(&id));
238 }
239
240 if let Some(token) = result.next_page_token {
241 if token.is_empty() {
242 break;
243 }
244 next_page_token = Some(token);
245 } else {
246 break;
247 }
248 }
249
250 Ok(collections)
251 }
252
253 pub async fn set<T: Serialize>(&self, value: &T) -> Result<WriteResult, FirestoreError> {
261 let url = self.path.clone();
262
263 let fields = convert_serializable_to_fields(value)?;
264
265 let body = serde_json::to_vec(&serde_json::json!({ "fields": fields }))?;
266
267 let response = self
268 .client
269 .patch(&url)
270 .header(header::CONTENT_TYPE, "application/json")
271 .body(body)
272 .send()
273 .await?;
274
275 if !response.status().is_success() {
276 let status = response.status();
277 let text = response.text().await.unwrap_or_default();
278 return Err(FirestoreError::ApiError(format!(
279 "Set document failed {}: {}",
280 status, text
281 )));
282 }
283
284 let doc: Document = response.json().await?;
285 Ok(WriteResult {
286 write_time: doc.update_time,
287 })
288 }
289
290 pub async fn update<T: Serialize>(
299 &self,
300 value: &T,
301 update_mask: Option<Vec<String>>,
302 ) -> Result<WriteResult, FirestoreError> {
303 let fields = convert_serializable_to_fields(value)?;
304
305 let mut url = self.path.clone();
306 if let Some(mask) = update_mask {
307 url.push('?');
308 for (i, field) in mask.iter().enumerate() {
309 if i > 0 {
310 url.push('&');
311 }
312 url.push_str(&format!("updateMask.fieldPaths={}", field));
313 }
314 }
315
316 let body = serde_json::to_vec(&serde_json::json!({ "fields": fields }))?;
317
318 let response = self
319 .client
320 .patch(&url)
321 .header(header::CONTENT_TYPE, "application/json")
322 .body(body)
323 .send()
324 .await?;
325
326 if !response.status().is_success() {
327 let status = response.status();
328 let text = response.text().await.unwrap_or_default();
329 return Err(FirestoreError::ApiError(format!(
330 "Update document failed {}: {}",
331 status, text
332 )));
333 }
334
335 let doc: Document = response.json().await?;
336 Ok(WriteResult {
337 write_time: doc.update_time,
338 })
339 }
340
341 pub async fn delete(&self) -> Result<WriteResult, FirestoreError> {
343 let response = self.client.delete(&self.path).send().await?;
344
345 if !response.status().is_success() {
346 let status = response.status();
347 let text = response.text().await.unwrap_or_default();
348 return Err(FirestoreError::ApiError(format!(
349 "Delete document failed {}: {}",
350 status, text
351 )));
352 }
353
354 Ok(WriteResult {
359 write_time: chrono::Utc::now().to_rfc3339(),
360 })
361 }
362
363 pub async fn listen(&self) -> Result<ListenStream, FirestoreError> {
365 let database = extract_database_path(&self.path);
366
367 let target = Target {
368 target_type: Some(TargetType::Documents(DocumentsTarget {
369 documents: vec![self.path.clone()],
370 })),
371 target_id: Some(1), resume_token: None,
373 read_time: None,
374 once: None,
375 expected_count: None,
376 };
377
378 let request = ListenRequest {
379 database: database.clone(),
380 add_target: Some(target),
381 remove_target: None,
382 labels: None,
383 };
384
385 listen_request(self.client, &database, &request).await
386 }
387}
388
/// A handle to a collection of documents.
///
/// Pairs a borrowed HTTP client with the collection's path; the request
/// methods on this type hand that path to the client as the request URL.
#[derive(Clone, Debug)]
pub struct CollectionReference<'a> {
    /// HTTP client used for every request made through this reference.
    pub(crate) client: &'a ClientWithMiddleware,
    /// Path of the collection, used as the request URL.
    pub(crate) path: String,
}
395
396impl<'a> CollectionReference<'a> {
397 pub fn doc(&self, document_id: &str) -> DocumentReference<'a> {
403 DocumentReference {
404 client: self.client,
405 path: format!("{}/{}", self.path, document_id),
406 }
407 }
408
409 pub async fn list_documents(&self) -> Result<ListDocumentsResponse, FirestoreError> {
411 let response = self.client.get(&self.path).send().await?;
412
413 if !response.status().is_success() {
414 let status = response.status();
415 let text = response.text().await.unwrap_or_default();
416 return Err(FirestoreError::ApiError(format!(
417 "List documents failed {}: {}",
418 status, text
419 )));
420 }
421
422 let list: ListDocumentsResponse = response.json().await?;
423 Ok(list)
424 }
425
426 pub async fn add<T: Serialize>(&self, value: &T) -> Result<DocumentReference<'a>, FirestoreError> {
432 let fields = convert_serializable_to_fields(value)?;
433 let body = serde_json::to_vec(&serde_json::json!({ "fields": fields }))?;
434
435 let response = self
436 .client
437 .post(&self.path)
438 .header(header::CONTENT_TYPE, "application/json")
439 .body(body)
440 .send()
441 .await?;
442
443 if !response.status().is_success() {
444 let status = response.status();
445 let text = response.text().await.unwrap_or_default();
446 return Err(FirestoreError::ApiError(format!(
447 "Add document failed {}: {}",
448 status, text
449 )));
450 }
451
452 let doc: Document = response.json().await?;
453 Ok(DocumentReference {
454 client: self.client,
455 path: doc.name,
456 })
457 }
458
    /// Builds a query over this collection and applies `Query::where_filter`
    /// to it with the given field, operator and value.
    ///
    /// # Errors
    ///
    /// Returns whatever error `Query::where_filter` reports.
    ///
    /// # Panics
    ///
    /// Panics if the base query cannot be built, i.e. when this collection's
    /// path contains no `projects/` segment.
    pub fn where_filter<T: Serialize>(
        &self,
        field: &str,
        op: FieldOperator,
        value: T,
    ) -> Result<Query<'a>, FirestoreError> {
        self.query().where_filter(field, op, value)
    }
468
    /// Builds a query over this collection and applies `Query::order_by` to it
    /// with the given field and direction.
    ///
    /// # Panics
    ///
    /// Panics if the base query cannot be built, i.e. when this collection's
    /// path contains no `projects/` segment.
    pub fn order_by(&self, field: &str, direction: super::models::Direction) -> Query<'a> {
        self.query().order_by(field, direction)
    }
473
    /// Builds a query over this collection and applies `Query::limit` to it.
    ///
    /// # Panics
    ///
    /// Panics if the base query cannot be built, i.e. when this collection's
    /// path contains no `projects/` segment.
    pub fn limit(&self, limit: i32) -> Query<'a> {
        self.query().limit(limit)
    }
478
    /// Builds a query over this collection and applies `Query::offset` to it.
    ///
    /// # Panics
    ///
    /// Panics if the base query cannot be built, i.e. when this collection's
    /// path contains no `projects/` segment.
    pub fn offset(&self, offset: i32) -> Query<'a> {
        self.query().offset(offset)
    }
483
484 fn query(&self) -> Query<'a> {
485 let (parent, collection_id) = extract_parent_and_collection(&self.path)
486 .expect("Collection path should be valid");
487
488 Query::new(self.client, parent, collection_id)
489 }
490
491 pub async fn listen(&self) -> Result<ListenStream, FirestoreError> {
493 let database = extract_database_path(&self.path);
494 let (parent, collection_id) = extract_parent_and_collection(&self.path).ok_or_else(|| {
495 FirestoreError::ApiError("Failed to extract parent and collection ID".into())
496 })?;
497
498 let query_target = QueryTarget {
499 parent,
500 structured_query: Some(StructuredQuery {
501 from: Some(vec![CollectionSelector {
502 collection_id,
503 all_descendants: None,
504 }]),
505 select: None,
506 where_clause: None,
507 order_by: None,
508 start_at: None,
509 end_at: None,
510 offset: None,
511 limit: None,
512 }),
513 };
514
515 let target = Target {
516 target_type: Some(TargetType::Query(query_target)),
517 target_id: Some(1), resume_token: None,
519 read_time: None,
520 once: None,
521 expected_count: None,
522 };
523
524 let request = ListenRequest {
525 database: database.clone(),
526 add_target: Some(target),
527 remove_target: None,
528 labels: None,
529 };
530
531 listen_request(self.client, &database, &request).await
532 }
533}