firebase_admin_sdk/firestore/
query.rs

1use super::listen::{listen_request, ListenStream};
2use super::models::{
3    CollectionSelector, CompositeFilter, CompositeOperator, Direction, FieldFilter, FieldOperator,
4    FieldReference, FilterType, ListenRequest, Order, QueryFilter, QueryTarget, RunQueryRequest,
5    RunQueryResponse, StructuredQuery, Target, TargetType,
6};
7use super::reference::{
8    convert_serde_value_to_firestore_value, extract_database_path, DocumentReference,
9};
10use super::snapshot::{DocumentSnapshot, QuerySnapshot};
11use super::FirestoreError;
12use reqwest::header;
13use reqwest_middleware::ClientWithMiddleware;
14use serde::Serialize;
15
16/// A `Query` refers to a query which you can read or listen to.
17///
18/// You can also construct refined `Query` objects by adding filters and ordering.
19#[derive(Clone)]
20pub struct Query<'a> {
21    pub(crate) client: &'a ClientWithMiddleware,
22    pub(crate) parent_path: String, // projects/{id}/databases/{id}/documents or .../documents/col/doc
23    pub(crate) collection_id: String,
24    pub(crate) query: StructuredQuery,
25}
26
27impl<'a> Query<'a> {
28    pub(crate) fn new(
29        client: &'a ClientWithMiddleware,
30        parent_path: String,
31        collection_id: String,
32    ) -> Self {
33        Self {
34            client,
35            parent_path,
36            collection_id: collection_id.clone(),
37            query: StructuredQuery {
38                select: None,
39                from: Some(vec![CollectionSelector {
40                    collection_id,
41                    all_descendants: None,
42                }]),
43                where_clause: None,
44                order_by: None,
45                start_at: None,
46                end_at: None,
47                offset: None,
48                limit: None,
49            },
50        }
51    }
52
53    /// Creates and returns a new `Query` with the additional filter.
54    ///
55    /// # Arguments
56    ///
57    /// * `field` - The path of the field to filter (e.g., "age").
58    /// * `op` - The operation to apply.
59    /// * `value` - The value to compare against.
60    pub fn where_filter<T: Serialize>(
61        &self,
62        field: &str,
63        op: FieldOperator,
64        value: T,
65    ) -> Result<Query<'a>, FirestoreError> {
66        let mut new_query = self.clone();
67
68        let serde_value = serde_json::to_value(value)?;
69        let firestore_value = convert_serde_value_to_firestore_value(serde_value)?;
70
71        let filter = QueryFilter {
72            filter_type: Some(FilterType::FieldFilter(FieldFilter {
73                field: FieldReference {
74                    field_path: field.to_string(),
75                },
76                op,
77                value: firestore_value,
78            })),
79        };
80
81        if let Some(existing_where) = &new_query.query.where_clause {
82            // If there's already a filter, we need to combine them using AND.
83            // If the existing filter is a CompositeFilter with AND, we can append.
84            // Otherwise, we create a new CompositeFilter with AND.
85
86            let new_composite = match &existing_where.filter_type {
87                Some(FilterType::CompositeFilter(cf)) if cf.op == CompositeOperator::And => {
88                    let mut filters = cf.filters.clone();
89                    filters.push(filter);
90                    CompositeFilter {
91                        op: CompositeOperator::And,
92                        filters,
93                    }
94                }
95                _ => CompositeFilter {
96                    op: CompositeOperator::And,
97                    filters: vec![existing_where.clone(), filter],
98                },
99            };
100
101            new_query.query.where_clause = Some(QueryFilter {
102                filter_type: Some(FilterType::CompositeFilter(new_composite)),
103            });
104        } else {
105            new_query.query.where_clause = Some(filter);
106        }
107
108        Ok(new_query)
109    }
110
111    /// Creates and returns a new `Query` that's additionally sorted by the specified field.
112    pub fn order_by(&self, field: &str, direction: Direction) -> Query<'a> {
113        let mut new_query = self.clone();
114
115        let order = Order {
116            field: FieldReference {
117                field_path: field.to_string(),
118            },
119            direction,
120        };
121
122        if let Some(order_by) = &mut new_query.query.order_by {
123            order_by.push(order);
124        } else {
125            new_query.query.order_by = Some(vec![order]);
126        }
127
128        new_query
129    }
130
131    /// Creates and returns a new `Query` that only returns the first matching documents.
132    pub fn limit(&self, limit: i32) -> Query<'a> {
133        let mut new_query = self.clone();
134        new_query.query.limit = Some(limit);
135        new_query
136    }
137
138    /// Creates and returns a new `Query` that skips the first matching documents.
139    pub fn offset(&self, offset: i32) -> Query<'a> {
140        let mut new_query = self.clone();
141        new_query.query.offset = Some(offset);
142        new_query
143    }
144
145    /// Executes the query and returns the results as a `QuerySnapshot`.
146    pub async fn get(&self) -> Result<QuerySnapshot<'a>, FirestoreError> {
147        let url = format!("{}:runQuery", self.parent_path);
148
149        let request = RunQueryRequest {
150            parent: self.parent_path.clone(),
151            structured_query: Some(self.query.clone()),
152        };
153
154        let response = self
155            .client
156            .post(&url)
157            .header(header::CONTENT_TYPE, "application/json")
158            .body(serde_json::to_vec(&request)?)
159            .send()
160            .await?;
161
162        if !response.status().is_success() {
163            let status = response.status();
164            let text = response.text().await.unwrap_or_default();
165            return Err(FirestoreError::ApiError(format!(
166                "Run query failed {}: {}",
167                status, text
168            )));
169        }
170
171        // The response is a stream of JSON objects, e.g. [{...}, {...}] or line-delimited?
172        // The REST API usually returns a JSON array [ { "document": ... }, ... ] for runQuery?
173        // Wait, documentation says "The response body contains a stream of RunQueryResponse messages."
174        // In standard REST/JSON, this often means a JSON array.
175
176        // Let's assume it returns a JSON array of RunQueryResponse objects.
177        // We need to parse this. `response.json::<Vec<RunQueryResponse>>()` might work.
178
179        let responses: Vec<RunQueryResponse> = response.json().await?;
180
181        let mut documents = Vec::new();
182        let mut read_time = None;
183
184        for res in responses {
185            if let Some(rt) = res.read_time {
186                read_time = Some(rt);
187            }
188
189            if let Some(doc) = res.document {
190                // Construct DocumentSnapshot
191                // Extract ID from name
192                let name = doc.name.clone();
193                let id = name.split('/').last().unwrap_or_default().to_string();
194
195                let doc_ref = DocumentReference {
196                    client: self.client,
197                    path: name, // The full path
198                };
199
200                documents.push(DocumentSnapshot {
201                    id,
202                    reference: doc_ref,
203                    document: Some(doc),
204                    read_time: read_time.clone(),
205                });
206            }
207        }
208
209        Ok(QuerySnapshot {
210            documents,
211            read_time,
212        })
213    }
214
215    /// Listens to changes to the query results.
216    pub async fn listen(&self) -> Result<ListenStream, FirestoreError> {
217        let database = extract_database_path(&self.parent_path);
218
219        let query_target = QueryTarget {
220            parent: self.parent_path.clone(),
221            structured_query: Some(self.query.clone()),
222        };
223
224        let target = Target {
225            target_type: Some(TargetType::Query(query_target)),
226            target_id: Some(1), // Arbitrary ID
227            resume_token: None,
228            read_time: None,
229            once: None,
230            expected_count: None,
231        };
232
233        let request = ListenRequest {
234            database: database.clone(),
235            add_target: Some(target),
236            remove_target: None,
237            labels: None,
238        };
239
240        listen_request(self.client, &database, &request).await
241    }
242}