firebase_admin_sdk/firestore/
query.rs1use super::listen::{listen_request, ListenStream};
2use super::models::{
3 CollectionSelector, CompositeFilter, CompositeOperator, Direction, FieldFilter, FieldOperator,
4 FieldReference, FilterType, ListenRequest, Order, QueryFilter, QueryTarget, RunQueryRequest,
5 RunQueryResponse, StructuredQuery, Target, TargetType,
6};
7use super::reference::{
8 convert_serde_value_to_firestore_value, extract_database_path, DocumentReference,
9};
10use super::snapshot::{DocumentSnapshot, QuerySnapshot};
11use super::FirestoreError;
12use reqwest::header;
13use reqwest_middleware::ClientWithMiddleware;
14use serde::Serialize;
15
16#[derive(Clone)]
20pub struct Query<'a> {
21 pub(crate) client: &'a ClientWithMiddleware,
22 pub(crate) parent_path: String, pub(crate) collection_id: String,
24 pub(crate) query: StructuredQuery,
25}
26
27impl<'a> Query<'a> {
28 pub(crate) fn new(
29 client: &'a ClientWithMiddleware,
30 parent_path: String,
31 collection_id: String,
32 ) -> Self {
33 Self {
34 client,
35 parent_path,
36 collection_id: collection_id.clone(),
37 query: StructuredQuery {
38 select: None,
39 from: Some(vec![CollectionSelector {
40 collection_id,
41 all_descendants: None,
42 }]),
43 where_clause: None,
44 order_by: None,
45 start_at: None,
46 end_at: None,
47 offset: None,
48 limit: None,
49 },
50 }
51 }
52
53 pub fn where_filter<T: Serialize>(
61 &self,
62 field: &str,
63 op: FieldOperator,
64 value: T,
65 ) -> Result<Query<'a>, FirestoreError> {
66 let mut new_query = self.clone();
67
68 let serde_value = serde_json::to_value(value)?;
69 let firestore_value = convert_serde_value_to_firestore_value(serde_value)?;
70
71 let filter = QueryFilter {
72 filter_type: Some(FilterType::FieldFilter(FieldFilter {
73 field: FieldReference {
74 field_path: field.to_string(),
75 },
76 op,
77 value: firestore_value,
78 })),
79 };
80
81 if let Some(existing_where) = &new_query.query.where_clause {
82 let new_composite = match &existing_where.filter_type {
87 Some(FilterType::CompositeFilter(cf)) if cf.op == CompositeOperator::And => {
88 let mut filters = cf.filters.clone();
89 filters.push(filter);
90 CompositeFilter {
91 op: CompositeOperator::And,
92 filters,
93 }
94 }
95 _ => CompositeFilter {
96 op: CompositeOperator::And,
97 filters: vec![existing_where.clone(), filter],
98 },
99 };
100
101 new_query.query.where_clause = Some(QueryFilter {
102 filter_type: Some(FilterType::CompositeFilter(new_composite)),
103 });
104 } else {
105 new_query.query.where_clause = Some(filter);
106 }
107
108 Ok(new_query)
109 }
110
111 pub fn order_by(&self, field: &str, direction: Direction) -> Query<'a> {
113 let mut new_query = self.clone();
114
115 let order = Order {
116 field: FieldReference {
117 field_path: field.to_string(),
118 },
119 direction,
120 };
121
122 if let Some(order_by) = &mut new_query.query.order_by {
123 order_by.push(order);
124 } else {
125 new_query.query.order_by = Some(vec![order]);
126 }
127
128 new_query
129 }
130
131 pub fn limit(&self, limit: i32) -> Query<'a> {
133 let mut new_query = self.clone();
134 new_query.query.limit = Some(limit);
135 new_query
136 }
137
138 pub fn offset(&self, offset: i32) -> Query<'a> {
140 let mut new_query = self.clone();
141 new_query.query.offset = Some(offset);
142 new_query
143 }
144
145 pub async fn get(&self) -> Result<QuerySnapshot<'a>, FirestoreError> {
147 let url = format!("{}:runQuery", self.parent_path);
148
149 let request = RunQueryRequest {
150 parent: self.parent_path.clone(),
151 structured_query: Some(self.query.clone()),
152 };
153
154 let response = self
155 .client
156 .post(&url)
157 .header(header::CONTENT_TYPE, "application/json")
158 .body(serde_json::to_vec(&request)?)
159 .send()
160 .await?;
161
162 if !response.status().is_success() {
163 let status = response.status();
164 let text = response.text().await.unwrap_or_default();
165 return Err(FirestoreError::ApiError(format!(
166 "Run query failed {}: {}",
167 status, text
168 )));
169 }
170
171 let responses: Vec<RunQueryResponse> = response.json().await?;
180
181 let mut documents = Vec::new();
182 let mut read_time = None;
183
184 for res in responses {
185 if let Some(rt) = res.read_time {
186 read_time = Some(rt);
187 }
188
189 if let Some(doc) = res.document {
190 let name = doc.name.clone();
193 let id = name.split('/').last().unwrap_or_default().to_string();
194
195 let doc_ref = DocumentReference {
196 client: self.client,
197 path: name, };
199
200 documents.push(DocumentSnapshot {
201 id,
202 reference: doc_ref,
203 document: Some(doc),
204 read_time: read_time.clone(),
205 });
206 }
207 }
208
209 Ok(QuerySnapshot {
210 documents,
211 read_time,
212 })
213 }
214
215 pub async fn listen(&self) -> Result<ListenStream, FirestoreError> {
217 let database = extract_database_path(&self.parent_path);
218
219 let query_target = QueryTarget {
220 parent: self.parent_path.clone(),
221 structured_query: Some(self.query.clone()),
222 };
223
224 let target = Target {
225 target_type: Some(TargetType::Query(query_target)),
226 target_id: Some(1), resume_token: None,
228 read_time: None,
229 once: None,
230 expected_count: None,
231 };
232
233 let request = ListenRequest {
234 database: database.clone(),
235 add_target: Some(target),
236 remove_target: None,
237 labels: None,
238 };
239
240 listen_request(self.client, &database, &request).await
241 }
242}