azure_data_tables/operations/
query_entity.rs

1use crate::prelude::*;
2use azure_core::{
3    error::{Error, ErrorKind},
4    headers::*,
5    prelude::*,
6    CollectedResponse, Method, Pageable,
7};
8use azure_storage::headers::CommonStorageResponseHeaders;
9use serde::{de::DeserializeOwned, Deserialize, Serialize};
10
11operation! {
12    #[stream]
13    QueryEntity,
14    client: TableClient,
15    ?filter: Filter,
16    ?select: Select,
17    ?top: Top,
18    ?initial_partition_key: String,
19    ?initial_row_key: String
20}
21
22impl QueryEntityBuilder {
23    pub fn into_stream<E>(self) -> Pageable<QueryEntityResponse<E>, Error>
24    where
25        E: DeserializeOwned + Send + Sync,
26    {
27        let make_request = move |continuation: Option<(String, Option<String>)>| {
28            let this = self.clone();
29            let mut ctx = self.context.clone();
30            async move {
31                let mut url = this.client.url()?;
32                url.path_segments_mut()
33                    .map_err(|()| Error::message(ErrorKind::Other, "invalid table URL"))?
34                    .pop()
35                    .push(&format!("{}()", this.client.table_name()));
36
37                this.filter.append_to_url_query(&mut url);
38                this.select.append_to_url_query(&mut url);
39                this.top.append_to_url_query(&mut url);
40
41                if let Some((partition_key, row_key)) = continuation {
42                    url.query_pairs_mut()
43                        .append_pair("NextPartitionKey", &partition_key);
44
45                    if let Some(row_key) = row_key {
46                        url.query_pairs_mut().append_pair("NextRowKey", &row_key);
47                    }
48                } else if let Some(initial_paritition_key) = this.initial_partition_key {
49                    url.query_pairs_mut()
50                        .append_pair("NextPartitionKey", &initial_paritition_key);
51
52                    if let Some(row_key) = this.initial_row_key {
53                        url.query_pairs_mut().append_pair("NextRowKey", &row_key);
54                    }
55                }
56
57                let mut headers = Headers::new();
58                headers.insert(ACCEPT, "application/json;odata=fullmetadata");
59
60                let mut request = TableClient::finalize_request(url, Method::Get, headers, None)?;
61
62                let response = this.client.send(&mut ctx, &mut request).await?;
63
64                let collected_response = CollectedResponse::from_response(response).await?;
65                collected_response.try_into()
66            }
67        };
68        Pageable::new(make_request)
69    }
70}
71
72#[derive(Debug, Clone)]
73pub struct QueryEntityResponse<E>
74where
75    E: DeserializeOwned + Send + Sync,
76{
77    pub common_storage_response_headers: CommonStorageResponseHeaders,
78    pub metadata: String,
79    pub entities: Vec<E>,
80    next_partition_key: Option<String>,
81    next_row_key: Option<String>,
82}
83
84impl<E> Continuable for QueryEntityResponse<E>
85where
86    E: DeserializeOwned + Send + Sync,
87{
88    type Continuation = (String, Option<String>);
89
90    fn continuation(&self) -> Option<Self::Continuation> {
91        self.next_partition_key
92            .clone()
93            .map(|partition_key| (partition_key, self.next_row_key.clone()))
94    }
95}
96
97#[derive(Debug, Clone, Deserialize, Serialize)]
98struct QueryEntityResponseInternal<E> {
99    #[serde(rename = "odata.metadata")]
100    pub metadata: String,
101    #[serde(default = "Vec::new")]
102    pub value: Vec<E>,
103}
104
105impl<E: DeserializeOwned + Send + Sync> TryFrom<CollectedResponse> for QueryEntityResponse<E> {
106    type Error = Error;
107
108    fn try_from(response: CollectedResponse) -> azure_core::Result<Self> {
109        let query_entity_response_internal: QueryEntityResponseInternal<E> = response.json()?;
110
111        let headers = response.headers();
112
113        let next_partition_key = headers.get_optional_string(&HeaderName::from_static(
114            "x-ms-continuation-nextpartitionkey",
115        ));
116
117        let next_row_key =
118            headers.get_optional_string(&HeaderName::from_static("x-ms-continuation-nextrowkey"));
119
120        Ok(QueryEntityResponse {
121            common_storage_response_headers: response.headers().try_into()?,
122            metadata: query_entity_response_internal.metadata,
123            entities: query_entity_response_internal.value,
124            next_partition_key,
125            next_row_key,
126        })
127    }
128}