Skip to main content

duroxide_cdb/
containers.rs

1use crate::client::CosmosDBClient;
2use duroxide::providers::ProviderError;
3
4/// Default indexing policy for the duroxide container.
5pub fn default_indexing_policy() -> serde_json::Value {
6    serde_json::json!({
7        "indexingMode": "consistent",
8        "automatic": true,
9        "includedPaths": [
10            { "path": "/type/?" },
11            { "path": "/instanceId/?" },
12            { "path": "/dispatchSlot/?" },
13            { "path": "/visibleAt/?" },
14            { "path": "/enqueuedAt/?" },
15            { "path": "/lockedUntil/?" },
16            { "path": "/executionId/?" },
17            { "path": "/eventId/?" },
18            { "path": "/status/?" },
19            { "path": "/sessionId/?" },
20            { "path": "/ownerId/?" },
21            { "path": "/parentInstanceId/?" },
22            { "path": "/createdAt/?" },
23            { "path": "/customStatusVersion/?" },
24            { "path": "/docType/?" },
25            { "path": "/lockToken/?" }
26        ],
27        "excludedPaths": [
28            { "path": "/eventData/*" },
29            { "path": "/workItem/*" },
30            { "path": "/payload/*" },
31            { "path": "/output/*" },
32            { "path": "/customStatus/*" },
33            { "path": "/*" }
34        ],
35        "compositeIndexes": [
36            [
37                { "path": "/type", "order": "ascending" },
38                { "path": "/dispatchSlot", "order": "ascending" },
39                { "path": "/visibleAt", "order": "ascending" },
40                { "path": "/enqueuedAt", "order": "ascending" }
41            ],
42            [
43                { "path": "/type", "order": "ascending" },
44                { "path": "/executionId", "order": "ascending" },
45                { "path": "/eventId", "order": "ascending" }
46            ],
47            [
48                { "path": "/type", "order": "ascending" },
49                { "path": "/status", "order": "ascending" }
50            ]
51        ]
52    })
53}
54
55/// Initialize the database and container. Idempotent.
56/// Retries on 429 (metadata rate-limiting) with exponential backoff.
57pub async fn ensure_infrastructure(client: &CosmosDBClient) -> Result<(), ProviderError> {
58    const MAX_RETRIES: u32 = 10;
59    const BASE_DELAY_MS: u64 = 500;
60
61    let mut last_err = None;
62    for attempt in 0..MAX_RETRIES {
63        match try_ensure_infrastructure(client).await {
64            Ok(()) => return Ok(()),
65            Err(e) if e.retryable && attempt + 1 < MAX_RETRIES => {
66                let delay = BASE_DELAY_MS * 2u64.pow(attempt.min(5));
67                tracing::warn!(
68                    attempt = attempt + 1,
69                    delay_ms = delay,
70                    "CosmosDB metadata rate-limited, retrying infrastructure setup"
71                );
72                tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
73                last_err = Some(e);
74            }
75            Err(e) => return Err(e),
76        }
77    }
78    Err(last_err.unwrap())
79}
80
81async fn try_ensure_infrastructure(client: &CosmosDBClient) -> Result<(), ProviderError> {
82    client.ensure_database().await?;
83    client
84        .ensure_container(Some(default_indexing_policy()))
85        .await?;
86    tracing::info!(
87        target: "duroxide::providers::cosmosdb",
88        database = client.database(),
89        container = client.container(),
90        "CosmosDB infrastructure initialized"
91    );
92    Ok(())
93}