// duroxide-cdb 0.1.10
//
// A CosmosDB-based provider implementation for Duroxide, a durable task orchestration framework.
// Documentation
use crate::client::CosmosDBClient;
use duroxide::providers::ProviderError;

/// Builds the default indexing policy document for the duroxide container.
///
/// The returned JSON object has five top-level keys:
///
/// * `indexingMode` / `automatic` — consistent, automatic indexing.
/// * `includedPaths` — the scalar properties that are explicitly indexed.
/// * `excludedPaths` — named payload subtrees plus the `/*` catch-all.
/// * `compositeIndexes` — three all-ascending composite indexes, each led by
///   `/type`.
pub fn default_indexing_policy() -> serde_json::Value {
    // Scalar properties that get a range index.
    const INCLUDED_PATHS: [&str; 16] = [
        "/type/?",
        "/instanceId/?",
        "/dispatchSlot/?",
        "/visibleAt/?",
        "/enqueuedAt/?",
        "/lockedUntil/?",
        "/executionId/?",
        "/eventId/?",
        "/status/?",
        "/sessionId/?",
        "/ownerId/?",
        "/parentInstanceId/?",
        "/createdAt/?",
        "/customStatusVersion/?",
        "/docType/?",
        "/lockToken/?",
    ];

    // Subtrees listed explicitly, followed by the catch-all exclusion.
    const EXCLUDED_PATHS: [&str; 6] = [
        "/eventData/*",
        "/workItem/*",
        "/payload/*",
        "/output/*",
        "/customStatus/*",
        "/*",
    ];

    // Each inner slice is one composite index; every component is ascending.
    const COMPOSITE_INDEXES: [&[&str]; 3] = [
        &["/type", "/dispatchSlot", "/visibleAt", "/enqueuedAt"],
        &["/type", "/executionId", "/eventId"],
        &["/type", "/status"],
    ];

    // `{ "path": <p> }` entries for the included/excluded lists.
    let path_entries = |paths: &[&str]| -> Vec<serde_json::Value> {
        paths
            .iter()
            .map(|&path| serde_json::json!({ "path": path }))
            .collect()
    };

    // `{ "path": <p>, "order": "ascending" }` entries for one composite index.
    let ascending_entries = |paths: &[&str]| -> Vec<serde_json::Value> {
        paths
            .iter()
            .map(|&path| serde_json::json!({ "path": path, "order": "ascending" }))
            .collect()
    };

    let included = path_entries(&INCLUDED_PATHS);
    let excluded = path_entries(&EXCLUDED_PATHS);
    let composites: Vec<Vec<serde_json::Value>> = COMPOSITE_INDEXES
        .iter()
        .map(|&index| ascending_entries(index))
        .collect();

    serde_json::json!({
        "indexingMode": "consistent",
        "automatic": true,
        "includedPaths": included,
        "excludedPaths": excluded,
        "compositeIndexes": composites
    })
}

/// Initialize the database and container, retrying transient failures.
///
/// Each attempt delegates to `try_ensure_infrastructure`. Safe to call
/// repeatedly, assuming the client's `ensure_*` operations are themselves
/// idempotent.
///
/// An attempt is retried whenever the returned error is flagged `retryable`
/// (the log message indicates this is aimed at 429 metadata rate-limiting).
/// At most `MAX_RETRIES` attempts are made in total. The delay before retry
/// *k* (1-based) is `BASE_DELAY_MS * 2^min(k - 1, 5)` milliseconds, i.e.
/// 500 ms doubling up to a 16 s cap.
///
/// # Errors
///
/// Returns the first non-retryable error immediately, or the error from the
/// final attempt once the retry budget is exhausted.
pub async fn ensure_infrastructure(client: &CosmosDBClient) -> Result<(), ProviderError> {
    const MAX_RETRIES: u32 = 10;
    const BASE_DELAY_MS: u64 = 500;
    // Caps the backoff at BASE_DELAY_MS * 2^5.
    const MAX_BACKOFF_EXPONENT: u32 = 5;

    // Number of attempts that have failed so far.
    let mut attempt: u32 = 0;
    loop {
        let err = match try_ensure_infrastructure(client).await {
            Ok(()) => return Ok(()),
            Err(e) => e,
        };
        attempt += 1;

        // Give up straight away on permanent errors or once the budget is
        // spent; the error is returned directly, so no "last error" state or
        // unwrap is needed after the loop.
        if !err.retryable || attempt >= MAX_RETRIES {
            return Err(err);
        }

        let delay_ms = BASE_DELAY_MS * 2u64.pow((attempt - 1).min(MAX_BACKOFF_EXPONENT));
        tracing::warn!(
            attempt = attempt,
            delay_ms = delay_ms,
            "CosmosDB metadata rate-limited, retrying infrastructure setup"
        );
        tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
    }
}

/// Runs one setup pass with no retry logic of its own.
///
/// Ensures the database first, then the container (passing the default
/// indexing policy), and logs an info event once both calls succeed. The
/// first error from either call is propagated to the caller unchanged.
async fn try_ensure_infrastructure(client: &CosmosDBClient) -> Result<(), ProviderError> {
    // The database must be ensured before the container is requested.
    client.ensure_database().await?;

    let indexing_policy = default_indexing_policy();
    client.ensure_container(Some(indexing_policy)).await?;

    tracing::info!(
        target: "duroxide::providers::cosmosdb",
        database = client.database(),
        container = client.container(),
        "CosmosDB infrastructure initialized"
    );

    Ok(())
}