1use crate::prelude::*;
2use crate::responses::QueryDocumentsResponse;
3use crate::{Query, ResourceType};
4use azure_sdk_core::errors::{check_status_extract_headers_and_body, AzureError};
5use azure_sdk_core::prelude::*;
6use azure_sdk_core::{No, ToAssign, Yes};
7use chrono::{DateTime, Utc};
8use futures::stream::{unfold, Stream};
9use hyper::StatusCode;
10use serde::de::DeserializeOwned;
11use std::convert::TryInto;
12use std::marker::PhantomData;
13
14#[derive(Debug)]
15pub struct QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>
16where
17 QuerySet: ToAssign,
18 C: CosmosClient,
19 D: DatabaseClient<C>,
20{
21 collection_client: &'a dyn CollectionClient<C, D>,
22 p_query: PhantomData<QuerySet>,
23 query: Option<&'b Query<'b>>,
24 if_match_condition: Option<IfMatchCondition<'b>>,
25 if_modified_since: Option<&'b DateTime<Utc>>,
26 user_agent: Option<&'b str>,
27 activity_id: Option<&'b str>,
28 consistency_level: Option<ConsistencyLevel<'b>>,
29 continuation: Option<&'b str>,
30 max_item_count: i32,
31 partition_keys: Option<&'b PartitionKeys>,
32 query_cross_partition: bool,
33 parallelize_cross_partition_query: bool,
34}
35
36impl<'a, 'b, C, D, QuerySet> Clone for QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>
37where
38 QuerySet: ToAssign,
39 C: CosmosClient,
40 D: DatabaseClient<C>,
41{
42 fn clone(&self) -> Self {
43 Self {
44 collection_client: self.collection_client,
45 p_query: PhantomData {},
46 query: self.query,
47 if_match_condition: self.if_match_condition,
48 if_modified_since: self.if_modified_since,
49 user_agent: self.user_agent,
50 activity_id: self.activity_id,
51 consistency_level: self.consistency_level.clone(),
52 continuation: self.continuation,
53 max_item_count: self.max_item_count,
54 partition_keys: self.partition_keys,
55 query_cross_partition: self.query_cross_partition,
56 parallelize_cross_partition_query: self.parallelize_cross_partition_query,
57 }
58 }
59}
60
61impl<'a, 'b, C, D> QueryDocumentsBuilder<'a, 'b, C, D, No>
62where
63 C: CosmosClient,
64 D: DatabaseClient<C>,
65{
66 #[inline]
67 pub(crate) fn new(
68 collection_client: &'a dyn CollectionClient<C, D>,
69 ) -> QueryDocumentsBuilder<'a, 'b, C, D, No> {
70 QueryDocumentsBuilder {
71 collection_client,
72 p_query: PhantomData {},
73 query: None,
74 if_match_condition: None,
75 if_modified_since: None,
76 user_agent: None,
77 activity_id: None,
78 consistency_level: None,
79 continuation: None,
80 max_item_count: -1,
81 partition_keys: None,
82 query_cross_partition: false,
83 parallelize_cross_partition_query: false,
84 }
85 }
86}
87
88impl<'a, 'b, C, D, QuerySet> CollectionClientRequired<'a, C, D>
89 for QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>
90where
91 QuerySet: ToAssign,
92 C: CosmosClient,
93 D: DatabaseClient<C>,
94{
95 #[inline]
96 fn collection_client(&self) -> &'a dyn CollectionClient<C, D> {
97 self.collection_client
98 }
99}
100
101impl<'a, 'b, C, D> QueryRequired<'b> for QueryDocumentsBuilder<'a, 'b, C, D, Yes>
105where
106 C: CosmosClient,
107 D: DatabaseClient<C>,
108{
109 #[inline]
110 fn query(&self) -> &'b Query<'b> {
111 self.query.unwrap()
112 }
113}
114
115impl<'a, 'b, C, D, QuerySet> IfMatchConditionOption<'b>
116 for QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>
117where
118 QuerySet: ToAssign,
119 C: CosmosClient,
120 D: DatabaseClient<C>,
121{
122 #[inline]
123 fn if_match_condition(&self) -> Option<IfMatchCondition<'b>> {
124 self.if_match_condition
125 }
126}
127
128impl<'a, 'b, C, D, QuerySet> IfModifiedSinceOption<'b>
129 for QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>
130where
131 QuerySet: ToAssign,
132 C: CosmosClient,
133 D: DatabaseClient<C>,
134{
135 #[inline]
136 fn if_modified_since(&self) -> Option<&'b DateTime<Utc>> {
137 self.if_modified_since
138 }
139}
140
141impl<'a, 'b, C, D, QuerySet> UserAgentOption<'b> for QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>
142where
143 QuerySet: ToAssign,
144 C: CosmosClient,
145 D: DatabaseClient<C>,
146{
147 #[inline]
148 fn user_agent(&self) -> Option<&'b str> {
149 self.user_agent
150 }
151}
152
153impl<'a, 'b, C, D, QuerySet> ActivityIdOption<'b> for QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>
154where
155 QuerySet: ToAssign,
156 C: CosmosClient,
157 D: DatabaseClient<C>,
158{
159 #[inline]
160 fn activity_id(&self) -> Option<&'b str> {
161 self.activity_id
162 }
163}
164
165impl<'a, 'b, C, D, QuerySet> ConsistencyLevelOption<'b>
166 for QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>
167where
168 QuerySet: ToAssign,
169 C: CosmosClient,
170 D: DatabaseClient<C>,
171{
172 #[inline]
173 fn consistency_level(&self) -> Option<ConsistencyLevel<'b>> {
174 self.consistency_level.clone()
175 }
176}
177
178impl<'a, 'b, C, D, QuerySet> ContinuationOption<'b>
179 for QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>
180where
181 QuerySet: ToAssign,
182 C: CosmosClient,
183 D: DatabaseClient<C>,
184{
185 #[inline]
186 fn continuation(&self) -> Option<&'b str> {
187 self.continuation
188 }
189}
190
191impl<'a, 'b, C, D, QuerySet> MaxItemCountOption for QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>
192where
193 QuerySet: ToAssign,
194 C: CosmosClient,
195 D: DatabaseClient<C>,
196{
197 #[inline]
198 fn max_item_count(&self) -> i32 {
199 self.max_item_count
200 }
201}
202
203impl<'a, 'b, C, D, QuerySet> PartitionKeysOption<'b>
204 for QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>
205where
206 QuerySet: ToAssign,
207 C: CosmosClient,
208 D: DatabaseClient<C>,
209{
210 #[inline]
211 fn partition_keys(&self) -> Option<&'b PartitionKeys> {
212 self.partition_keys
213 }
214}
215
216impl<'a, 'b, C, D, QuerySet> QueryCrossPartitionOption
217 for QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>
218where
219 QuerySet: ToAssign,
220 C: CosmosClient,
221 D: DatabaseClient<C>,
222{
223 #[inline]
224 fn query_cross_partition(&self) -> bool {
225 self.query_cross_partition
226 }
227}
228
229impl<'a, 'b, C, D, QuerySet> ParallelizeCrossPartitionQueryOption
230 for QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>
231where
232 QuerySet: ToAssign,
233 C: CosmosClient,
234 D: DatabaseClient<C>,
235{
236 #[inline]
237 fn parallelize_cross_partition_query(&self) -> bool {
238 self.parallelize_cross_partition_query
239 }
240}
241
242impl<'a, 'b, C, D> QuerySupport<'b> for QueryDocumentsBuilder<'a, 'b, C, D, No>
243where
244 C: CosmosClient,
245 D: DatabaseClient<C>,
246{
247 type O = QueryDocumentsBuilder<'a, 'b, C, D, Yes>;
248
249 #[inline]
250 fn with_query(self, query: &'b Query<'b>) -> Self::O {
251 QueryDocumentsBuilder {
252 collection_client: self.collection_client,
253 p_query: PhantomData {},
254 query: Some(query),
255 if_match_condition: self.if_match_condition,
256 if_modified_since: self.if_modified_since,
257 user_agent: self.user_agent,
258 activity_id: self.activity_id,
259 consistency_level: self.consistency_level,
260 continuation: self.continuation,
261 max_item_count: self.max_item_count,
262 partition_keys: self.partition_keys,
263 query_cross_partition: self.query_cross_partition,
264 parallelize_cross_partition_query: self.parallelize_cross_partition_query,
265 }
266 }
267}
268
269impl<'a, 'b, C, D, QuerySet> IfMatchConditionSupport<'b>
270 for QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>
271where
272 QuerySet: ToAssign,
273 C: CosmosClient,
274 D: DatabaseClient<C>,
275{
276 type O = QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>;
277
278 #[inline]
279 fn with_if_match_condition(self, if_match_condition: IfMatchCondition<'b>) -> Self::O {
280 QueryDocumentsBuilder {
281 collection_client: self.collection_client,
282 p_query: PhantomData {},
283 query: self.query,
284 if_match_condition: Some(if_match_condition),
285 if_modified_since: self.if_modified_since,
286 user_agent: self.user_agent,
287 activity_id: self.activity_id,
288 consistency_level: self.consistency_level,
289 continuation: self.continuation,
290 max_item_count: self.max_item_count,
291 partition_keys: self.partition_keys,
292 query_cross_partition: self.query_cross_partition,
293 parallelize_cross_partition_query: self.parallelize_cross_partition_query,
294 }
295 }
296}
297
298impl<'a, 'b, C, D, QuerySet> IfModifiedSinceSupport<'b>
299 for QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>
300where
301 QuerySet: ToAssign,
302 C: CosmosClient,
303 D: DatabaseClient<C>,
304{
305 type O = QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>;
306
307 #[inline]
308 fn with_if_modified_since(self, if_modified_since: &'b DateTime<Utc>) -> Self::O {
309 QueryDocumentsBuilder {
310 collection_client: self.collection_client,
311 p_query: PhantomData {},
312 query: self.query,
313 if_match_condition: self.if_match_condition,
314 if_modified_since: Some(if_modified_since),
315 user_agent: self.user_agent,
316 activity_id: self.activity_id,
317 consistency_level: self.consistency_level,
318 continuation: self.continuation,
319 max_item_count: self.max_item_count,
320 partition_keys: self.partition_keys,
321 query_cross_partition: self.query_cross_partition,
322 parallelize_cross_partition_query: self.parallelize_cross_partition_query,
323 }
324 }
325}
326
327impl<'a, 'b, C, D, QuerySet> UserAgentSupport<'b> for QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>
328where
329 QuerySet: ToAssign,
330 C: CosmosClient,
331 D: DatabaseClient<C>,
332{
333 type O = QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>;
334
335 #[inline]
336 fn with_user_agent(self, user_agent: &'b str) -> Self::O {
337 QueryDocumentsBuilder {
338 collection_client: self.collection_client,
339 p_query: PhantomData {},
340 query: self.query,
341 if_match_condition: self.if_match_condition,
342 if_modified_since: self.if_modified_since,
343 user_agent: Some(user_agent),
344 activity_id: self.activity_id,
345 consistency_level: self.consistency_level,
346 continuation: self.continuation,
347 max_item_count: self.max_item_count,
348 partition_keys: self.partition_keys,
349 query_cross_partition: self.query_cross_partition,
350 parallelize_cross_partition_query: self.parallelize_cross_partition_query,
351 }
352 }
353}
354
355impl<'a, 'b, C, D, QuerySet> ActivityIdSupport<'b> for QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>
356where
357 QuerySet: ToAssign,
358 C: CosmosClient,
359 D: DatabaseClient<C>,
360{
361 type O = QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>;
362
363 #[inline]
364 fn with_activity_id(self, activity_id: &'b str) -> Self::O {
365 QueryDocumentsBuilder {
366 collection_client: self.collection_client,
367 p_query: PhantomData {},
368 query: self.query,
369 if_match_condition: self.if_match_condition,
370 if_modified_since: self.if_modified_since,
371 user_agent: self.user_agent,
372 activity_id: Some(activity_id),
373 consistency_level: self.consistency_level,
374 continuation: self.continuation,
375 max_item_count: self.max_item_count,
376 partition_keys: self.partition_keys,
377 query_cross_partition: self.query_cross_partition,
378 parallelize_cross_partition_query: self.parallelize_cross_partition_query,
379 }
380 }
381}
382
383impl<'a, 'b, C, D, QuerySet> ConsistencyLevelSupport<'b>
384 for QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>
385where
386 QuerySet: ToAssign,
387 C: CosmosClient,
388 D: DatabaseClient<C>,
389{
390 type O = QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>;
391
392 #[inline]
393 fn with_consistency_level(self, consistency_level: ConsistencyLevel<'b>) -> Self::O {
394 QueryDocumentsBuilder {
395 collection_client: self.collection_client,
396 p_query: PhantomData {},
397 query: self.query,
398 if_match_condition: self.if_match_condition,
399 if_modified_since: self.if_modified_since,
400 user_agent: self.user_agent,
401 activity_id: self.activity_id,
402 consistency_level: Some(consistency_level),
403 continuation: self.continuation,
404 max_item_count: self.max_item_count,
405 partition_keys: self.partition_keys,
406 query_cross_partition: self.query_cross_partition,
407 parallelize_cross_partition_query: self.parallelize_cross_partition_query,
408 }
409 }
410}
411
412impl<'a, 'b, C, D, QuerySet> ContinuationSupport<'b>
413 for QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>
414where
415 QuerySet: ToAssign,
416 C: CosmosClient,
417 D: DatabaseClient<C>,
418{
419 type O = QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>;
420
421 #[inline]
422 fn with_continuation(self, continuation: &'b str) -> Self::O {
423 QueryDocumentsBuilder {
424 collection_client: self.collection_client,
425 p_query: PhantomData {},
426 query: self.query,
427 if_match_condition: self.if_match_condition,
428 if_modified_since: self.if_modified_since,
429 user_agent: self.user_agent,
430 activity_id: self.activity_id,
431 consistency_level: self.consistency_level,
432 continuation: Some(continuation),
433 max_item_count: self.max_item_count,
434 partition_keys: self.partition_keys,
435 query_cross_partition: self.query_cross_partition,
436 parallelize_cross_partition_query: self.parallelize_cross_partition_query,
437 }
438 }
439}
440
441impl<'a, 'b, C, D, QuerySet> MaxItemCountSupport for QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>
442where
443 QuerySet: ToAssign,
444 C: CosmosClient,
445 D: DatabaseClient<C>,
446{
447 type O = QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>;
448
449 #[inline]
450 fn with_max_item_count(self, max_item_count: i32) -> Self::O {
451 QueryDocumentsBuilder {
452 collection_client: self.collection_client,
453 p_query: PhantomData {},
454 query: self.query,
455 if_match_condition: self.if_match_condition,
456 if_modified_since: self.if_modified_since,
457 user_agent: self.user_agent,
458 activity_id: self.activity_id,
459 consistency_level: self.consistency_level,
460 continuation: self.continuation,
461 max_item_count,
462 partition_keys: self.partition_keys,
463 query_cross_partition: self.query_cross_partition,
464 parallelize_cross_partition_query: self.parallelize_cross_partition_query,
465 }
466 }
467}
468
469impl<'a, 'b, C, D, QuerySet> PartitionKeysSupport<'b>
470 for QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>
471where
472 QuerySet: ToAssign,
473 C: CosmosClient,
474 D: DatabaseClient<C>,
475{
476 type O = QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>;
477
478 #[inline]
479 fn with_partition_keys(self, partition_keys: &'b PartitionKeys) -> Self::O {
480 QueryDocumentsBuilder {
481 collection_client: self.collection_client,
482 p_query: PhantomData {},
483 query: self.query,
484 if_match_condition: self.if_match_condition,
485 if_modified_since: self.if_modified_since,
486 user_agent: self.user_agent,
487 activity_id: self.activity_id,
488 consistency_level: self.consistency_level,
489 continuation: self.continuation,
490 max_item_count: self.max_item_count,
491 partition_keys: Some(partition_keys),
492 query_cross_partition: self.query_cross_partition,
493 parallelize_cross_partition_query: self.parallelize_cross_partition_query,
494 }
495 }
496}
497
498impl<'a, 'b, C, D, QuerySet> QueryCrossPartitionSupport
499 for QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>
500where
501 QuerySet: ToAssign,
502 C: CosmosClient,
503 D: DatabaseClient<C>,
504{
505 type O = QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>;
506
507 #[inline]
508 fn with_query_cross_partition(self, query_cross_partition: bool) -> Self::O {
509 QueryDocumentsBuilder {
510 collection_client: self.collection_client,
511 p_query: PhantomData {},
512 query: self.query,
513 if_match_condition: self.if_match_condition,
514 if_modified_since: self.if_modified_since,
515 user_agent: self.user_agent,
516 activity_id: self.activity_id,
517 consistency_level: self.consistency_level,
518 continuation: self.continuation,
519 max_item_count: self.max_item_count,
520 partition_keys: self.partition_keys,
521 query_cross_partition,
522 parallelize_cross_partition_query: self.parallelize_cross_partition_query,
523 }
524 }
525}
526
527impl<'a, 'b, C, D, QuerySet> ParallelizeCrossPartitionQuerySupport
528 for QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>
529where
530 QuerySet: ToAssign,
531 C: CosmosClient,
532 D: DatabaseClient<C>,
533{
534 type O = QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>;
535
536 #[inline]
537 fn with_parallelize_cross_partition_query(
538 self,
539 parallelize_cross_partition_query: bool,
540 ) -> Self::O {
541 QueryDocumentsBuilder {
542 collection_client: self.collection_client,
543 p_query: PhantomData {},
544 query: self.query,
545 if_match_condition: self.if_match_condition,
546 if_modified_since: self.if_modified_since,
547 user_agent: self.user_agent,
548 activity_id: self.activity_id,
549 consistency_level: self.consistency_level,
550 continuation: self.continuation,
551 max_item_count: self.max_item_count,
552 partition_keys: self.partition_keys,
553 query_cross_partition: self.query_cross_partition,
554 parallelize_cross_partition_query,
555 }
556 }
557}
558
559impl<'a, 'b, C, D> QueryDocumentsBuilder<'a, 'b, C, D, Yes>
561where
562 C: CosmosClient,
563 D: DatabaseClient<C>,
564{
565 pub async fn execute<T>(&self) -> Result<QueryDocumentsResponse<T>, AzureError>
566 where
567 T: DeserializeOwned,
568 {
569 trace!("QueryDocumentBuilder::execute called");
570
571 let req = self.collection_client.cosmos_client().prepare_request(
572 &format!(
573 "dbs/{}/colls/{}/docs",
574 self.collection_client.database_client().database_name(),
575 self.collection_client.collection_name()
576 ),
577 hyper::Method::POST,
578 ResourceType::Documents,
579 );
580
581 let req = req.header(crate::headers::HEADER_DOCUMENTDB_ISQUERY, true.to_string());
583 let req = req.header(http::header::CONTENT_TYPE, "application/query+json");
584
585 let req = IfMatchConditionOption::add_header(self, req);
587 let req = IfModifiedSinceOption::add_header(self, req);
588 let req = UserAgentOption::add_header(self, req);
589 let req = ActivityIdOption::add_header(self, req);
590 let req = ConsistencyLevelOption::add_header(self, req);
591 let req = ContinuationOption::add_header(self, req);
592 let req = MaxItemCountOption::add_header(self, req);
593 let req = PartitionKeysOption::add_header(self, req);
594 let req = QueryCrossPartitionOption::add_header(self, req);
595
596 let body = serde_json::to_string(self.query())?;
597 debug!("body == {}", body);
598
599 let req = req.body(hyper::Body::from(body))?;
600 debug!("{:?}", req);
601
602 let (headers, body) = check_status_extract_headers_and_body(
603 self.collection_client.hyper_client().request(req),
604 StatusCode::OK,
605 )
606 .await?;
607
608 debug!("\nheaders == {:?}", headers);
609 debug!("\nbody == {:#?}", body);
610
611 Ok((&headers, &body as &[u8]).try_into()?)
612 }
613
614 pub fn stream<T>(
615 &self,
616 ) -> impl Stream<Item = Result<QueryDocumentsResponse<T>, AzureError>> + '_
617 where
618 T: DeserializeOwned,
619 {
620 #[derive(Debug, Clone, PartialEq)]
621 enum States {
622 Init,
623 Continuation(String),
624 };
625
626 unfold(
627 Some(States::Init),
628 move |continuation_token: Option<States>| async move {
629 debug!("continuation_token == {:?}", &continuation_token);
630 let response = match continuation_token {
631 Some(States::Init) => self.execute().await,
632 Some(States::Continuation(continuation_token)) => {
633 self.clone()
634 .with_continuation(&continuation_token)
635 .execute()
636 .await
637 }
638 None => return None,
639 };
640
641 let response = match response {
642 Ok(response) => response,
643 Err(err) => return Some((Err(err), None)),
644 };
645
646 let continuation_token = match &response.continuation_token {
647 Some(ct) => Some(States::Continuation(ct.to_owned())),
648 None => None,
649 };
650
651 Some((Ok(response), continuation_token))
652 },
653 )
654 }
655}