1use crate::client::{BatchOperation, BatchOperationResult, CosmosDBClient};
2use crate::models::*;
3use duroxide::providers::ProviderError;
4
5#[allow(clippy::too_many_arguments)]
8pub fn build_ack_batch(
9 instance_id: &str,
10 execution_id: u64,
11 _lock_token: &str,
12 messages_to_delete: &[String],
13 history_delta: &[(u64, String)],
14 same_partition_worker_items: Vec<serde_json::Value>,
15 same_partition_orch_items: Vec<serde_json::Value>,
16 outbox_intents: Vec<serde_json::Value>,
17 kv_materialization_ops: Vec<BatchOperation>,
18 cancelled_activity_ids: &[String],
19 instance_update: serde_json::Value,
20) -> Vec<BatchOperation> {
21 let mut ops = Vec::new();
22
23 for msg_id in messages_to_delete {
25 ops.push(BatchOperation::Delete { id: msg_id.clone() });
26 }
27
28 for (event_id, event_data) in history_delta {
32 let doc = HistoryDocument::new(instance_id, execution_id, *event_id, event_data.clone());
33 let json = serde_json::to_value(&doc).unwrap();
34 ops.push(BatchOperation::Create { body: json });
35 }
36
37 for item in same_partition_worker_items {
39 ops.push(BatchOperation::Create { body: item });
40 }
41
42 for item in same_partition_orch_items {
44 ops.push(BatchOperation::Create { body: item });
45 }
46
47 for intent in outbox_intents {
49 ops.push(BatchOperation::Create { body: intent });
50 }
51
52 ops.extend(kv_materialization_ops);
54
55 for activity_doc_id in cancelled_activity_ids {
57 ops.push(BatchOperation::Delete {
58 id: activity_doc_id.clone(),
59 });
60 }
61
62 ops.push(BatchOperation::Upsert {
64 body: instance_update,
65 });
66
67 ops
68}
69
70pub async fn execute_batch(
72 client: &CosmosDBClient,
73 partition_key: &str,
74 operations: Vec<BatchOperation>,
75) -> Result<Vec<BatchOperationResult>, ProviderError> {
76 if operations.is_empty() {
77 return Ok(vec![]);
78 }
79
80 if operations.len() > 100 {
82 let mut all_results = Vec::new();
84 for chunk in operations.chunks(100) {
85 let results = client
86 .transactional_batch(partition_key, chunk.to_vec())
87 .await?;
88 all_results.extend(results);
89 }
90 Ok(all_results)
91 } else {
92 client.transactional_batch(partition_key, operations).await
93 }
94}