duroxide-cdb 0.1.6

A CosmosDB-based provider implementation for Duroxide, a durable task orchestration framework
Documentation
use crate::client::{BatchOperation, BatchOperationResult, CosmosDBClient};
use crate::models::*;
use duroxide::providers::ProviderError;

/// Build the ordered operation list for the `ack_orchestration_item` transactional batch.
///
/// The returned vector contains, in this order:
///
/// 1. a `Delete` for every id in `messages_to_delete` (locked orch_queue messages),
/// 2. a `Create` for one [`HistoryDocument`] per `(event_id, event_data)` pair in
///    `history_delta`,
/// 3. a `Create` for every item in `same_partition_worker_items`,
/// 4. a `Create` for every item in `same_partition_orch_items`,
/// 5. a `Create` for every document in `outbox_intents`,
/// 6. the caller-supplied `kv_materialization_ops`, unchanged,
/// 7. a `Delete` for every id in `cancelled_activity_ids`,
/// 8. a single `Upsert` of `instance_update`.
///
/// Only the batch operations are returned; this function does not produce any
/// separate list of cross-partition work. `_lock_token` is accepted for interface
/// compatibility and is not used here.
///
/// # Panics
///
/// Panics if a [`HistoryDocument`] cannot be converted to a JSON value.
pub fn build_ack_batch(
    instance_id: &str,
    execution_id: u64,
    _lock_token: &str,
    messages_to_delete: &[String],
    history_delta: &[(u64, String)],
    same_partition_worker_items: Vec<serde_json::Value>,
    same_partition_orch_items: Vec<serde_json::Value>,
    outbox_intents: Vec<serde_json::Value>,
    kv_materialization_ops: Vec<BatchOperation>,
    cancelled_activity_ids: &[String],
    instance_update: serde_json::Value,
) -> Vec<BatchOperation> {
    // The final size is known up front: one op per input element plus the
    // trailing instance upsert.
    let mut ops = Vec::with_capacity(
        messages_to_delete.len()
            + history_delta.len()
            + same_partition_worker_items.len()
            + same_partition_orch_items.len()
            + outbox_intents.len()
            + kv_materialization_ops.len()
            + cancelled_activity_ids.len()
            + 1,
    );

    // a. Delete locked orch_queue messages
    ops.extend(
        messages_to_delete
            .iter()
            .map(|msg_id| BatchOperation::Delete { id: msg_id.clone() }),
    );

    // b. Create history event documents (must be Create, not Upsert —
    //    duplicate event_ids indicate a double-ack which must fail the batch
    //    to preserve atomicity, matching PG's unique constraint behavior).
    ops.extend(history_delta.iter().map(|(event_id, event_data)| {
        let doc = HistoryDocument::new(instance_id, execution_id, *event_id, event_data.clone());
        let body = serde_json::to_value(&doc)
            .expect("HistoryDocument must be serializable to a JSON value");
        BatchOperation::Create { body }
    }));

    // c. Create worker_queue items (same partition)
    ops.extend(
        same_partition_worker_items
            .into_iter()
            .map(|body| BatchOperation::Create { body }),
    );

    // d. Create orch_queue items (same partition)
    ops.extend(
        same_partition_orch_items
            .into_iter()
            .map(|body| BatchOperation::Create { body }),
    );

    // e. Create outbox_intent documents (same partition)
    ops.extend(
        outbox_intents
            .into_iter()
            .map(|body| BatchOperation::Create { body }),
    );

    // f. Materialize KV updates in the same partition
    ops.extend(kv_materialization_ops);

    // g. Delete cancelled activity entries from worker_queue
    ops.extend(
        cancelled_activity_ids
            .iter()
            .map(|activity_doc_id| BatchOperation::Delete {
                id: activity_doc_id.clone(),
            }),
    );

    // h. Upsert instance document (releases lock). Kept last so it follows
    //    every other operation in the list.
    ops.push(BatchOperation::Upsert {
        body: instance_update,
    });

    ops
}

/// Execute `operations` against `partition_key` and return the collected results.
///
/// * An empty `operations` list returns `Ok(vec![])` without calling the client.
/// * Up to 100 operations are sent as a single `transactional_batch` call.
/// * More than 100 operations are split, in input order, into consecutive groups
///   of at most 100, each sent as its own `transactional_batch` call. The results
///   of all groups are concatenated in the same order.
///
/// NOTE(review): when the list is split, each group is a separate request, so the
/// whole list is not submitted as one batch. If a later group fails, earlier groups
/// have already been sent — presumably they stay applied; confirm against the
/// client's semantics. Operations at the end of the list land in the last group.
///
/// # Errors
///
/// Returns the first error produced by `client.transactional_batch`; any groups
/// after the failing one are not sent.
pub async fn execute_batch(
    client: &CosmosDBClient,
    partition_key: &str,
    operations: Vec<BatchOperation>,
) -> Result<Vec<BatchOperationResult>, ProviderError> {
    // CosmosDB batch limit: 100 operations
    const MAX_BATCH_OPERATIONS: usize = 100;

    if operations.is_empty() {
        return Ok(vec![]);
    }

    if operations.len() <= MAX_BATCH_OPERATIONS {
        return client.transactional_batch(partition_key, operations).await;
    }

    // Consume the vector and move operations into each group instead of
    // cloning them out of a borrowed slice.
    let mut remaining = operations.into_iter();
    let mut all_results = Vec::new();
    loop {
        let chunk: Vec<BatchOperation> =
            remaining.by_ref().take(MAX_BATCH_OPERATIONS).collect();
        if chunk.is_empty() {
            break;
        }
        let results = client.transactional_batch(partition_key, chunk).await?;
        all_results.extend(results);
    }
    Ok(all_results)
}