use crate::client::{BatchOperation, BatchOperationResult, CosmosDBClient};
use crate::models::*;
use duroxide::providers::ProviderError;
#[allow(clippy::too_many_arguments)]
pub fn build_ack_batch(
instance_id: &str,
execution_id: u64,
_lock_token: &str,
messages_to_delete: &[String],
history_delta: &[(u64, String)],
same_partition_worker_items: Vec<serde_json::Value>,
same_partition_orch_items: Vec<serde_json::Value>,
outbox_intents: Vec<serde_json::Value>,
kv_materialization_ops: Vec<BatchOperation>,
cancelled_activity_ids: &[String],
instance_update: serde_json::Value,
) -> Vec<BatchOperation> {
let mut ops = Vec::new();
for msg_id in messages_to_delete {
ops.push(BatchOperation::Delete { id: msg_id.clone() });
}
for (event_id, event_data) in history_delta {
let doc = HistoryDocument::new(instance_id, execution_id, *event_id, event_data.clone());
let json = serde_json::to_value(&doc).unwrap();
ops.push(BatchOperation::Create { body: json });
}
for item in same_partition_worker_items {
ops.push(BatchOperation::Create { body: item });
}
for item in same_partition_orch_items {
ops.push(BatchOperation::Create { body: item });
}
for intent in outbox_intents {
ops.push(BatchOperation::Create { body: intent });
}
ops.extend(kv_materialization_ops);
for activity_doc_id in cancelled_activity_ids {
ops.push(BatchOperation::Delete {
id: activity_doc_id.clone(),
});
}
ops.push(BatchOperation::Upsert {
body: instance_update,
});
ops
}
pub async fn execute_batch(
client: &CosmosDBClient,
partition_key: &str,
operations: Vec<BatchOperation>,
) -> Result<Vec<BatchOperationResult>, ProviderError> {
if operations.is_empty() {
return Ok(vec![]);
}
if operations.len() > 100 {
let mut all_results = Vec::new();
for chunk in operations.chunks(100) {
let results = client
.transactional_batch(partition_key, chunk.to_vec())
.await?;
all_results.extend(results);
}
Ok(all_results)
} else {
client.transactional_batch(partition_key, operations).await
}
}