Skip to main content

duroxide_cdb/
batch.rs

1use crate::client::{BatchOperation, BatchOperationResult, CosmosDBClient};
2use crate::models::*;
3use duroxide::providers::ProviderError;
4
5/// Build a transactional batch for ack_orchestration_item.
6/// Returns (batch_operations, cross_partition_intents).
7#[allow(clippy::too_many_arguments)]
8pub fn build_ack_batch(
9    instance_id: &str,
10    execution_id: u64,
11    _lock_token: &str,
12    messages_to_delete: &[String],
13    history_delta: &[(u64, String)],
14    same_partition_worker_items: Vec<serde_json::Value>,
15    same_partition_orch_items: Vec<serde_json::Value>,
16    outbox_intents: Vec<serde_json::Value>,
17    kv_materialization_ops: Vec<BatchOperation>,
18    cancelled_activity_ids: &[String],
19    instance_update: serde_json::Value,
20) -> Vec<BatchOperation> {
21    let mut ops = Vec::new();
22
23    // a. Delete locked orch_queue messages
24    for msg_id in messages_to_delete {
25        ops.push(BatchOperation::Delete { id: msg_id.clone() });
26    }
27
28    // b. Create history event documents (must be Create, not Upsert —
29    //    duplicate event_ids indicate a double-ack which must fail the batch
30    //    to preserve atomicity, matching PG's unique constraint behavior).
31    for (event_id, event_data) in history_delta {
32        let doc = HistoryDocument::new(instance_id, execution_id, *event_id, event_data.clone());
33        let json = serde_json::to_value(&doc).unwrap();
34        ops.push(BatchOperation::Create { body: json });
35    }
36
37    // c. Create worker_queue items (same partition)
38    for item in same_partition_worker_items {
39        ops.push(BatchOperation::Create { body: item });
40    }
41
42    // d. Create orch_queue items (same partition)
43    for item in same_partition_orch_items {
44        ops.push(BatchOperation::Create { body: item });
45    }
46
47    // e. Create outbox_intent documents (same partition)
48    for intent in outbox_intents {
49        ops.push(BatchOperation::Create { body: intent });
50    }
51
52    // f. Materialize KV updates in the same partition
53    ops.extend(kv_materialization_ops);
54
55    // g. Delete cancelled activity entries from worker_queue
56    for activity_doc_id in cancelled_activity_ids {
57        ops.push(BatchOperation::Delete {
58            id: activity_doc_id.clone(),
59        });
60    }
61
62    // h. Upsert instance document (releases lock)
63    ops.push(BatchOperation::Upsert {
64        body: instance_update,
65    });
66
67    ops
68}
69
70/// Execute a transactional batch and return results.
71pub async fn execute_batch(
72    client: &CosmosDBClient,
73    partition_key: &str,
74    operations: Vec<BatchOperation>,
75) -> Result<Vec<BatchOperationResult>, ProviderError> {
76    if operations.is_empty() {
77        return Ok(vec![]);
78    }
79
80    // CosmosDB batch limit: 100 operations
81    if operations.len() > 100 {
82        // Split into chunks. First chunk includes the critical operations.
83        let mut all_results = Vec::new();
84        for chunk in operations.chunks(100) {
85            let results = client
86                .transactional_batch(partition_key, chunk.to_vec())
87                .await?;
88            all_results.extend(results);
89        }
90        Ok(all_results)
91    } else {
92        client.transactional_batch(partition_key, operations).await
93    }
94}