use azure_core::http::{
headers::{Header, Headers},
Context,
};
use serde::de::DeserializeOwned;
use std::sync::Arc;
use crate::{
constants, cosmos_request::CosmosRequest, cosmos_request::CosmosRequestBuilder, feed::FeedBody,
operation_context::OperationType, pipeline::GatewayPipeline, resource_context::ResourceLink,
Query, QueryFeedPage,
};
/// Page-by-page executor for a single query against an items resource.
///
/// Each page is fetched with a fresh request; the continuation token from the
/// previous page (if any) is carried in this struct between requests.
pub struct QueryExecutor<T>
where
    T: DeserializeOwned + Send,
{
    /// Pipeline every page request is sent through.
    http_pipeline: Arc<GatewayPipeline>,
    /// Resource link the query request is built against.
    items_link: ResourceLink,
    /// Context handed (borrowed) to the pipeline on every send.
    context: Context<'static>,
    /// The query handed to the request builder for every page.
    query: Query,
    /// Headers copied onto every page request.
    base_headers: Headers,
    /// Continuation token taken from the most recently fetched page, if any.
    continuation: Option<String>,
    /// Set once a page arrives without a continuation token; no further
    /// requests are issued after that.
    complete: bool,
    /// Ties the executor to the item type `T` without storing a `T`.
    phantom: std::marker::PhantomData<fn() -> T>,
}
impl<T> QueryExecutor<T>
where
    T: DeserializeOwned + Send + 'static,
{
    /// Creates an executor positioned before the first page.
    ///
    /// No continuation token is set and the executor is not marked complete,
    /// so the first call to [`next_page`](Self::next_page) issues a request.
    pub(crate) fn new(
        http_pipeline: Arc<GatewayPipeline>,
        items_link: ResourceLink,
        context: Context<'static>,
        query: Query,
        base_headers: Headers,
    ) -> Self {
        Self {
            // Paging state: nothing fetched yet.
            continuation: None,
            complete: false,
            phantom: std::marker::PhantomData,
            // Caller-supplied request configuration.
            http_pipeline,
            items_link,
            context,
            query,
            base_headers,
        }
    }

    /// Consumes the executor and exposes it as a `FeedItemIterator`.
    ///
    /// The iterator is backed by a `try_unfold` stream that calls
    /// [`next_page`](Self::next_page) once per step and ends when that call
    /// yields `None`. As written, this function itself always returns `Ok`.
    pub fn into_stream(self) -> azure_core::Result<crate::FeedItemIterator<T>> {
        Ok(crate::FeedItemIterator::new(futures::stream::try_unfold(
            self,
            |mut executor| async move {
                // Thread the executor through as the unfold state so the
                // continuation token survives from one step to the next.
                match executor.next_page().await? {
                    Some(page) => Ok(Some((page, executor))),
                    None => Ok(None),
                }
            },
        )))
    }

    /// Fetches the next page of query results.
    ///
    /// Returns `Ok(None)` without issuing a request once an earlier page has
    /// come back without a continuation token.
    ///
    /// # Errors
    ///
    /// Propagates any error from building the request, sending it through the
    /// pipeline, or converting the response into a page. The paging state is
    /// only updated after all of those steps succeed.
    pub async fn next_page(&mut self) -> azure_core::Result<Option<QueryFeedPage<T>>> {
        if self.complete {
            return Ok(None);
        }

        // Start from the common query request, then layer the base headers on top.
        let base = create_query_cosmos_request_builder(&self.items_link, &self.query)?;
        let with_base_headers = self
            .base_headers
            .clone()
            .into_iter()
            .fold(base, |acc, (name, value)| acc.header(name, value));

        // Resume from where the previous page left off, if there was one.
        let request_builder = match self.continuation.as_ref() {
            Some(token) => with_base_headers.header(constants::CONTINUATION, token.clone()),
            None => with_base_headers,
        };
        let request = request_builder.build()?;

        let response = self
            .http_pipeline
            .send::<FeedBody<T>>(request, self.context.to_borrowed())
            .await?;
        let page = QueryFeedPage::<T>::from_response(response).await?;

        // A page without a continuation token is treated as the last one.
        if let Some(token) = page.continuation() {
            self.continuation = Some(token.to_string());
        } else {
            self.complete = true;
        }

        Ok(Some(page))
    }
}
/// Builds the request builder shared by every page of a query.
///
/// The builder is created for `OperationType::Query` against a clone of
/// `items_link`, gets the `constants::QUERY` header set to `"True"` plus the
/// `constants::QUERY_CONTENT_TYPE` header, and is then given `query` via
/// `json`. As written, this function always returns `Ok`.
fn create_query_cosmos_request_builder(
    items_link: &ResourceLink,
    query: &Query,
) -> azure_core::Result<CosmosRequestBuilder> {
    let request = CosmosRequest::builder(OperationType::Query, items_link.clone());
    let request = request.header(constants::QUERY, "True");
    let request = request.header(
        constants::QUERY_CONTENT_TYPE.name(),
        constants::QUERY_CONTENT_TYPE.value(),
    );
    Ok(request.json(query))
}