use crate::{
feed::FeedBody,
models::{CosmosResponse, ThroughputProperties},
Query,
};
use azure_data_cosmos_driver::models::{AccountReference, CosmosOperation};
use azure_data_cosmos_driver::options::OperationOptions;
use azure_data_cosmos_driver::CosmosDriver;
use std::sync::Arc;
/// Queries the account's offers for the one attached to `resource_id`.
///
/// Builds the parameterized query
/// `SELECT * FROM c WHERE c.offerResourceId = @rid`, serializes it to JSON and
/// runs it through the driver as a `query_offers` operation.
///
/// # Returns
///
/// * `Ok(None)` when the driver yields no response, or when the returned feed
///   contains no items.
/// * `Ok(Some(offer))` with the first item of the feed otherwise.
///
/// # Errors
///
/// Propagates failures from binding the query parameter, serializing the
/// query, executing the operation, or decoding the response body.
pub(crate) async fn find_offer(
    driver: &CosmosDriver,
    account: &AccountReference,
    resource_id: &str,
    operation_options: OperationOptions,
) -> crate::Result<Option<ThroughputProperties>> {
    let offer_query = Query::from("SELECT * FROM c WHERE c.offerResourceId = @rid")
        .with_parameter("@rid", resource_id)?;
    let payload = serde_json::to_vec(&offer_query)?;
    let request = CosmosOperation::query_offers(account.clone()).with_body(payload);

    // The driver may complete without producing a response; that is reported
    // to the caller as "no offer" rather than as an error.
    let response = match driver.execute_operation(request, operation_options).await? {
        Some(response) => response,
        None => return Ok(None),
    };

    tracing::debug!(
        activity_id = ?response.headers().activity_id,
        request_charge = ?response.headers().request_charge,
        "offer query completed"
    );

    let feed: FeedBody<ThroughputProperties> = response.into_body().into_single()?;
    // Only the first item of the feed is used; any further items are dropped.
    let first_offer = feed.items.into_iter().next();
    Ok(first_offer)
}
/// Reads a single offer by its id and returns the converted response.
///
/// The read is issued as a `read_offer` singleton operation using
/// `OperationOptions::default()`; the driver response is then converted with
/// `driver_bridge::driver_response_to_cosmos_response`.
///
/// # Errors
///
/// Propagates any error returned by the driver while executing the operation.
pub(crate) async fn read_offer_by_id(
    driver: &CosmosDriver,
    account: &AccountReference,
    offer_id: &str,
) -> crate::Result<CosmosResponse> {
    let request = CosmosOperation::read_offer(account.clone(), offer_id.to_owned());
    let default_options = OperationOptions::default();

    let raw_response = driver
        .execute_singleton_operation(request, default_options)
        .await?;

    let converted = crate::driver_bridge::driver_response_to_cosmos_response(raw_response);
    Ok(converted)
}
/// Starts a throughput replacement for the resource identified by `resource_id`.
///
/// Steps, in order:
///
/// 1. Look up the resource's current offer with [`find_offer`].
/// 2. Overwrite the `offer` field of that current offer with the one from
///    `throughput`, leaving the rest of the current offer as returned.
/// 3. Serialize the updated offer and send it as a `replace_offer` singleton
///    operation with content-response-on-write enabled.
/// 4. Wrap the converted response in a `ThroughputPoller` together with the
///    driver, account and offer id.
///
/// # Errors
///
/// * `CLIENT_NO_THROUGHPUT_OFFER_FOR_RESOURCE` when no offer is found for the
///   resource.
/// * `SERVICE_RETURNED_OFFER_WITHOUT_ID` when the offer found has an empty id.
/// * Any error propagated from the offer lookup, JSON serialization, or the
///   replace operation itself.
pub(crate) async fn begin_replace(
    driver: Arc<CosmosDriver>,
    account: AccountReference,
    resource_id: &str,
    throughput: ThroughputProperties,
    operation_options: OperationOptions,
) -> crate::Result<crate::clients::ThroughputPoller> {
    // The options are cloned here because they are reused for the replace below.
    let mut current_throughput =
        find_offer(&driver, &account, resource_id, operation_options.clone())
            .await?
            .ok_or_else(|| {
                crate::DriverCosmosError::builder()
                    .with_status(crate::CosmosStatus::CLIENT_NO_THROUGHPUT_OFFER_FOR_RESOURCE)
                    .with_message("no throughput offer found for this resource")
                    .build()
            })?;

    // The offer id addresses the replace operation, so an empty id is rejected
    // before anything is sent.
    if current_throughput.offer_id.is_empty() {
        return Err(crate::DriverCosmosError::builder()
            .with_status(crate::CosmosStatus::SERVICE_RETURNED_OFFER_WITHOUT_ID)
            .with_message("throughput offer has an empty id")
            .build()
            .into());
    }

    let offer_id = current_throughput.offer_id.clone();
    current_throughput.offer = throughput.offer;

    // Serialize by reference: `serde_json::to_vec` borrows the value.
    let body = serde_json::to_vec(&current_throughput)?;
    let operation =
        CosmosOperation::replace_offer(account.clone(), offer_id.clone()).with_body(body);

    // Content-response-on-write is forced on, overriding whatever the caller
    // supplied. NOTE(review): presumably the poller needs the replaced offer's
    // body from this response — confirm against `ThroughputPoller::new`.
    let replace_options = {
        let mut opts = operation_options;
        opts.content_response_on_write =
            Some(azure_data_cosmos_driver::options::ContentResponseOnWrite::Enabled);
        opts
    };

    let driver_response = driver
        .execute_singleton_operation(operation, replace_options)
        .await?;
    let response = crate::driver_bridge::driver_response_to_cosmos_response(driver_response);

    Ok(crate::clients::ThroughputPoller::new(
        response, driver, account, offer_id,
    ))
}