Skip to main content

firebase_admin_sdk/firestore/
query.rs

1use super::listen::{listen_request, ListenStream};
2use super::models::{
3    CollectionSelector, CompositeFilter, CompositeOperator, Direction, FieldFilter, FieldOperator,
4    FieldReference, FilterType, ListenRequest, Order, QueryFilter, QueryTarget, RunQueryRequest,
5    RunQueryResponse, StructuredQuery, Target, TargetType,
6};
7use super::reference::{
8    convert_serde_value_to_firestore_value, extract_database_path, DocumentReference,
9};
10use super::snapshot::{DocumentSnapshot, QuerySnapshot};
11use super::FirestoreError;
12use reqwest::header;
13use reqwest_middleware::ClientWithMiddleware;
14use serde::Serialize;
15
/// A `Query` refers to a query which you can read or listen to.
///
/// You can also construct refined `Query` objects by adding filters and ordering.
///
/// The builder methods (`where_filter`, `order_by`, `limit`, `offset`) take `&self`
/// and return a new `Query`; the value they are called on is not modified.
#[derive(Clone)]
pub struct Query<'a> {
    /// Borrowed HTTP client used to send the `runQuery` and listen requests.
    pub(crate) client: &'a ClientWithMiddleware,
    /// Parent resource path the query runs under, either
    /// `projects/{id}/databases/{id}/documents` or `.../documents/col/doc`.
    pub(crate) parent_path: String,
    /// The structured query built up so far; a clone of it is embedded in every
    /// request this `Query` sends.
    pub(crate) query: StructuredQuery,
}
25
26impl<'a> Query<'a> {
27    pub(crate) fn new(
28        client: &'a ClientWithMiddleware,
29        parent_path: String,
30        collection_id: String,
31    ) -> Self {
32        Self {
33            client,
34            parent_path,
35            query: StructuredQuery {
36                select: None,
37                from: Some(vec![CollectionSelector {
38                    collection_id,
39                    all_descendants: None,
40                }]),
41                where_clause: None,
42                order_by: None,
43                start_at: None,
44                end_at: None,
45                offset: None,
46                limit: None,
47            },
48        }
49    }
50
51    /// Creates and returns a new `Query` with the additional filter.
52    ///
53    /// # Arguments
54    ///
55    /// * `field` - The path of the field to filter (e.g., "age").
56    /// * `op` - The operation to apply.
57    /// * `value` - The value to compare against.
58    pub fn where_filter<T: Serialize>(
59        &self,
60        field: &str,
61        op: FieldOperator,
62        value: T,
63    ) -> Result<Query<'a>, FirestoreError> {
64        let mut new_query = self.clone();
65
66        let serde_value = serde_json::to_value(value)?;
67        let firestore_value = convert_serde_value_to_firestore_value(serde_value)?;
68
69        let filter = QueryFilter {
70            filter_type: Some(FilterType::FieldFilter(FieldFilter {
71                field: FieldReference {
72                    field_path: field.to_string(),
73                },
74                op,
75                value: firestore_value,
76            })),
77        };
78
79        if let Some(existing_where) = &new_query.query.where_clause {
80            // If there's already a filter, we need to combine them using AND.
81            // If the existing filter is a CompositeFilter with AND, we can append.
82            // Otherwise, we create a new CompositeFilter with AND.
83
84            let new_composite = match &existing_where.filter_type {
85                Some(FilterType::CompositeFilter(cf)) if cf.op == CompositeOperator::And => {
86                    let mut filters = cf.filters.clone();
87                    filters.push(filter);
88                    CompositeFilter {
89                        op: CompositeOperator::And,
90                        filters,
91                    }
92                }
93                _ => CompositeFilter {
94                    op: CompositeOperator::And,
95                    filters: vec![existing_where.clone(), filter],
96                },
97            };
98
99            new_query.query.where_clause = Some(QueryFilter {
100                filter_type: Some(FilterType::CompositeFilter(new_composite)),
101            });
102        } else {
103            new_query.query.where_clause = Some(filter);
104        }
105
106        Ok(new_query)
107    }
108
109    /// Creates and returns a new `Query` that's additionally sorted by the specified field.
110    pub fn order_by(&self, field: &str, direction: Direction) -> Query<'a> {
111        let mut new_query = self.clone();
112
113        let order = Order {
114            field: FieldReference {
115                field_path: field.to_string(),
116            },
117            direction,
118        };
119
120        if let Some(order_by) = &mut new_query.query.order_by {
121            order_by.push(order);
122        } else {
123            new_query.query.order_by = Some(vec![order]);
124        }
125
126        new_query
127    }
128
129    /// Creates and returns a new `Query` that only returns the first matching documents.
130    pub fn limit(&self, limit: i32) -> Query<'a> {
131        let mut new_query = self.clone();
132        new_query.query.limit = Some(limit);
133        new_query
134    }
135
136    /// Creates and returns a new `Query` that skips the first matching documents.
137    pub fn offset(&self, offset: i32) -> Query<'a> {
138        let mut new_query = self.clone();
139        new_query.query.offset = Some(offset);
140        new_query
141    }
142
143    /// Executes the query and returns the results as a `QuerySnapshot`.
144    pub async fn get(&self) -> Result<QuerySnapshot<'a>, FirestoreError> {
145        let url = format!("{}:runQuery", self.parent_path);
146
147        let request = RunQueryRequest {
148            parent: self.parent_path.clone(),
149            structured_query: Some(self.query.clone()),
150        };
151
152        let response = self
153            .client
154            .post(&url)
155            .header(header::CONTENT_TYPE, "application/json")
156            .body(serde_json::to_vec(&request)?)
157            .send()
158            .await?;
159
160        if !response.status().is_success() {
161            let status = response.status();
162            let text = response.text().await.unwrap_or_default();
163            return Err(FirestoreError::ApiError(format!(
164                "Run query failed {}: {}",
165                status, text
166            )));
167        }
168
169        // The response is a stream of JSON objects, e.g. [{...}, {...}] or line-delimited?
170        // The REST API usually returns a JSON array [ { "document": ... }, ... ] for runQuery?
171        // Wait, documentation says "The response body contains a stream of RunQueryResponse messages."
172        // In standard REST/JSON, this often means a JSON array.
173
174        // Let's assume it returns a JSON array of RunQueryResponse objects.
175        // We need to parse this. `response.json::<Vec<RunQueryResponse>>()` might work.
176
177        let responses: Vec<RunQueryResponse> = response.json().await?;
178
179        let mut documents = Vec::new();
180        let mut read_time = None;
181
182        for res in responses {
183            if let Some(rt) = res.read_time {
184                read_time = Some(rt);
185            }
186
187            if let Some(doc) = res.document {
188                // Construct DocumentSnapshot
189                // Extract ID from name
190                let name = doc.name.clone();
191                let id = name.split('/').last().unwrap_or_default().to_string();
192
193                let doc_ref = DocumentReference {
194                    client: self.client,
195                    path: name, // The full path
196                };
197
198                documents.push(DocumentSnapshot {
199                    id,
200                    reference: doc_ref,
201                    document: Some(doc),
202                    read_time: read_time.clone(),
203                });
204            }
205        }
206
207        Ok(QuerySnapshot {
208            documents,
209            read_time,
210        })
211    }
212
213    /// Listens to changes to the query results.
214    pub async fn listen(&self) -> Result<ListenStream, FirestoreError> {
215        let database = extract_database_path(&self.parent_path);
216
217        let query_target = QueryTarget {
218            parent: self.parent_path.clone(),
219            structured_query: Some(self.query.clone()),
220        };
221
222        let target = Target {
223            target_type: Some(TargetType::Query(query_target)),
224            target_id: Some(1), // Arbitrary ID
225            resume_token: None,
226            read_time: None,
227            once: None,
228            expected_count: None,
229        };
230
231        let request = ListenRequest {
232            database: database.clone(),
233            add_target: Some(target),
234            remove_target: None,
235            labels: None,
236        };
237
238        listen_request(self.client, &database, &request).await
239    }
240}