duroxide_cdb/
containers.rs1use crate::client::CosmosDBClient;
2use duroxide::providers::ProviderError;
3
4pub fn default_indexing_policy() -> serde_json::Value {
6 serde_json::json!({
7 "indexingMode": "consistent",
8 "automatic": true,
9 "includedPaths": [
10 { "path": "/type/?" },
11 { "path": "/instanceId/?" },
12 { "path": "/dispatchSlot/?" },
13 { "path": "/visibleAt/?" },
14 { "path": "/enqueuedAt/?" },
15 { "path": "/lockedUntil/?" },
16 { "path": "/executionId/?" },
17 { "path": "/eventId/?" },
18 { "path": "/status/?" },
19 { "path": "/sessionId/?" },
20 { "path": "/ownerId/?" },
21 { "path": "/parentInstanceId/?" },
22 { "path": "/createdAt/?" },
23 { "path": "/customStatusVersion/?" },
24 { "path": "/docType/?" },
25 { "path": "/lockToken/?" }
26 ],
27 "excludedPaths": [
28 { "path": "/eventData/*" },
29 { "path": "/workItem/*" },
30 { "path": "/payload/*" },
31 { "path": "/output/*" },
32 { "path": "/customStatus/*" },
33 { "path": "/*" }
34 ],
35 "compositeIndexes": [
36 [
37 { "path": "/type", "order": "ascending" },
38 { "path": "/dispatchSlot", "order": "ascending" },
39 { "path": "/visibleAt", "order": "ascending" },
40 { "path": "/enqueuedAt", "order": "ascending" }
41 ],
42 [
43 { "path": "/type", "order": "ascending" },
44 { "path": "/executionId", "order": "ascending" },
45 { "path": "/eventId", "order": "ascending" }
46 ],
47 [
48 { "path": "/type", "order": "ascending" },
49 { "path": "/status", "order": "ascending" }
50 ]
51 ]
52 })
53}
54
55pub async fn ensure_infrastructure(client: &CosmosDBClient) -> Result<(), ProviderError> {
58 const MAX_RETRIES: u32 = 10;
59 const BASE_DELAY_MS: u64 = 500;
60
61 let mut last_err = None;
62 for attempt in 0..MAX_RETRIES {
63 match try_ensure_infrastructure(client).await {
64 Ok(()) => return Ok(()),
65 Err(e) if e.retryable && attempt + 1 < MAX_RETRIES => {
66 let delay = BASE_DELAY_MS * 2u64.pow(attempt.min(5));
67 tracing::warn!(
68 attempt = attempt + 1,
69 delay_ms = delay,
70 "CosmosDB metadata rate-limited, retrying infrastructure setup"
71 );
72 tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
73 last_err = Some(e);
74 }
75 Err(e) => return Err(e),
76 }
77 }
78 Err(last_err.unwrap())
79}
80
81async fn try_ensure_infrastructure(client: &CosmosDBClient) -> Result<(), ProviderError> {
82 client.ensure_database().await?;
83 client
84 .ensure_container(Some(default_indexing_policy()))
85 .await?;
86 tracing::info!(
87 target: "duroxide::providers::cosmosdb",
88 database = client.database(),
89 container = client.container(),
90 "CosmosDB infrastructure initialized"
91 );
92 Ok(())
93}