firebase_admin_sdk/firestore/
query.rs1use super::listen::{listen_request, ListenStream};
2use super::models::{
3 CollectionSelector, CompositeFilter, CompositeOperator, Direction, FieldFilter, FieldOperator,
4 FieldReference, FilterType, ListenRequest, Order, QueryFilter, QueryTarget, RunQueryRequest,
5 RunQueryResponse, StructuredQuery, Target, TargetType,
6};
7use super::reference::{
8 convert_serde_value_to_firestore_value, extract_database_path, DocumentReference,
9};
10use super::snapshot::{DocumentSnapshot, QuerySnapshot};
11use super::FirestoreError;
12use reqwest::header;
13use reqwest_middleware::ClientWithMiddleware;
14use serde::Serialize;
15
16#[derive(Clone)]
20pub struct Query<'a> {
21 pub(crate) client: &'a ClientWithMiddleware,
22 pub(crate) parent_path: String, pub(crate) query: StructuredQuery,
24}
25
26impl<'a> Query<'a> {
27 pub(crate) fn new(
28 client: &'a ClientWithMiddleware,
29 parent_path: String,
30 collection_id: String,
31 ) -> Self {
32 Self {
33 client,
34 parent_path,
35 query: StructuredQuery {
36 select: None,
37 from: Some(vec![CollectionSelector {
38 collection_id,
39 all_descendants: None,
40 }]),
41 where_clause: None,
42 order_by: None,
43 start_at: None,
44 end_at: None,
45 offset: None,
46 limit: None,
47 },
48 }
49 }
50
51 pub fn where_filter<T: Serialize>(
59 &self,
60 field: &str,
61 op: FieldOperator,
62 value: T,
63 ) -> Result<Query<'a>, FirestoreError> {
64 let mut new_query = self.clone();
65
66 let serde_value = serde_json::to_value(value)?;
67 let firestore_value = convert_serde_value_to_firestore_value(serde_value)?;
68
69 let filter = QueryFilter {
70 filter_type: Some(FilterType::FieldFilter(FieldFilter {
71 field: FieldReference {
72 field_path: field.to_string(),
73 },
74 op,
75 value: firestore_value,
76 })),
77 };
78
79 if let Some(existing_where) = &new_query.query.where_clause {
80 let new_composite = match &existing_where.filter_type {
85 Some(FilterType::CompositeFilter(cf)) if cf.op == CompositeOperator::And => {
86 let mut filters = cf.filters.clone();
87 filters.push(filter);
88 CompositeFilter {
89 op: CompositeOperator::And,
90 filters,
91 }
92 }
93 _ => CompositeFilter {
94 op: CompositeOperator::And,
95 filters: vec![existing_where.clone(), filter],
96 },
97 };
98
99 new_query.query.where_clause = Some(QueryFilter {
100 filter_type: Some(FilterType::CompositeFilter(new_composite)),
101 });
102 } else {
103 new_query.query.where_clause = Some(filter);
104 }
105
106 Ok(new_query)
107 }
108
109 pub fn order_by(&self, field: &str, direction: Direction) -> Query<'a> {
111 let mut new_query = self.clone();
112
113 let order = Order {
114 field: FieldReference {
115 field_path: field.to_string(),
116 },
117 direction,
118 };
119
120 if let Some(order_by) = &mut new_query.query.order_by {
121 order_by.push(order);
122 } else {
123 new_query.query.order_by = Some(vec![order]);
124 }
125
126 new_query
127 }
128
129 pub fn limit(&self, limit: i32) -> Query<'a> {
131 let mut new_query = self.clone();
132 new_query.query.limit = Some(limit);
133 new_query
134 }
135
136 pub fn offset(&self, offset: i32) -> Query<'a> {
138 let mut new_query = self.clone();
139 new_query.query.offset = Some(offset);
140 new_query
141 }
142
143 pub async fn get(&self) -> Result<QuerySnapshot<'a>, FirestoreError> {
145 let url = format!("{}:runQuery", self.parent_path);
146
147 let request = RunQueryRequest {
148 parent: self.parent_path.clone(),
149 structured_query: Some(self.query.clone()),
150 };
151
152 let response = self
153 .client
154 .post(&url)
155 .header(header::CONTENT_TYPE, "application/json")
156 .body(serde_json::to_vec(&request)?)
157 .send()
158 .await?;
159
160 if !response.status().is_success() {
161 let status = response.status();
162 let text = response.text().await.unwrap_or_default();
163 return Err(FirestoreError::ApiError(format!(
164 "Run query failed {}: {}",
165 status, text
166 )));
167 }
168
169 let responses: Vec<RunQueryResponse> = response.json().await?;
178
179 let mut documents = Vec::new();
180 let mut read_time = None;
181
182 for res in responses {
183 if let Some(rt) = res.read_time {
184 read_time = Some(rt);
185 }
186
187 if let Some(doc) = res.document {
188 let name = doc.name.clone();
191 let id = name.split('/').last().unwrap_or_default().to_string();
192
193 let doc_ref = DocumentReference {
194 client: self.client,
195 path: name, };
197
198 documents.push(DocumentSnapshot {
199 id,
200 reference: doc_ref,
201 document: Some(doc),
202 read_time: read_time.clone(),
203 });
204 }
205 }
206
207 Ok(QuerySnapshot {
208 documents,
209 read_time,
210 })
211 }
212
213 pub async fn listen(&self) -> Result<ListenStream, FirestoreError> {
215 let database = extract_database_path(&self.parent_path);
216
217 let query_target = QueryTarget {
218 parent: self.parent_path.clone(),
219 structured_query: Some(self.query.clone()),
220 };
221
222 let target = Target {
223 target_type: Some(TargetType::Query(query_target)),
224 target_id: Some(1), resume_token: None,
226 read_time: None,
227 once: None,
228 expected_count: None,
229 };
230
231 let request = ListenRequest {
232 database: database.clone(),
233 add_target: Some(target),
234 remove_target: None,
235 labels: None,
236 };
237
238 listen_request(self.client, &database, &request).await
239 }
240}