azure_data_tables/operations/
query_entity.rs1use crate::prelude::*;
2use azure_core::{
3 error::{Error, ErrorKind},
4 headers::*,
5 prelude::*,
6 CollectedResponse, Method, Pageable,
7};
8use azure_storage::headers::CommonStorageResponseHeaders;
9use serde::{de::DeserializeOwned, Deserialize, Serialize};
10
// Declares the `QueryEntity` operation on `TableClient` through the
// `operation!` macro (defined outside this file).
//
// NOTE(review): judging from the `impl QueryEntityBuilder` block in this file,
// the macro presumably generates a cloneable `QueryEntityBuilder` carrying
// `client`, `context` and one optional field per `?`-prefixed entry
// (`filter`, `select`, `top`, `initial_partition_key`, `initial_row_key`)
// -- TODO confirm against the macro definition.
// NOTE(review): `#[stream]` presumably marks the operation as paged, with
// `into_stream` supplied by hand in this file -- confirm.
11operation! {
12 #[stream]
13 QueryEntity,
14 client: TableClient,
15 ?filter: Filter,
16 ?select: Select,
17 ?top: Top,
18 ?initial_partition_key: String,
19 ?initial_row_key: String
20}
21
22impl QueryEntityBuilder {
23 pub fn into_stream<E>(self) -> Pageable<QueryEntityResponse<E>, Error>
24 where
25 E: DeserializeOwned + Send + Sync,
26 {
27 let make_request = move |continuation: Option<(String, Option<String>)>| {
28 let this = self.clone();
29 let mut ctx = self.context.clone();
30 async move {
31 let mut url = this.client.url()?;
32 url.path_segments_mut()
33 .map_err(|()| Error::message(ErrorKind::Other, "invalid table URL"))?
34 .pop()
35 .push(&format!("{}()", this.client.table_name()));
36
37 this.filter.append_to_url_query(&mut url);
38 this.select.append_to_url_query(&mut url);
39 this.top.append_to_url_query(&mut url);
40
41 if let Some((partition_key, row_key)) = continuation {
42 url.query_pairs_mut()
43 .append_pair("NextPartitionKey", &partition_key);
44
45 if let Some(row_key) = row_key {
46 url.query_pairs_mut().append_pair("NextRowKey", &row_key);
47 }
48 } else if let Some(initial_paritition_key) = this.initial_partition_key {
49 url.query_pairs_mut()
50 .append_pair("NextPartitionKey", &initial_paritition_key);
51
52 if let Some(row_key) = this.initial_row_key {
53 url.query_pairs_mut().append_pair("NextRowKey", &row_key);
54 }
55 }
56
57 let mut headers = Headers::new();
58 headers.insert(ACCEPT, "application/json;odata=fullmetadata");
59
60 let mut request = TableClient::finalize_request(url, Method::Get, headers, None)?;
61
62 let response = this.client.send(&mut ctx, &mut request).await?;
63
64 let collected_response = CollectedResponse::from_response(response).await?;
65 collected_response.try_into()
66 }
67 };
68 Pageable::new(make_request)
69 }
70}
71
72#[derive(Debug, Clone)]
73pub struct QueryEntityResponse<E>
74where
75 E: DeserializeOwned + Send + Sync,
76{
77 pub common_storage_response_headers: CommonStorageResponseHeaders,
78 pub metadata: String,
79 pub entities: Vec<E>,
80 next_partition_key: Option<String>,
81 next_row_key: Option<String>,
82}
83
84impl<E> Continuable for QueryEntityResponse<E>
85where
86 E: DeserializeOwned + Send + Sync,
87{
88 type Continuation = (String, Option<String>);
89
90 fn continuation(&self) -> Option<Self::Continuation> {
91 self.next_partition_key
92 .clone()
93 .map(|partition_key| (partition_key, self.next_row_key.clone()))
94 }
95}
96
97#[derive(Debug, Clone, Deserialize, Serialize)]
98struct QueryEntityResponseInternal<E> {
99 #[serde(rename = "odata.metadata")]
100 pub metadata: String,
101 #[serde(default = "Vec::new")]
102 pub value: Vec<E>,
103}
104
105impl<E: DeserializeOwned + Send + Sync> TryFrom<CollectedResponse> for QueryEntityResponse<E> {
106 type Error = Error;
107
108 fn try_from(response: CollectedResponse) -> azure_core::Result<Self> {
109 let query_entity_response_internal: QueryEntityResponseInternal<E> = response.json()?;
110
111 let headers = response.headers();
112
113 let next_partition_key = headers.get_optional_string(&HeaderName::from_static(
114 "x-ms-continuation-nextpartitionkey",
115 ));
116
117 let next_row_key =
118 headers.get_optional_string(&HeaderName::from_static("x-ms-continuation-nextrowkey"));
119
120 Ok(QueryEntityResponse {
121 common_storage_response_headers: response.headers().try_into()?,
122 metadata: query_entity_response_internal.metadata,
123 entities: query_entity_response_internal.value,
124 next_partition_key,
125 next_row_key,
126 })
127 }
128}