use crate::options::Precondition;
use crate::PartitionKey;
use azure_core::fmt::SafeDebug;
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
#[derive(Clone, Debug, Default)]
#[non_exhaustive]
pub struct BatchUpsertOptions {
pub precondition: Option<Precondition>,
}
impl BatchUpsertOptions {
pub fn with_precondition(mut self, precondition: Precondition) -> Self {
self.precondition = Some(precondition);
self
}
}
#[derive(Clone, Debug, Default)]
#[non_exhaustive]
pub struct BatchReplaceOptions {
pub precondition: Option<Precondition>,
}
impl BatchReplaceOptions {
pub fn with_precondition(mut self, precondition: Precondition) -> Self {
self.precondition = Some(precondition);
self
}
}
#[derive(Clone, Debug, Default)]
#[non_exhaustive]
pub struct BatchReadOptions {
pub precondition: Option<Precondition>,
}
impl BatchReadOptions {
pub fn with_precondition(mut self, precondition: Precondition) -> Self {
self.precondition = Some(precondition);
self
}
}
#[derive(Clone, Debug, Default)]
#[non_exhaustive]
pub struct BatchDeleteOptions {
pub precondition: Option<Precondition>,
}
impl BatchDeleteOptions {
pub fn with_precondition(mut self, precondition: Precondition) -> Self {
self.precondition = Some(precondition);
self
}
}
#[derive(Clone, SafeDebug)]
#[safe(true)]
pub struct TransactionalBatch {
partition_key: PartitionKey,
operations: Vec<TransactionalBatchOperation>,
}
impl TransactionalBatch {
pub fn new(partition_key: impl Into<PartitionKey>) -> Self {
Self {
partition_key: partition_key.into(),
operations: Vec::new(),
}
}
pub fn partition_key(&self) -> &PartitionKey {
&self.partition_key
}
pub(crate) fn operations(&self) -> &[TransactionalBatchOperation] {
&self.operations
}
pub fn create_item<T: Serialize>(mut self, item: T) -> Result<Self, serde_json::Error> {
let resource_body = serde_json::to_value(item)?;
self.operations.push(TransactionalBatchOperation::Create {
resource_body,
id: None,
});
Ok(self)
}
pub fn upsert_item<T: Serialize>(
mut self,
item: T,
options: Option<BatchUpsertOptions>,
) -> Result<Self, serde_json::Error> {
let resource_body = serde_json::to_value(item)?;
let (if_match, if_none_match) = match options.as_ref().and_then(|o| o.precondition.as_ref())
{
Some(Precondition::IfMatch(etag)) => (Some(etag.to_string()), None),
Some(Precondition::IfNoneMatch(etag)) => (None, Some(etag.to_string())),
None | Some(_) => (None, None),
};
self.operations.push(TransactionalBatchOperation::Upsert {
resource_body,
id: None,
if_match,
if_none_match,
});
Ok(self)
}
pub fn replace_item<T: Serialize>(
mut self,
item_id: impl Into<Cow<'static, str>>,
item: T,
options: Option<BatchReplaceOptions>,
) -> Result<Self, serde_json::Error> {
let resource_body = serde_json::to_value(item)?;
let if_match = match options.as_ref().and_then(|o| o.precondition.as_ref()) {
Some(Precondition::IfMatch(etag)) => Some(etag.to_string()),
_ => None,
};
self.operations.push(TransactionalBatchOperation::Replace {
id: item_id.into(),
resource_body,
if_match,
});
Ok(self)
}
pub fn read_item(
mut self,
item_id: impl Into<Cow<'static, str>>,
options: Option<BatchReadOptions>,
) -> Self {
let (if_match, if_none_match) = match options.as_ref().and_then(|o| o.precondition.as_ref())
{
Some(Precondition::IfMatch(etag)) => (Some(etag.to_string()), None),
Some(Precondition::IfNoneMatch(etag)) => (None, Some(etag.to_string())),
None | Some(_) => (None, None),
};
self.operations.push(TransactionalBatchOperation::Read {
id: item_id.into(),
if_match,
if_none_match,
});
self
}
pub fn delete_item(
mut self,
item_id: impl Into<Cow<'static, str>>,
options: Option<BatchDeleteOptions>,
) -> Self {
let if_match = match options.as_ref().and_then(|o| o.precondition.as_ref()) {
Some(Precondition::IfMatch(etag)) => Some(etag.to_string()),
_ => None,
};
self.operations.push(TransactionalBatchOperation::Delete {
id: item_id.into(),
if_match,
});
self
}
}
#[derive(Clone, SafeDebug, Serialize, Deserialize)]
#[safe(true)]
#[serde(tag = "operationType", rename_all_fields = "camelCase")]
pub(crate) enum TransactionalBatchOperation {
Create {
#[serde(skip_serializing_if = "Option::is_none")]
id: Option<Cow<'static, str>>,
resource_body: serde_json::Value,
},
Upsert {
#[serde(skip_serializing_if = "Option::is_none")]
id: Option<Cow<'static, str>>,
resource_body: serde_json::Value,
#[serde(skip_serializing_if = "Option::is_none")]
if_match: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
if_none_match: Option<String>,
},
Replace {
id: Cow<'static, str>,
resource_body: serde_json::Value,
#[serde(skip_serializing_if = "Option::is_none")]
if_match: Option<String>,
},
Read {
id: Cow<'static, str>,
#[serde(skip_serializing_if = "Option::is_none")]
if_match: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
if_none_match: Option<String>,
},
Delete {
id: Cow<'static, str>,
#[serde(skip_serializing_if = "Option::is_none")]
if_match: Option<String>,
},
}
#[derive(Clone, SafeDebug)]
#[safe(true)]
#[non_exhaustive]
pub struct TransactionalBatchResponse {
results: Vec<TransactionalBatchOperationResult>,
}
impl TransactionalBatchResponse {
pub fn results(&self) -> &[TransactionalBatchOperationResult] {
&self.results
}
}
impl<'de> Deserialize<'de> for TransactionalBatchResponse {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let results = Vec::<TransactionalBatchOperationResult>::deserialize(deserializer)?;
Ok(TransactionalBatchResponse { results })
}
}
#[derive(Clone, SafeDebug, Deserialize)]
#[safe(true)]
#[non_exhaustive]
#[serde(rename_all = "camelCase")]
pub struct TransactionalBatchOperationResult {
status_code: u16,
#[serde(default)]
resource_body: Option<serde_json::Value>,
#[serde(rename = "eTag")]
#[serde(default)]
etag: Option<String>,
#[serde(default)]
request_charge: Option<f64>,
#[serde(default)]
retry_after_milliseconds: Option<u64>,
#[serde(default)]
substatus_code: Option<u32>,
}
impl TransactionalBatchOperationResult {
pub fn status_code(&self) -> u16 {
self.status_code
}
pub fn resource_body(&self) -> Option<&serde_json::Value> {
self.resource_body.as_ref()
}
pub fn etag(&self) -> Option<&str> {
self.etag.as_deref()
}
pub fn request_charge(&self) -> Option<f64> {
self.request_charge
}
pub fn retry_after_milliseconds(&self) -> Option<u64> {
self.retry_after_milliseconds
}
pub fn substatus_code(&self) -> Option<u32> {
self.substatus_code
}
pub fn deserialize_body<T: serde::de::DeserializeOwned>(
&self,
) -> Result<Option<T>, serde_json::Error> {
match &self.resource_body {
Some(value) => Ok(Some(serde_json::from_value(value.clone())?)),
None => Ok(None),
}
}
pub fn is_success(&self) -> bool {
self.status_code >= 200 && self.status_code < 300
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::options::ETag;
use serde::Serialize;
#[derive(Serialize)]
struct TestItem {
id: String,
value: i32,
}
#[test]
fn create_batch_with_partition_key() {
let batch = TransactionalBatch::new("test_partition");
assert_eq!(batch.partition_key(), &PartitionKey::from("test_partition"));
assert_eq!(batch.operations().len(), 0);
}
#[test]
fn serialize_all_operations() -> Result<(), Box<dyn std::error::Error>> {
let item = TestItem {
id: "item1".to_string(),
value: 42,
};
let replace_options = BatchReplaceOptions::default()
.with_precondition(Precondition::IfMatch(ETag::from("some-etag")));
let batch = TransactionalBatch::new("test_partition")
.create_item(&item)?
.upsert_item(&item, None)?
.replace_item("id1", &item, Some(replace_options))?
.read_item("id2", None)
.delete_item("id3", None);
assert_eq!(batch.operations().len(), 5);
let serialized = serde_json::to_string_pretty(batch.operations())?;
let expected = r#"[
{
"operationType": "Create",
"resourceBody": {
"id": "item1",
"value": 42
}
},
{
"operationType": "Upsert",
"resourceBody": {
"id": "item1",
"value": 42
}
},
{
"operationType": "Replace",
"id": "id1",
"resourceBody": {
"id": "item1",
"value": 42
},
"ifMatch": "some-etag"
},
{
"operationType": "Read",
"id": "id2"
},
{
"operationType": "Delete",
"id": "id3"
}
]"#;
assert_eq!(serialized, expected);
Ok(())
}
}