Skip to main content

azure_sdk_cosmos/requests/
query_documents_builder.rs

1use crate::prelude::*;
2use crate::responses::QueryDocumentsResponse;
3use crate::{Query, ResourceType};
4use azure_sdk_core::errors::{check_status_extract_headers_and_body, AzureError};
5use azure_sdk_core::prelude::*;
6use azure_sdk_core::{No, ToAssign, Yes};
7use chrono::{DateTime, Utc};
8use futures::stream::{unfold, Stream};
9use hyper::StatusCode;
10use serde::de::DeserializeOwned;
11use std::convert::TryInto;
12use std::marker::PhantomData;
13
14#[derive(Debug)]
15pub struct QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>
16where
17    QuerySet: ToAssign,
18    C: CosmosClient,
19    D: DatabaseClient<C>,
20{
21    collection_client: &'a dyn CollectionClient<C, D>,
22    p_query: PhantomData<QuerySet>,
23    query: Option<&'b Query<'b>>,
24    if_match_condition: Option<IfMatchCondition<'b>>,
25    if_modified_since: Option<&'b DateTime<Utc>>,
26    user_agent: Option<&'b str>,
27    activity_id: Option<&'b str>,
28    consistency_level: Option<ConsistencyLevel<'b>>,
29    continuation: Option<&'b str>,
30    max_item_count: i32,
31    partition_keys: Option<&'b PartitionKeys>,
32    query_cross_partition: bool,
33    parallelize_cross_partition_query: bool,
34}
35
36impl<'a, 'b, C, D, QuerySet> Clone for QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>
37where
38    QuerySet: ToAssign,
39    C: CosmosClient,
40    D: DatabaseClient<C>,
41{
42    fn clone(&self) -> Self {
43        Self {
44            collection_client: self.collection_client,
45            p_query: PhantomData {},
46            query: self.query,
47            if_match_condition: self.if_match_condition,
48            if_modified_since: self.if_modified_since,
49            user_agent: self.user_agent,
50            activity_id: self.activity_id,
51            consistency_level: self.consistency_level.clone(),
52            continuation: self.continuation,
53            max_item_count: self.max_item_count,
54            partition_keys: self.partition_keys,
55            query_cross_partition: self.query_cross_partition,
56            parallelize_cross_partition_query: self.parallelize_cross_partition_query,
57        }
58    }
59}
60
61impl<'a, 'b, C, D> QueryDocumentsBuilder<'a, 'b, C, D, No>
62where
63    C: CosmosClient,
64    D: DatabaseClient<C>,
65{
66    #[inline]
67    pub(crate) fn new(
68        collection_client: &'a dyn CollectionClient<C, D>,
69    ) -> QueryDocumentsBuilder<'a, 'b, C, D, No> {
70        QueryDocumentsBuilder {
71            collection_client,
72            p_query: PhantomData {},
73            query: None,
74            if_match_condition: None,
75            if_modified_since: None,
76            user_agent: None,
77            activity_id: None,
78            consistency_level: None,
79            continuation: None,
80            max_item_count: -1,
81            partition_keys: None,
82            query_cross_partition: false,
83            parallelize_cross_partition_query: false,
84        }
85    }
86}
87
88impl<'a, 'b, C, D, QuerySet> CollectionClientRequired<'a, C, D>
89    for QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>
90where
91    QuerySet: ToAssign,
92    C: CosmosClient,
93    D: DatabaseClient<C>,
94{
95    #[inline]
96    fn collection_client(&self) -> &'a dyn CollectionClient<C, D> {
97        self.collection_client
98    }
99}
100
101//get mandatory no traits methods
102
103//set mandatory no traits methods
104impl<'a, 'b, C, D> QueryRequired<'b> for QueryDocumentsBuilder<'a, 'b, C, D, Yes>
105where
106    C: CosmosClient,
107    D: DatabaseClient<C>,
108{
109    #[inline]
110    fn query(&self) -> &'b Query<'b> {
111        self.query.unwrap()
112    }
113}
114
115impl<'a, 'b, C, D, QuerySet> IfMatchConditionOption<'b>
116    for QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>
117where
118    QuerySet: ToAssign,
119    C: CosmosClient,
120    D: DatabaseClient<C>,
121{
122    #[inline]
123    fn if_match_condition(&self) -> Option<IfMatchCondition<'b>> {
124        self.if_match_condition
125    }
126}
127
128impl<'a, 'b, C, D, QuerySet> IfModifiedSinceOption<'b>
129    for QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>
130where
131    QuerySet: ToAssign,
132    C: CosmosClient,
133    D: DatabaseClient<C>,
134{
135    #[inline]
136    fn if_modified_since(&self) -> Option<&'b DateTime<Utc>> {
137        self.if_modified_since
138    }
139}
140
141impl<'a, 'b, C, D, QuerySet> UserAgentOption<'b> for QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>
142where
143    QuerySet: ToAssign,
144    C: CosmosClient,
145    D: DatabaseClient<C>,
146{
147    #[inline]
148    fn user_agent(&self) -> Option<&'b str> {
149        self.user_agent
150    }
151}
152
153impl<'a, 'b, C, D, QuerySet> ActivityIdOption<'b> for QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>
154where
155    QuerySet: ToAssign,
156    C: CosmosClient,
157    D: DatabaseClient<C>,
158{
159    #[inline]
160    fn activity_id(&self) -> Option<&'b str> {
161        self.activity_id
162    }
163}
164
165impl<'a, 'b, C, D, QuerySet> ConsistencyLevelOption<'b>
166    for QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>
167where
168    QuerySet: ToAssign,
169    C: CosmosClient,
170    D: DatabaseClient<C>,
171{
172    #[inline]
173    fn consistency_level(&self) -> Option<ConsistencyLevel<'b>> {
174        self.consistency_level.clone()
175    }
176}
177
178impl<'a, 'b, C, D, QuerySet> ContinuationOption<'b>
179    for QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>
180where
181    QuerySet: ToAssign,
182    C: CosmosClient,
183    D: DatabaseClient<C>,
184{
185    #[inline]
186    fn continuation(&self) -> Option<&'b str> {
187        self.continuation
188    }
189}
190
191impl<'a, 'b, C, D, QuerySet> MaxItemCountOption for QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>
192where
193    QuerySet: ToAssign,
194    C: CosmosClient,
195    D: DatabaseClient<C>,
196{
197    #[inline]
198    fn max_item_count(&self) -> i32 {
199        self.max_item_count
200    }
201}
202
203impl<'a, 'b, C, D, QuerySet> PartitionKeysOption<'b>
204    for QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>
205where
206    QuerySet: ToAssign,
207    C: CosmosClient,
208    D: DatabaseClient<C>,
209{
210    #[inline]
211    fn partition_keys(&self) -> Option<&'b PartitionKeys> {
212        self.partition_keys
213    }
214}
215
216impl<'a, 'b, C, D, QuerySet> QueryCrossPartitionOption
217    for QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>
218where
219    QuerySet: ToAssign,
220    C: CosmosClient,
221    D: DatabaseClient<C>,
222{
223    #[inline]
224    fn query_cross_partition(&self) -> bool {
225        self.query_cross_partition
226    }
227}
228
229impl<'a, 'b, C, D, QuerySet> ParallelizeCrossPartitionQueryOption
230    for QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>
231where
232    QuerySet: ToAssign,
233    C: CosmosClient,
234    D: DatabaseClient<C>,
235{
236    #[inline]
237    fn parallelize_cross_partition_query(&self) -> bool {
238        self.parallelize_cross_partition_query
239    }
240}
241
242impl<'a, 'b, C, D> QuerySupport<'b> for QueryDocumentsBuilder<'a, 'b, C, D, No>
243where
244    C: CosmosClient,
245    D: DatabaseClient<C>,
246{
247    type O = QueryDocumentsBuilder<'a, 'b, C, D, Yes>;
248
249    #[inline]
250    fn with_query(self, query: &'b Query<'b>) -> Self::O {
251        QueryDocumentsBuilder {
252            collection_client: self.collection_client,
253            p_query: PhantomData {},
254            query: Some(query),
255            if_match_condition: self.if_match_condition,
256            if_modified_since: self.if_modified_since,
257            user_agent: self.user_agent,
258            activity_id: self.activity_id,
259            consistency_level: self.consistency_level,
260            continuation: self.continuation,
261            max_item_count: self.max_item_count,
262            partition_keys: self.partition_keys,
263            query_cross_partition: self.query_cross_partition,
264            parallelize_cross_partition_query: self.parallelize_cross_partition_query,
265        }
266    }
267}
268
269impl<'a, 'b, C, D, QuerySet> IfMatchConditionSupport<'b>
270    for QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>
271where
272    QuerySet: ToAssign,
273    C: CosmosClient,
274    D: DatabaseClient<C>,
275{
276    type O = QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>;
277
278    #[inline]
279    fn with_if_match_condition(self, if_match_condition: IfMatchCondition<'b>) -> Self::O {
280        QueryDocumentsBuilder {
281            collection_client: self.collection_client,
282            p_query: PhantomData {},
283            query: self.query,
284            if_match_condition: Some(if_match_condition),
285            if_modified_since: self.if_modified_since,
286            user_agent: self.user_agent,
287            activity_id: self.activity_id,
288            consistency_level: self.consistency_level,
289            continuation: self.continuation,
290            max_item_count: self.max_item_count,
291            partition_keys: self.partition_keys,
292            query_cross_partition: self.query_cross_partition,
293            parallelize_cross_partition_query: self.parallelize_cross_partition_query,
294        }
295    }
296}
297
298impl<'a, 'b, C, D, QuerySet> IfModifiedSinceSupport<'b>
299    for QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>
300where
301    QuerySet: ToAssign,
302    C: CosmosClient,
303    D: DatabaseClient<C>,
304{
305    type O = QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>;
306
307    #[inline]
308    fn with_if_modified_since(self, if_modified_since: &'b DateTime<Utc>) -> Self::O {
309        QueryDocumentsBuilder {
310            collection_client: self.collection_client,
311            p_query: PhantomData {},
312            query: self.query,
313            if_match_condition: self.if_match_condition,
314            if_modified_since: Some(if_modified_since),
315            user_agent: self.user_agent,
316            activity_id: self.activity_id,
317            consistency_level: self.consistency_level,
318            continuation: self.continuation,
319            max_item_count: self.max_item_count,
320            partition_keys: self.partition_keys,
321            query_cross_partition: self.query_cross_partition,
322            parallelize_cross_partition_query: self.parallelize_cross_partition_query,
323        }
324    }
325}
326
327impl<'a, 'b, C, D, QuerySet> UserAgentSupport<'b> for QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>
328where
329    QuerySet: ToAssign,
330    C: CosmosClient,
331    D: DatabaseClient<C>,
332{
333    type O = QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>;
334
335    #[inline]
336    fn with_user_agent(self, user_agent: &'b str) -> Self::O {
337        QueryDocumentsBuilder {
338            collection_client: self.collection_client,
339            p_query: PhantomData {},
340            query: self.query,
341            if_match_condition: self.if_match_condition,
342            if_modified_since: self.if_modified_since,
343            user_agent: Some(user_agent),
344            activity_id: self.activity_id,
345            consistency_level: self.consistency_level,
346            continuation: self.continuation,
347            max_item_count: self.max_item_count,
348            partition_keys: self.partition_keys,
349            query_cross_partition: self.query_cross_partition,
350            parallelize_cross_partition_query: self.parallelize_cross_partition_query,
351        }
352    }
353}
354
355impl<'a, 'b, C, D, QuerySet> ActivityIdSupport<'b> for QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>
356where
357    QuerySet: ToAssign,
358    C: CosmosClient,
359    D: DatabaseClient<C>,
360{
361    type O = QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>;
362
363    #[inline]
364    fn with_activity_id(self, activity_id: &'b str) -> Self::O {
365        QueryDocumentsBuilder {
366            collection_client: self.collection_client,
367            p_query: PhantomData {},
368            query: self.query,
369            if_match_condition: self.if_match_condition,
370            if_modified_since: self.if_modified_since,
371            user_agent: self.user_agent,
372            activity_id: Some(activity_id),
373            consistency_level: self.consistency_level,
374            continuation: self.continuation,
375            max_item_count: self.max_item_count,
376            partition_keys: self.partition_keys,
377            query_cross_partition: self.query_cross_partition,
378            parallelize_cross_partition_query: self.parallelize_cross_partition_query,
379        }
380    }
381}
382
383impl<'a, 'b, C, D, QuerySet> ConsistencyLevelSupport<'b>
384    for QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>
385where
386    QuerySet: ToAssign,
387    C: CosmosClient,
388    D: DatabaseClient<C>,
389{
390    type O = QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>;
391
392    #[inline]
393    fn with_consistency_level(self, consistency_level: ConsistencyLevel<'b>) -> Self::O {
394        QueryDocumentsBuilder {
395            collection_client: self.collection_client,
396            p_query: PhantomData {},
397            query: self.query,
398            if_match_condition: self.if_match_condition,
399            if_modified_since: self.if_modified_since,
400            user_agent: self.user_agent,
401            activity_id: self.activity_id,
402            consistency_level: Some(consistency_level),
403            continuation: self.continuation,
404            max_item_count: self.max_item_count,
405            partition_keys: self.partition_keys,
406            query_cross_partition: self.query_cross_partition,
407            parallelize_cross_partition_query: self.parallelize_cross_partition_query,
408        }
409    }
410}
411
412impl<'a, 'b, C, D, QuerySet> ContinuationSupport<'b>
413    for QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>
414where
415    QuerySet: ToAssign,
416    C: CosmosClient,
417    D: DatabaseClient<C>,
418{
419    type O = QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>;
420
421    #[inline]
422    fn with_continuation(self, continuation: &'b str) -> Self::O {
423        QueryDocumentsBuilder {
424            collection_client: self.collection_client,
425            p_query: PhantomData {},
426            query: self.query,
427            if_match_condition: self.if_match_condition,
428            if_modified_since: self.if_modified_since,
429            user_agent: self.user_agent,
430            activity_id: self.activity_id,
431            consistency_level: self.consistency_level,
432            continuation: Some(continuation),
433            max_item_count: self.max_item_count,
434            partition_keys: self.partition_keys,
435            query_cross_partition: self.query_cross_partition,
436            parallelize_cross_partition_query: self.parallelize_cross_partition_query,
437        }
438    }
439}
440
441impl<'a, 'b, C, D, QuerySet> MaxItemCountSupport for QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>
442where
443    QuerySet: ToAssign,
444    C: CosmosClient,
445    D: DatabaseClient<C>,
446{
447    type O = QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>;
448
449    #[inline]
450    fn with_max_item_count(self, max_item_count: i32) -> Self::O {
451        QueryDocumentsBuilder {
452            collection_client: self.collection_client,
453            p_query: PhantomData {},
454            query: self.query,
455            if_match_condition: self.if_match_condition,
456            if_modified_since: self.if_modified_since,
457            user_agent: self.user_agent,
458            activity_id: self.activity_id,
459            consistency_level: self.consistency_level,
460            continuation: self.continuation,
461            max_item_count,
462            partition_keys: self.partition_keys,
463            query_cross_partition: self.query_cross_partition,
464            parallelize_cross_partition_query: self.parallelize_cross_partition_query,
465        }
466    }
467}
468
469impl<'a, 'b, C, D, QuerySet> PartitionKeysSupport<'b>
470    for QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>
471where
472    QuerySet: ToAssign,
473    C: CosmosClient,
474    D: DatabaseClient<C>,
475{
476    type O = QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>;
477
478    #[inline]
479    fn with_partition_keys(self, partition_keys: &'b PartitionKeys) -> Self::O {
480        QueryDocumentsBuilder {
481            collection_client: self.collection_client,
482            p_query: PhantomData {},
483            query: self.query,
484            if_match_condition: self.if_match_condition,
485            if_modified_since: self.if_modified_since,
486            user_agent: self.user_agent,
487            activity_id: self.activity_id,
488            consistency_level: self.consistency_level,
489            continuation: self.continuation,
490            max_item_count: self.max_item_count,
491            partition_keys: Some(partition_keys),
492            query_cross_partition: self.query_cross_partition,
493            parallelize_cross_partition_query: self.parallelize_cross_partition_query,
494        }
495    }
496}
497
498impl<'a, 'b, C, D, QuerySet> QueryCrossPartitionSupport
499    for QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>
500where
501    QuerySet: ToAssign,
502    C: CosmosClient,
503    D: DatabaseClient<C>,
504{
505    type O = QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>;
506
507    #[inline]
508    fn with_query_cross_partition(self, query_cross_partition: bool) -> Self::O {
509        QueryDocumentsBuilder {
510            collection_client: self.collection_client,
511            p_query: PhantomData {},
512            query: self.query,
513            if_match_condition: self.if_match_condition,
514            if_modified_since: self.if_modified_since,
515            user_agent: self.user_agent,
516            activity_id: self.activity_id,
517            consistency_level: self.consistency_level,
518            continuation: self.continuation,
519            max_item_count: self.max_item_count,
520            partition_keys: self.partition_keys,
521            query_cross_partition,
522            parallelize_cross_partition_query: self.parallelize_cross_partition_query,
523        }
524    }
525}
526
527impl<'a, 'b, C, D, QuerySet> ParallelizeCrossPartitionQuerySupport
528    for QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>
529where
530    QuerySet: ToAssign,
531    C: CosmosClient,
532    D: DatabaseClient<C>,
533{
534    type O = QueryDocumentsBuilder<'a, 'b, C, D, QuerySet>;
535
536    #[inline]
537    fn with_parallelize_cross_partition_query(
538        self,
539        parallelize_cross_partition_query: bool,
540    ) -> Self::O {
541        QueryDocumentsBuilder {
542            collection_client: self.collection_client,
543            p_query: PhantomData {},
544            query: self.query,
545            if_match_condition: self.if_match_condition,
546            if_modified_since: self.if_modified_since,
547            user_agent: self.user_agent,
548            activity_id: self.activity_id,
549            consistency_level: self.consistency_level,
550            continuation: self.continuation,
551            max_item_count: self.max_item_count,
552            partition_keys: self.partition_keys,
553            query_cross_partition: self.query_cross_partition,
554            parallelize_cross_partition_query,
555        }
556    }
557}
558
559// methods callable only when every mandatory field has been filled
560impl<'a, 'b, C, D> QueryDocumentsBuilder<'a, 'b, C, D, Yes>
561where
562    C: CosmosClient,
563    D: DatabaseClient<C>,
564{
565    pub async fn execute<T>(&self) -> Result<QueryDocumentsResponse<T>, AzureError>
566    where
567        T: DeserializeOwned,
568    {
569        trace!("QueryDocumentBuilder::execute called");
570
571        let req = self.collection_client.cosmos_client().prepare_request(
572            &format!(
573                "dbs/{}/colls/{}/docs",
574                self.collection_client.database_client().database_name(),
575                self.collection_client.collection_name()
576            ),
577            hyper::Method::POST,
578            ResourceType::Documents,
579        );
580
581        // signal that this is a query
582        let req = req.header(crate::headers::HEADER_DOCUMENTDB_ISQUERY, true.to_string());
583        let req = req.header(http::header::CONTENT_TYPE, "application/query+json");
584
585        // add trait headers
586        let req = IfMatchConditionOption::add_header(self, req);
587        let req = IfModifiedSinceOption::add_header(self, req);
588        let req = UserAgentOption::add_header(self, req);
589        let req = ActivityIdOption::add_header(self, req);
590        let req = ConsistencyLevelOption::add_header(self, req);
591        let req = ContinuationOption::add_header(self, req);
592        let req = MaxItemCountOption::add_header(self, req);
593        let req = PartitionKeysOption::add_header(self, req);
594        let req = QueryCrossPartitionOption::add_header(self, req);
595
596        let body = serde_json::to_string(self.query())?;
597        debug!("body == {}", body);
598
599        let req = req.body(hyper::Body::from(body))?;
600        debug!("{:?}", req);
601
602        let (headers, body) = check_status_extract_headers_and_body(
603            self.collection_client.hyper_client().request(req),
604            StatusCode::OK,
605        )
606        .await?;
607
608        debug!("\nheaders == {:?}", headers);
609        debug!("\nbody == {:#?}", body);
610
611        Ok((&headers, &body as &[u8]).try_into()?)
612    }
613
614    pub fn stream<T>(
615        &self,
616    ) -> impl Stream<Item = Result<QueryDocumentsResponse<T>, AzureError>> + '_
617    where
618        T: DeserializeOwned,
619    {
620        #[derive(Debug, Clone, PartialEq)]
621        enum States {
622            Init,
623            Continuation(String),
624        };
625
626        unfold(
627            Some(States::Init),
628            move |continuation_token: Option<States>| async move {
629                debug!("continuation_token == {:?}", &continuation_token);
630                let response = match continuation_token {
631                    Some(States::Init) => self.execute().await,
632                    Some(States::Continuation(continuation_token)) => {
633                        self.clone()
634                            .with_continuation(&continuation_token)
635                            .execute()
636                            .await
637                    }
638                    None => return None,
639                };
640
641                let response = match response {
642                    Ok(response) => response,
643                    Err(err) => return Some((Err(err), None)),
644                };
645
646                let continuation_token = match &response.continuation_token {
647                    Some(ct) => Some(States::Continuation(ct.to_owned())),
648                    None => None,
649                };
650
651                Some((Ok(response), continuation_token))
652            },
653        )
654    }
655}