# duroxide-cdb: CosmosDB Provider Specification
**Status:** Draft
**Date:** 2026-02-23
**Authors:** @affandar
---
## 1. Overview
`duroxide-cdb` is a CosmosDB NoSQL API provider for [duroxide](https://github.com/microsoft/duroxide), implementing the `Provider` and `ProviderAdmin` traits. It stores orchestration state, event history, and work queues in a single CosmosDB container using document-type discrimination.
### Design Principles
1. **Single container, partition-per-instance.** All documents for an orchestration instance live in one logical partition. Operations within an instance use CosmosDB transactional batch.
2. **Transactional outbox for cross-partition writes.** Sub-orchestration starts, completions, and detached orchestration starts use an intent log with best-effort delivery and background reconciliation.
3. **Optimistic concurrency via ETags.** Instance locking and queue item locking use CosmosDB conditional writes instead of database-level locks.
4. **Lease-based dispatcher partitioning.** Concurrent dispatchers within a runtime partition the keyspace to avoid contention. Phase 1: in-memory. Phase 2: CosmosDB-backed leases for multi-runtime coordination.
5. **Short polling at 1/s.** No long polling or change feed in Phase 1.
---
## 2. Data Model
### 2.1 Container
A single CosmosDB container named `duroxide` (configurable).
- **Partition key:** `/instanceId`
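- **Unique key policy:** none configured. CosmosDB already enforces a unique `id` within each logical partition, which is what the deterministic IDs below rely on.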
All document types coexist in this container, discriminated by the `type` field.
### 2.2 Document Types
#### 2.2.1 Instance Document
One per orchestration instance. Holds metadata, lock state, and custom status.
```json
{
"id": "<instanceId>:instance",
"instanceId": "<instanceId>",
"type": "instance",
"orchestrationName": "MyOrchestration",
"orchestrationVersion": "1.0.0",
"currentExecutionId": 1,
"status": "Running",
"output": null,
"parentInstanceId": null,
"pinnedDuroxideVersion": null,
"customStatus": null,
"customStatusVersion": 0,
"lockToken": null,
"lockedUntil": null,
"createdAt": 1740000000000,
"updatedAt": 1740000000000
}
```
**Fields:**
| Field | Type | Description |
|---|---|---|
| `id` | string | `<instanceId>:instance` — deterministic, one per instance |
| `instanceId` | string | Partition key. The orchestration instance identifier. |
| `type` | string | Always `"instance"` |
| `orchestrationName` | string | Registered orchestration name |
| `orchestrationVersion` | string | Semver version |
| `currentExecutionId` | i64 | Latest execution ID (increments on ContinueAsNew) |
| `status` | string | `"Running"`, `"Completed"`, `"Failed"`, `"ContinuedAsNew"` |
| `output` | string? | Serialized orchestration output (on completion/failure) |
| `parentInstanceId` | string? | Parent instance ID for sub-orchestrations |
| `pinnedDuroxideVersion` | object? | `{ major, minor, patch }` — capability filtering |
| `customStatus` | string? | User-set custom status |
| `customStatusVersion` | i64 | Monotonic version for custom status polling |
| `lockToken` | string? | UUID held by the dispatcher processing this instance |
| `lockedUntil` | i64? | Epoch ms when the lock expires |
| `createdAt` | i64 | Epoch ms |
| `updatedAt` | i64 | Epoch ms |
#### 2.2.2 History Event Document
One per event in the orchestration history. Append-only.
```json
{
"id": "<instanceId>:history:<executionId>:<eventId>",
"instanceId": "<instanceId>",
"type": "history",
"executionId": 1,
"eventId": 1,
"eventData": "{...serialized duroxide::Event...}"
}
```
**Fields:**
| Field | Type | Description |
|---|---|---|
| `id` | string | Deterministic composite key |
| `instanceId` | string | Partition key |
| `type` | string | Always `"history"` |
| `executionId` | i64 | Execution this event belongs to |
| `eventId` | i64 | Monotonically increasing within an execution |
| `eventData` | string | JSON-serialized `duroxide::Event` |
#### 2.2.3 Orchestrator Queue Item
Work items destined for the orchestration dispatcher. Includes `StartOrchestration`, `ActivityCompleted`, `ActivityFailed`, `TimerFired`, `ExternalRaised`, `SubOrchCompleted`, `SubOrchFailed`, `CancelInstance`, `ContinueAsNew`, `QueueMessage`.
```json
{
"id": "<uuid>",
"instanceId": "<instanceId>",
"type": "orch_queue",
"workItem": "{...serialized duroxide::WorkItem...}",
"dispatchSlot": 42,
"visibleAt": 1740000000000,
"enqueuedAt": 1740000000000,
"lockToken": null,
"lockedUntil": null,
"attemptCount": 0
}
```
**Fields:**
| Field | Type | Description |
|---|---|---|
| `id` | string | Random UUID — multiple queue items per instance |
| `instanceId` | string | Partition key |
| `type` | string | Always `"orch_queue"` |
| `workItem` | string | JSON-serialized `duroxide::WorkItem` |
| `dispatchSlot` | u8 | `hash(instanceId) % 256` — precomputed for dispatcher partitioning |
| `visibleAt` | i64 | Epoch ms. Item not fetchable before this time. Used for timers and delayed visibility. |
| `enqueuedAt` | i64 | Epoch ms. Determines FIFO ordering. |
| `lockToken` | string? | Set when a dispatcher locks this message batch |
| `lockedUntil` | i64? | Epoch ms. Lock expires after this time. |
| `attemptCount` | i32 | Incremented each time the item is fetched. For poison message detection. |
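The `visibleAt` and `lockedUntil` rules above can be read as a single predicate. A minimal sketch, assuming a hypothetical `QueueItemState` struct that mirrors only the two relevant fields (the struct and function names are illustrative and not part of the provider):

```rust
/// Hypothetical subset of a queue item: only the fields that decide fetchability.
struct QueueItemState {
    /// Epoch ms; the item is hidden before this time.
    visible_at: i64,
    /// Epoch ms; `None` models both a missing field and a JSON `null`.
    locked_until: Option<i64>,
}

/// Returns true when the item is visible and not held by an unexpired lock.
fn is_fetchable(item: &QueueItemState, now_ms: i64) -> bool {
    let visible = item.visible_at <= now_ms;
    let unlocked = match item.locked_until {
        None => true,
        Some(until) => until <= now_ms,
    };
    visible && unlocked
}
```

Note that the `None` arm has to be handled explicitly: a stored `null` is not the same as an expired timestamp.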
#### 2.2.4 Worker Queue Item
Work items destined for the activity worker dispatcher. Includes `ActivityExecute`.
```json
{
"id": "<uuid>",
"instanceId": "<instanceId>",
"type": "worker_queue",
"workItem": "{...serialized duroxide::WorkItem...}",
"dispatchSlot": 42,
"visibleAt": 1740000000000,
"enqueuedAt": 1740000000000,
"lockToken": null,
"lockedUntil": null,
"attemptCount": 0,
"executionId": 1,
"activityId": 3,
"sessionId": null
}
```
**Additional fields over orch_queue:**
| Field | Type | Description |
|---|---|---|
| `executionId` | i64? | Execution that scheduled this activity |
| `activityId` | i64? | Activity ID within the execution |
| `sessionId` | string? | Session ID for session affinity routing |
#### 2.2.5 Outbox Intent Document
Written in the same partition as the source instance. Represents a cross-partition write that needs delivery.
```json
{
"id": "intent:<deterministic-key>",
"instanceId": "<source-instance-id>",
"type": "outbox_intent",
"targetInstanceId": "<target-instance-id>",
"targetDocumentType": "orch_queue",
"payload": "{...the full document to create in the target partition...}",
"idempotencyKey": "<deterministic-key>",
"status": "pending",
"createdAt": 1740000000000,
"attemptCount": 0,
"lastAttemptAt": null
}
```
**Fields:**
| Field | Type | Description |
|---|---|---|
| `id` | string | `intent:<idempotencyKey>` — deterministic for dedup |
| `instanceId` | string | Partition key = source instance (same partition as the batch that created it) |
| `type` | string | Always `"outbox_intent"` |
| `targetInstanceId` | string | Destination partition |
| `targetDocumentType` | string | `"orch_queue"` or `"worker_queue"` |
| `payload` | string | The complete JSON document to be created in the target partition |
| `idempotencyKey` | string | Deterministic key derived from source context (e.g., `<sourceInstance>:<executionId>:<eventSequence>`) for idempotent delivery |
| `status` | string | `"pending"` or `"delivered"` |
| `createdAt` | i64 | Epoch ms |
| `attemptCount` | i32 | Delivery attempts so far |
| `lastAttemptAt` | i64? | Epoch ms of last delivery attempt |
#### 2.2.6 Session Document
Tracks session affinity: which worker owns a session.
```json
{
"id": "<instanceId>:session:<sessionId>",
"instanceId": "<instanceId>",
"type": "session",
"sessionId": "session-abc",
"ownerId": "worker-1",
"lockedUntil": 1740000030000,
"lastActivity": 1740000000000,
"createdAt": 1740000000000
}
```
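The instance, history, and session documents all use deterministic `id` values built from the formats shown in their JSON examples. A small sketch of helpers that build them, assuming hypothetical function names (the spec does not name such helpers):

```rust
/// `<instanceId>:instance`, one per orchestration instance.
fn instance_doc_id(instance_id: &str) -> String {
    format!("{instance_id}:instance")
}

/// `<instanceId>:history:<executionId>:<eventId>`, one per history event.
fn history_doc_id(instance_id: &str, execution_id: i64, event_id: i64) -> String {
    format!("{instance_id}:history:{execution_id}:{event_id}")
}

/// `<instanceId>:session:<sessionId>`, one per session within an instance.
fn session_doc_id(instance_id: &str, session_id: &str) -> String {
    format!("{instance_id}:session:{session_id}")
}
```

Because the IDs are deterministic, these documents can be fetched with a point read (id plus partition key) instead of a query.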
### 2.3 Indexing Policy
```json
{
"indexingMode": "consistent",
"automatic": true,
"includedPaths": [
{ "path": "/type/?" },
{ "path": "/instanceId/?" },
{ "path": "/dispatchSlot/?" },
{ "path": "/visibleAt/?" },
{ "path": "/enqueuedAt/?" },
{ "path": "/lockedUntil/?" },
{ "path": "/executionId/?" },
{ "path": "/eventId/?" },
{ "path": "/status/?" },
{ "path": "/sessionId/?" },
{ "path": "/ownerId/?" },
{ "path": "/parentInstanceId/?" },
{ "path": "/createdAt/?" },
{ "path": "/customStatusVersion/?" },
{ "path": "/lockToken/?" }
],
"excludedPaths": [
{ "path": "/eventData/*" },
{ "path": "/workItem/*" },
{ "path": "/payload/*" },
{ "path": "/output/*" },
{ "path": "/customStatus/*" },
{ "path": "/*" }
],
"compositeIndexes": [
[
{ "path": "/type", "order": "ascending" },
{ "path": "/dispatchSlot", "order": "ascending" },
{ "path": "/visibleAt", "order": "ascending" },
{ "path": "/enqueuedAt", "order": "ascending" }
],
[
{ "path": "/type", "order": "ascending" },
{ "path": "/executionId", "order": "ascending" },
{ "path": "/eventId", "order": "ascending" }
],
[
{ "path": "/type", "order": "ascending" },
{ "path": "/status", "order": "ascending" }
]
]
}
```
**Rationale:**
- Exclude `eventData`, `workItem`, `payload`, `output`, `customStatus` — large blobs never queried by content.
- First composite index: serves `fetch_orchestration_item` and `fetch_work_item` cross-partition queries with slot-based partitioning.
- Second composite index: serves history reads within a partition (ordered by executionId, eventId).
- Third composite index: serves `list_instances_by_status` cross-partition queries.
### 2.4 Dispatch Slot Computation
Every document written to `orch_queue` or `worker_queue` includes a precomputed `dispatchSlot`:
```rust
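fn dispatch_slot(instance_id: &str) -> u8 {
    use std::hash::{Hash, Hasher};
    // NOTE: the algorithm behind `DefaultHasher` is not guaranteed to stay the same
    // across Rust releases, so persisted slots assume all writers share one hasher.
    let mut hasher = std::collections::hash_map::DefaultHasher::new();
    instance_id.hash(&mut hasher);
    (hasher.finish() % 256) as u8
}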
```
256 slots are distributed across dispatchers via the lease provider. Each dispatcher only queries for items in its assigned slots, eliminating intra-runtime contention.
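Since `dispatchSlot` is persisted and compared across processes, a hash with a fixed, documented definition may be preferable to `DefaultHasher`. A minimal sketch, assuming 64-bit FNV-1a is acceptable here (the function name `stable_dispatch_slot` is hypothetical and this is not the spec's chosen hash):

```rust
/// Hypothetical alternative to `dispatch_slot`: 64-bit FNV-1a over the UTF-8
/// bytes of the instance ID, reduced to one of 256 slots.
fn stable_dispatch_slot(instance_id: &str) -> u8 {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;
    let mut hash = FNV_OFFSET_BASIS;
    for byte in instance_id.as_bytes() {
        // FNV-1a: XOR the byte in first, then multiply by the prime.
        hash ^= u64::from(*byte);
        hash = hash.wrapping_mul(FNV_PRIME);
    }
    (hash % 256) as u8
}
```

The output depends only on the input bytes, so two runtimes built with different toolchains would compute the same slot for the same instance.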
---
## 3. Algorithms
### 3.1 `fetch_orchestration_item`
```
fetch_orchestration_item(lock_timeout, poll_timeout, capability_filter)
│
├── 0. GET MY SLOTS
│ caller_id = tokio::task::id()
│ my_slots = orch_lease_provider.acquire_slots(caller_id)
│
├── 1. FIND CANDIDATE (cross-partition query)
│ SELECT TOP 1 c.id, c.instanceId, c._etag
│ FROM c
│ WHERE c.type = 'orch_queue'
│ AND c.visibleAt <= @now
│ AND (NOT IS_DEFINED(c.lockedUntil) OR IS_NULL(c.lockedUntil) OR c.lockedUntil <= @now)
│ AND ARRAY_CONTAINS(@my_slots, c.dispatchSlot)
│ -- Capability filter (if provided):
│ AND (NOT IS_DEFINED(c.pinnedVersionPacked)
│ OR c.pinnedVersionPacked BETWEEN @minPacked AND @maxPacked)
│ ORDER BY c.enqueuedAt
│
│ → No results? Return Ok(None). Runtime sleeps 1s, calls again.
│
├── 2. LOCK THE INSTANCE (point read + conditional patch)
│ │
│ │ Read: GET /instanceId:instance (partition: instanceId)
│ │ → instance doc with _etag
│ │
│ │ Guard: if instance.lockedUntil > now → instance locked by another
│ │ dispatcher (possible cross-runtime contention).
│ │ Exclude this instanceId, retry step 1.
│ │
│ │ Conditional patch (If-Match: instance._etag):
│ │ SET lockToken = new_uuid()
│ │ SET lockedUntil = now + lock_timeout
│ │
│ │ On 412 Precondition Failed:
│ │ → ETag race. Exclude instanceId, retry step 1.
│ │
│ │ Max 3 retries with exclusion list. After 3 → return Ok(None).
│ │
│ └── On success: we own this instance.
│
├── 3. COLLECT ALL PENDING MESSAGES (single-partition query)
│ │
│ │ SELECT *
│ │ FROM c
│ │ WHERE c.instanceId = @instanceId
│ │ AND c.type = 'orch_queue'
│ │ AND c.visibleAt <= @now
│ │ AND (NOT IS_DEFINED(c.lockedUntil) OR IS_NULL(c.lockedUntil) OR c.lockedUntil <= @now)
│ │
│ │ If zero messages:
│ │ → Messages were consumed between step 1 and 3.
│ │ → Unlock instance (patch lockedUntil = null, lockToken = null).
│ │ → Retry step 1.
│ │
│ │ Tag all collected messages with the lock:
│ │ Transactional batch (same partition):
│ │ For each message:
│ │ PATCH c.lockToken = @lockToken
│ │ PATCH c.lockedUntil = now + lock_timeout
│ │ PATCH c.attemptCount = c.attemptCount + 1
│ │
│ │ Record MAX(attemptCount) across batch → poison detection.
│ │
│ └── Deserialize each message.workItem → Vec<WorkItem>
│
├── 4. FETCH HISTORY (single-partition query)
│ │
│ │ SELECT c.eventData
│ │ FROM c
│ │ WHERE c.instanceId = @instanceId
│ │ AND c.type = 'history'
│ │ AND c.executionId = @currentExecutionId
│ │ ORDER BY c.eventId
│ │
│ │ Deserialize → Vec<Event>
│ │ On success: history = events, history_error = None
│ │ On failure: history = vec![], history_error = Some(error_msg)
│ │
│ └── Cost: proportional to history size.
│
├── 5. BUILD OrchestrationItem
│ │ { instance, orchestration_name, execution_id, version,
│ │ history, messages, history_error }
│ │
│ └── Read orchestration_name, version, currentExecutionId from instance doc.
│
└── 6. RETURN Ok(Some((item, lockToken, attemptCount)))
```
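Step 2 combines a lock-expiry guard with an ETag-conditional write. The sketch below models that interplay in memory, assuming an integer stand-in for CosmosDB's opaque `_etag` and hypothetical names (`InstanceDoc`, `LockError`, `try_lock_instance`); it makes no CosmosDB calls:

```rust
#[derive(Debug, PartialEq)]
enum LockError {
    /// Stands in for HTTP 412: the document changed since it was read.
    PreconditionFailed,
    /// Guard from step 2: another dispatcher holds an unexpired lock.
    AlreadyLocked,
}

/// Hypothetical in-memory stand-in for the lock fields of the instance document.
struct InstanceDoc {
    etag: u64,
    lock_token: Option<String>,
    locked_until: Option<i64>,
}

/// Applies the lock only if `read_etag` still matches and no live lock exists.
fn try_lock_instance(
    doc: &mut InstanceDoc,
    read_etag: u64,
    token: &str,
    now_ms: i64,
    lock_timeout_ms: i64,
) -> Result<(), LockError> {
    if doc.etag != read_etag {
        return Err(LockError::PreconditionFailed);
    }
    if doc.locked_until.map_or(false, |until| until > now_ms) {
        return Err(LockError::AlreadyLocked);
    }
    doc.lock_token = Some(token.to_string());
    doc.locked_until = Some(now_ms + lock_timeout_ms);
    doc.etag += 1; // every successful write yields a new ETag
    Ok(())
}
```

In both error cases the algorithm above adds the instance to the exclusion list and returns to step 1.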
### 3.2 `fetch_work_item`
Same structure as `fetch_orchestration_item` but simpler — no instance-level lock, no history fetch, no message batching.
```
fetch_work_item(lock_timeout, poll_timeout, session_config)
│
├── 0. GET MY SLOTS
│ caller_id = tokio::task::id()
│ my_slots = worker_lease_provider.acquire_slots(caller_id)
│
├── 1. FIND AND LOCK A WORK ITEM (cross-partition query)
│ │
│ │ Build query based on session_config:
│ │
│ │ If session_config = None:
│ │ → Fetch any non-session item:
│ │ SELECT TOP 1 *
│ │ FROM c
│ │ WHERE c.type = 'worker_queue'
│ │ AND c.visibleAt <= @now
│ │ AND (NOT IS_DEFINED(c.lockedUntil) OR IS_NULL(c.lockedUntil) OR c.lockedUntil <= @now)
│ │ AND (NOT IS_DEFINED(c.sessionId) OR c.sessionId = null)
│ │ AND ARRAY_CONTAINS(@my_slots, c.dispatchSlot)
│ │ ORDER BY c.enqueuedAt
│ │
│ │ If session_config = Some({ owner_id, lock_timeout }):
│ │ → Prefer items from sessions owned by this worker,
│ │ or items with unclaimed/expired sessions:
│ │ SELECT TOP 1 *
│ │ FROM c
│ │ WHERE c.type = 'worker_queue'
│ │ AND c.visibleAt <= @now
│ │ AND (NOT IS_DEFINED(c.lockedUntil) OR IS_NULL(c.lockedUntil) OR c.lockedUntil <= @now)
│ │ AND ARRAY_CONTAINS(@my_slots, c.dispatchSlot)
│ │ ORDER BY c.enqueuedAt
│ │ (Then check session ownership in application code after fetch)
│ │
│ └── No results? Return Ok(None).
│
├── 2. SESSION CHECK (if session_config provided and item has sessionId)
│ │
│ │ Read session doc: <instanceId>:session:<sessionId>
│ │ If exists and session.ownerId != my_owner_id and session.lockedUntil > now:
│ │ → Session owned by another worker. Skip this item, retry step 1.
│ │ If not exists or session expired:
│ │ → Claim session: upsert session doc with ownerId = my_owner_id,
│ │ lockedUntil = now + session_lock_timeout (conditional on ETag)
│ │ → On 412: another worker claimed it. Skip, retry step 1.
│ │
│ └── Non-session items skip this step entirely.
│
├── 3. LOCK THE WORK ITEM (conditional patch)
│ │
│ │ Patch (If-Match: item._etag):
│ │ SET lockToken = new_uuid()
│ │ SET lockedUntil = now + lock_timeout
│ │ SET attemptCount = attemptCount + 1
│ │
│ │ On 412: another worker beat us. Retry step 1.
│ │
│ └── On success: we own this work item.
│
└── 4. RETURN Ok(Some((workItem, lockToken, attemptCount)))
```
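The session check in step 2 reduces to a three-way decision. A minimal sketch, assuming hypothetical `SessionDoc` and `SessionDecision` types that mirror the session document's `ownerId` and `lockedUntil` fields:

```rust
#[derive(Debug, PartialEq)]
enum SessionDecision {
    /// This worker already owns a live session: go on to lock the work item.
    Proceed,
    /// No session document, or the session expired: try to claim it.
    Claim,
    /// Another worker owns a live session: skip the item.
    Skip,
}

/// Hypothetical subset of the session document.
struct SessionDoc {
    owner_id: String,
    locked_until: i64,
}

fn session_decision(
    session: Option<&SessionDoc>,
    my_owner_id: &str,
    now_ms: i64,
) -> SessionDecision {
    match session {
        None => SessionDecision::Claim,
        Some(s) if s.locked_until <= now_ms => SessionDecision::Claim,
        Some(s) if s.owner_id == my_owner_id => SessionDecision::Proceed,
        Some(_) => SessionDecision::Skip,
    }
}
```

A `Claim` still has to win the conditional upsert; losing it with a 412 is handled like `Skip`.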
### 3.3 `ack_orchestration_item`
The most complex operation. Atomically commits a completed orchestration turn.
```
ack_orchestration_item(lock_token, execution_id, history_delta, worker_items,
orchestrator_items, metadata, cancelled_activities)
│
├── 1. VALIDATE LOCK
│ │ Read instance doc. Verify lockToken matches.
│ │ If mismatch → ProviderError::permanent("Invalid lock token")
│ │
│ └── This is a point read: ~1 RU.
│
├── 2. CLASSIFY ITEMS BY PARTITION
│ │
│ │ same_partition_worker_items: worker_items where instance matches lock_token's instance
│ │ same_partition_orch_items: orchestrator_items targeting this instance
│ │ cross_partition_items: orchestrator_items targeting OTHER instances
│ │ (sub-orch starts, sub-orch completions, detached orch starts)
│ │
│ └── Build outbox intents for cross_partition_items.
│
├── 3. TRANSACTIONAL BATCH (single partition = this instance)
│ │
│ │ All operations target instanceId partition:
│ │
│ │ a. DELETE locked orch_queue messages (by id, partition key)
│ │ For each message collected during fetch:
│ │ Delete { id: message.id }
│ │
│ │ b. CREATE history event documents
│ │ For each event in history_delta:
│ │ Create { id: "<instanceId>:history:<executionId>:<eventId>", ... }
│ │
│ │ c. CREATE worker_queue items (activities scheduled by this turn)
│ │ For each item in same_partition_worker_items:
│ │ Create { id: uuid(), type: "worker_queue", dispatchSlot: ..., ... }
│ │
│ │ d. CREATE orch_queue items targeting this instance
│ │ For each item in same_partition_orch_items:
│ │ Create { id: uuid(), type: "orch_queue", dispatchSlot: ..., ... }
│ │
│ │ e. CREATE outbox_intent documents for cross-partition items
│ │ For each item in cross_partition_items:
│ │ Create { id: "intent:<idempotencyKey>", type: "outbox_intent", ... }
│ │
│ │ f. DELETE cancelled activity entries from worker_queue
│ │ For each cancelled activity identifier:
│ │ Delete matching worker_queue item (located by executionId + activityId, since worker_queue ids are random UUIDs)
│ │
│ │ g. UPSERT instance document
│ │ Update status, output, orchestrationName, orchestrationVersion,
│ │ currentExecutionId, updatedAt, customStatus, customStatusVersion,
│ │ pinnedDuroxideVersion.
│ │ CLEAR lockToken and lockedUntil (release lock).
│ │
│ │ If batch exceeds 100 operations:
│ │ Split into multiple sequential batches. First batch includes
│ │ instance upsert (releases lock). Subsequent batches are
│ │ best-effort (reconciler catches failures).
│ │
│ └── If batch fails → lock expires naturally. Turn will be retried.
│
├── 4. DELIVER OUTBOX INTENTS (best-effort, outside transaction)
│ │
│ │ For each outbox intent created in step 3e:
│ │ Create the target document in the target partition.
│ │ On success: delete the intent document from source partition.
│ │ On 409 Conflict: idempotent — target already exists. Delete intent.
│ │ On transient failure: leave intent as "pending" for reconciler.
│ │
│ └── Fire-and-forget. Reconciler handles stragglers.
│
└── 5. RETURN Ok(())
```
**Transactional batch 100-operation limit:**
Typical turn produces:
- 1-10 orch_queue deletes
- 1-20 history creates
- 0-10 worker_queue creates
- 0-5 orch_queue creates
- 0-3 outbox intents
- 0-5 worker_queue deletes (cancellations)
- 1 instance upsert
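Total: 3-54 operations when the ranges above are summed, so normal workloads stay well within the 100-operation limit. A transactional batch is also capped at 2 MB of total payload, so large history events can force a split before the operation count does.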
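The operation budget can be checked mechanically. The sketch below sums the per-category counts and splits an oversized turn into batches of at most 100 operations; `TurnOps` and `batch_sizes` are hypothetical names, and the split ignores the ordering and payload-size concerns a real implementation would need to handle:

```rust
const MAX_BATCH_OPS: usize = 100;

/// Hypothetical per-turn operation counts, one field per category listed above.
struct TurnOps {
    queue_deletes: usize,
    history_creates: usize,
    worker_creates: usize,
    orch_creates: usize,
    outbox_intents: usize,
    cancellation_deletes: usize,
}

impl TurnOps {
    /// Total operations, including the single instance upsert.
    fn total(&self) -> usize {
        self.queue_deletes
            + self.history_creates
            + self.worker_creates
            + self.orch_creates
            + self.outbox_intents
            + self.cancellation_deletes
            + 1
    }
}

/// Sizes of the sequential batches needed for `total_ops` operations.
fn batch_sizes(total_ops: usize) -> Vec<usize> {
    let mut sizes = Vec::new();
    let mut remaining = total_ops;
    while remaining > 0 {
        let take = remaining.min(MAX_BATCH_OPS);
        sizes.push(take);
        remaining -= take;
    }
    sizes
}
```

With every category at its upper bound the turn still fits in a single batch.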
### 3.4 `ack_work_item`
```
ack_work_item(token, completion)
│
├── 1. VALIDATE LOCK
│ │ Look up the worker_queue item by lockToken.
│ │ If not found or lockToken mismatch → ProviderError::permanent
│ │
│ └── This is a query rather than a point read: a point read needs the item's id and partition key, and the token alone is only a UUID.
│
├── 2. If completion is None (cancelled activity):
│ │ Delete the worker_queue item.
│ └── Return Ok(())
│
├── 3. TRANSACTIONAL BATCH (same partition as the activity's instance)
│ │
│ │ a. DELETE the locked worker_queue item
│ │ b. CREATE orch_queue item with the completion WorkItem
│ │ (ActivityCompleted or ActivityFailed targeting same instanceId)
│ │ c. UPDATE session.lastActivity if session-based
│ │
│ └── Same partition → transactional batch works.
│
└── 4. RETURN Ok(())
```
Note: `ack_work_item` is always same-partition because the activity completion targets the same instance that scheduled it. No outbox needed.
### 3.5 `abandon_orchestration_item`
```
abandon_orchestration_item(lock_token, delay, ignore_attempt)
│
├── 1. Find instance by lock_token (query: type='instance' AND lockToken=@token)
│
├── 2. Patch instance document:
│ SET lockToken = null
│ SET lockedUntil = null
│
├── 3. For each orch_queue item tagged with this lockToken:
│ PATCH:
│ SET lockToken = null
│ SET lockedUntil = null
│ SET visibleAt = now + delay (if delay provided, else now)
│ SET attemptCount = attemptCount - 1 (if ignore_attempt = true)
│
└── 4. RETURN Ok(())
```
### 3.6 `abandon_work_item`
```
abandon_work_item(token, delay, ignore_attempt)
│
├── 1. Find worker_queue item by lockToken
│
├── 2. Patch worker_queue item:
│ SET lockToken = null
│ SET lockedUntil = null
│ SET visibleAt = now + delay (if delay provided, else now)
│ SET attemptCount = attemptCount - 1 (if ignore_attempt = true)
│
└── 3. RETURN Ok(())
```
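Both abandon operations apply the same field changes to a queue item. A minimal in-memory sketch, assuming a hypothetical `QueueItem` struct; clamping `attemptCount` at zero is an assumption, since the spec does not say what happens when the count is already 0:

```rust
/// Hypothetical subset of a queue item touched by abandon.
struct QueueItem {
    lock_token: Option<String>,
    locked_until: Option<i64>,
    visible_at: i64,
    attempt_count: i32,
}

/// Releases the lock, optionally delays visibility, and optionally
/// undoes the attempt increment made at fetch time.
fn apply_abandon(
    item: &mut QueueItem,
    now_ms: i64,
    delay_ms: Option<i64>,
    ignore_attempt: bool,
) {
    item.lock_token = None;
    item.locked_until = None;
    item.visible_at = now_ms + delay_ms.unwrap_or(0);
    if ignore_attempt {
        // Assumption: never let the counter go negative.
        item.attempt_count = (item.attempt_count - 1).max(0);
    }
}
```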
### 3.7 `renew_orchestration_item_lock`
```
renew_orchestration_item_lock(lock_token, extend_for)
│
├── 1. Find instance by lockToken
│ If not found or lockedUntil <= now → ProviderError::permanent("Lock expired")
│
├── 2. Conditional patch (If-Match: _etag):
│ SET lockedUntil = now + extend_for
│
├── 3. For each orch_queue item tagged with this lockToken:
│ PATCH SET lockedUntil = now + extend_for
│
└── 4. RETURN Ok(())
```
### 3.8 `renew_work_item_lock`
```
renew_work_item_lock(token, extend_for)
│
├── 1. Find worker_queue item by lockToken
│ If not found or lockedUntil <= now → ProviderError::permanent("Lock expired")
│
├── 2. Conditional patch (If-Match: _etag):
│ SET lockedUntil = now + extend_for
│
└── 3. RETURN Ok(())
```
### 3.9 `enqueue_for_orchestrator`
```
enqueue_for_orchestrator(item, delay)
│
├── 1. Extract instanceId from WorkItem
│
├── 2. Compute visibleAt = now + delay (or now if no delay)
│
├── 3. Compute dispatchSlot = hash(instanceId) % 256
│
├── 4. Create document in container:
│ { id: uuid(), instanceId, type: "orch_queue",
│ workItem: serialize(item), dispatchSlot,
│ visibleAt, enqueuedAt: now,
│ lockToken: null, lockedUntil: null, attemptCount: 0 }
│
│ NOTE: Do NOT create instance metadata here.
│ Instance creation happens via ack_orchestration_item metadata.
│
└── 5. RETURN Ok(())
```
### 3.10 `enqueue_for_worker`
```
enqueue_for_worker(item)
│
├── 1. Extract instanceId, executionId, activityId, sessionId from WorkItem
│
├── 2. Compute dispatchSlot = hash(instanceId) % 256
│
├── 3. Create document in container:
│ { id: uuid(), instanceId, type: "worker_queue",
│ workItem: serialize(item), dispatchSlot,
│ visibleAt: now, enqueuedAt: now,
│ lockToken: null, lockedUntil: null, attemptCount: 0,
│ executionId, activityId, sessionId }
│
└── 4. RETURN Ok(())
```
### 3.11 `read` / `read_with_execution`
```
read(instance)
│
├── 1. Get currentExecutionId from instance doc (point read)
└── 2. → read_with_execution(instance, currentExecutionId)
read_with_execution(instance, execution_id)
│
├── 1. Single-partition query:
│ SELECT c.eventData FROM c
│ WHERE c.instanceId = @instance
│ AND c.type = 'history'
│ AND c.executionId = @execution_id
│ ORDER BY c.eventId
│
├── 2. Deserialize each eventData → Event
│ (Skip events that fail to deserialize)
│
└── 3. RETURN Ok(Vec<Event>)
```
---
## 4. Transactional Outbox
### 4.1 When the Outbox Is Used
The outbox is needed when `ack_orchestration_item` produces work items that target a **different instance** than the one being acked. These cases are:
| Scenario | Source partition | Target partition | Work item |
|---|---|---|---|
| Sub-orchestration start | Parent instance | Child instance | `StartOrchestration` |
| Sub-orchestration completion | Child instance | Parent instance | `SubOrchCompleted` / `SubOrchFailed` |
| Detached orchestration start | Coordinator instance | New instance | `StartOrchestration` |
| Cancel cascade | Parent instance | Child instance | `CancelInstance` |
### 4.2 Intent Lifecycle
```
┌──────────────────────────────────────────────┐
│ ack_orchestration_item (transactional batch) │
│ │
│ ... history, queue ops, metadata ... │
│ CREATE outbox_intent (status: "pending") │
└──────────────────┬───────────────────────────┘
│
── transaction boundary ──
│
▼
┌──────────────────────────────────────────────┐
│ Best-effort delivery (immediate, same call) │
│ │
│ Create target doc in target partition │
│ On success → delete intent │
│ On 409 → delete intent (already exists) │
│ On error → leave as "pending" │
└──────────────────┬───────────────────────────┘
│
(if delivery failed)
│
▼
┌──────────────────────────────────────────────┐
│ Background reconciler (every 2 seconds) │
│ │
│ Query: type = "outbox_intent" │
│ AND status = "pending" │
│ AND createdAt < now - 2000 │
│ │
│ For each: │
│ Create target doc in target partition │
│ On success/409 → delete intent │
│ On transient error → increment attempt, │
│ leave for next cycle │
└──────────────────────────────────────────────┘
```
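The lifecycle above maps each delivery outcome to one action on the intent document. A small sketch of that mapping, assuming hypothetical enum names; the permanent-failure case follows the reconciler in section 4.4, which logs a warning and leaves the intent in place:

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum DeliveryOutcome {
    /// Target document was created.
    Created,
    /// 409: the target document already exists, so delivery already happened.
    Conflict,
    TransientError,
    PermanentError,
}

#[derive(Debug, PartialEq)]
enum IntentAction {
    DeleteIntent,
    /// Leave as "pending" and increment attemptCount for the next cycle.
    RetryNextCycle,
    /// Leave in place and log a warning.
    KeepAndWarn,
}

fn next_action(outcome: DeliveryOutcome) -> IntentAction {
    match outcome {
        DeliveryOutcome::Created | DeliveryOutcome::Conflict => IntentAction::DeleteIntent,
        DeliveryOutcome::TransientError => IntentAction::RetryNextCycle,
        DeliveryOutcome::PermanentError => IntentAction::KeepAndWarn,
    }
}
```

Treating `Conflict` the same as `Created` is what makes redelivery safe.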
### 4.3 Idempotency Key Generation
The idempotency key must be deterministic so that duplicate delivery creates a 409 instead of a duplicate work item:
```rust
fn idempotency_key(
    source_instance: &str,
    execution_id: u64,
    event_sequence: u64, // position in the history_delta or orchestrator_items list
) -> String {
    format!("{}:{}:{}", source_instance, execution_id, event_sequence)
}
```
The target document's `id` is set to a deterministic value derived from the idempotency key:
```rust
fn target_doc_id(idempotency_key: &str) -> String {
    format!("outbox:{}", idempotency_key)
}
```
This ensures that creating the same target document twice results in a 409 Conflict (CosmosDB enforces unique `id` within a partition), which we treat as success.
### 4.4 Reconciler
The reconciler is a background tokio task spawned by `CosmosDBProvider::new()`:
```rust
async fn reconciler_loop(inner: Arc<CosmosDBProviderInner>) {
    let mut interval = tokio::time::interval(Duration::from_secs(2));
    loop {
        interval.tick().await;
        // Cross-partition query for pending intents older than 2s
        let pending = query_pending_intents(&inner, Duration::from_secs(2)).await;
        for intent in pending {
            match deliver_intent(&inner, &intent).await {
                Ok(()) | Err(Conflict) => {
                    delete_intent(&inner, &intent).await.ok();
                }
                Err(e) if is_transient(&e) => {
                    // Leave for next cycle. Increment attemptCount.
                    increment_intent_attempt(&inner, &intent).await.ok();
                }
                Err(e) => {
                    tracing::warn!(
                        target: "duroxide::providers::cosmosdb",
                        intent_id = %intent.id,
                        error = %e,
                        "Permanent failure delivering outbox intent"
                    );
                }
            }
        }
    }
}
```
The reconciler query is cross-partition (intent documents are spread across source instance partitions). At low volume this is cheap (~5-20 RUs). Under load, most intents are delivered immediately and the reconciler query returns empty.
### 4.5 Shutdown
On `CosmosDBProvider::drop` or explicit shutdown, the reconciler task is cancelled via a `CancellationToken`. Pending intents remain in CosmosDB and will be delivered when the provider restarts.
---
## 5. Lease Provider
### 5.1 Trait
```rust
/// Assigns dispatch slots to concurrent dispatcher tasks.
/// Each dispatcher calls acquire_slots() to get its partition of the keyspace.
#[async_trait]
pub trait LeaseProvider: Send + Sync {
    /// Get the dispatch slots assigned to this caller.
    /// On first call, assigns slots. On subsequent calls, returns cached assignment.
    /// caller_id: unique identifier for the calling task (e.g., tokio task ID).
    async fn acquire_slots(&self, caller_id: u64) -> Vec<u8>;

    /// Release slots when a dispatcher shuts down.
    async fn release_slots(&self, caller_id: u64);
}
```
### 5.2 Phase 1: In-Memory Implementation
```rust
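use std::sync::atomic::{AtomicU32, Ordering};

use async_trait::async_trait;
use dashmap::DashMap;

pub struct InMemoryLeaseProvider {
    total: u32,
    next_index: AtomicU32,
    assignments: DashMap<u64, Vec<u8>>,
}

impl InMemoryLeaseProvider {
    /// `total_dispatchers` is the modulus used in `acquire_slots`, so it must be non-zero.
    pub fn new(total_dispatchers: u32) -> Self {
        assert!(total_dispatchers > 0, "total_dispatchers must be non-zero");
        Self {
            total: total_dispatchers,
            next_index: AtomicU32::new(0),
            assignments: DashMap::new(),
        }
    }
}

#[async_trait]
impl LeaseProvider for InMemoryLeaseProvider {
    async fn acquire_slots(&self, caller_id: u64) -> Vec<u8> {
        self.assignments
            .entry(caller_id)
            .or_insert_with(|| {
                // Each new caller takes the next index. A caller that arrives after
                // `total` indexes are used (for example a dispatcher restarted under
                // a new task ID) gets an index >= total and therefore no slots.
                let index = self.next_index.fetch_add(1, Ordering::SeqCst);
                (0u16..256)
                    .filter(|s| (*s as u32) % self.total == index)
                    .map(|s| s as u8)
                    .collect()
            })
            .clone()
    }

    async fn release_slots(&self, caller_id: u64) {
        // Removes the cached assignment only; the index is not returned for reuse.
        self.assignments.remove(&caller_id);
    }
}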
```
**Slot distribution example** with `total_dispatchers = 3`:
| Dispatcher | Index | Assigned slots | Slot count |
|---|---|---|---|
| 0 | 0 | 0, 3, 6, 9, ..., 255 | 86 |
| 1 | 1 | 1, 4, 7, 10, ..., 253 | 85 |
| 2 | 2 | 2, 5, 8, 11, ..., 254 | 85 |
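The slot counts in this example can be reproduced with the same filter used by `acquire_slots`. A standalone sketch, assuming a hypothetical free function `slots_for` that takes the index directly instead of drawing it from the atomic counter:

```rust
/// Slots assigned to dispatcher `index` out of `total` dispatchers,
/// using the same `slot % total == index` rule as the in-memory provider.
fn slots_for(index: u32, total: u32) -> Vec<u8> {
    assert!(total > 0, "total must be non-zero");
    (0u16..256)
        .filter(|slot| u32::from(*slot) % total == index)
        .map(|slot| slot as u8)
        .collect()
}
```

For `total = 3` the three assignments are disjoint and together cover all 256 slots.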
### 5.3 Phase 2: CosmosDB-Backed Implementation (Future)
Same `LeaseProvider` trait. Uses lease documents in a `__leases__` partition for cross-runtime coordination. Each runtime's dispatchers claim non-overlapping slots via optimistic concurrency. Heartbeat renewal, expiry-based takeover for failover.
### 5.4 Usage in Provider
```rust
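struct CosmosDBProviderInner {
    // ...
    orch_leases: Box<dyn LeaseProvider>,
    worker_leases: Box<dyn LeaseProvider>,
}

// `tokio::task::Id` keeps its integer private, so `tokio::task::id().0` does not
// compile. Hashing the Id is one way to derive the u64 the trait expects (a sketch).
fn caller_id() -> u64 {
    use std::hash::{Hash, Hasher};
    let mut hasher = std::collections::hash_map::DefaultHasher::new();
    tokio::task::id().hash(&mut hasher); // panics when called outside a task
    hasher.finish()
}

// In fetch_orchestration_item:
let my_slots = self.inner.orch_leases.acquire_slots(caller_id()).await;
// Use my_slots in query: AND ARRAY_CONTAINS(@my_slots, c.dispatchSlot)

// In fetch_work_item:
let my_slots = self.inner.worker_leases.acquire_slots(caller_id()).await;
// Use my_slots in query: AND ARRAY_CONTAINS(@my_slots, c.dispatchSlot)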
```
---
## 6. ProviderAdmin Implementation
### 6.1 Instance Management
| Method | Implementation |
|---|---|
| `list_instances()` | Cross-partition query: `SELECT c.instanceId FROM c WHERE c.type = 'instance'` |
| `list_instances_by_status(status)` | Cross-partition query: `... AND c.status = @status` |
| `get_instance_info(instance)` | Point read: `<instanceId>:instance` |
| `latest_execution_id(instance)` | Point read instance doc → `currentExecutionId` |
### 6.2 Execution Management
| Method | Implementation |
|---|---|
| `list_executions(instance)` | Single-partition query: `SELECT DISTINCT c.executionId FROM c WHERE c.instanceId = @instance AND c.type = 'history'` |
| `read_history(instance)` | `read_history_with_execution_id(instance, latest_execution_id)` |
| `read_history_with_execution_id(instance, exec_id)` | Single-partition query ordered by eventId |
| `get_execution_info(instance, exec_id)` | Derive from history events: first=started_at, last completed/failed=status, count events |
### 6.3 Metrics
| Method | Implementation |
|---|---|
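| `get_system_metrics()` | Cross-partition aggregation: `SELECT VALUE { total: COUNT(1), running: SUM(c.status = 'Running' ? 1 : 0), ... } FROM c WHERE c.type = 'instance'` (`COUNT(<boolean expression>)` would count every row, because `false` is still a defined value) |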
| `get_queue_depths()` | Cross-partition count: `SELECT COUNT(1) FROM c WHERE c.type = 'orch_queue' AND c.visibleAt <= @now` + same for worker_queue |
### 6.4 Hierarchy
| Method | Implementation |
|---|---|
| `list_children(instance)` | Cross-partition query: `SELECT c.instanceId FROM c WHERE c.type = 'instance' AND c.parentInstanceId = @instance` |
| `get_parent_id(instance)` | Point read instance doc → `parentInstanceId` |
### 6.5 Deletion
```
delete_instances_atomic(ids, force)
│
├── 1. For each id: read instance doc
│ If status = "Running" and force = false → ProviderError::permanent
│
├── 2. Build instance tree (all descendants via list_children recursively)
│
├── 3. Check for orphans: if deleting a parent without all children → error
│
├── 4. For each instance in the tree (leaf-first):
│ Transactional batch (single partition per instance):
│ Delete all documents where instanceId = @id
│ (instance, history, orch_queue, worker_queue, outbox_intent, session)
│
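│ Note: each instance is its own partition, so one transactional batch
│ deletes everything for that instance atomically, provided the instance
│ has no more than 100 documents (the batch limit from section 3.3).
│ Larger instances need several batches and are then not atomic.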
│
└── 5. RETURN DeleteInstanceResult { counts }
```
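Step 4 deletes the tree leaf-first, which is a post-order traversal of the parent-to-children relation. A minimal sketch, assuming the tree has already been loaded into a hypothetical `HashMap` from instance ID to child IDs (in the provider this would come from `list_children`):

```rust
use std::collections::HashMap;

/// Returns `root` and all of its descendants, children before parents.
fn leaf_first_order(root: &str, children: &HashMap<String, Vec<String>>) -> Vec<String> {
    let mut order = Vec::new();
    visit(root, children, &mut order);
    order
}

/// Post-order walk: emit every descendant of `node`, then `node` itself.
fn visit(node: &str, children: &HashMap<String, Vec<String>>, order: &mut Vec<String>) {
    if let Some(kids) = children.get(node) {
        for kid in kids {
            visit(kid, children, order);
        }
    }
    order.push(node.to_string());
}
```

Deleting in this order means a failure partway through never leaves a child whose parent is already gone.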
### 6.6 Pruning
```
prune_executions(instance, options)
│
├── 1. List all executionIds for instance
├── 2. Determine which to keep (latest + keep_last + completed_before filter)
├── 3. For each execution to prune:
│ Delete all history docs for that executionId (single-partition batch)
├── 4. If execution's instance doc references a pruned executionId, update it
└── 5. RETURN PruneResult { counts }
```
### 6.7 Custom Status
```
get_custom_status(instance, last_seen_version)
│
├── 1. Point read instance doc
├── 2. If customStatusVersion > last_seen_version:
│ Return Some((customStatus, customStatusVersion))
└── 3. Else: Return None (no change since last poll)
```
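The version comparison above is small enough to show directly. A sketch assuming a hypothetical `InstanceStatus` struct holding just the two custom-status fields of the instance document:

```rust
/// Hypothetical subset of the instance document.
struct InstanceStatus {
    custom_status: Option<String>,
    custom_status_version: i64,
}

/// Returns the status and its version only if it changed since `last_seen_version`.
fn custom_status_if_changed(
    doc: &InstanceStatus,
    last_seen_version: i64,
) -> Option<(Option<String>, i64)> {
    if doc.custom_status_version > last_seen_version {
        Some((doc.custom_status.clone(), doc.custom_status_version))
    } else {
        None
    }
}
```

A poller passes back the version it last received, so an unchanged status costs one point read and returns `None`.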
---
## 7. Error Handling
### 7.1 CosmosDB Error → ProviderError Mapping
| CosmosDB error | HTTP status | ProviderError | Notes |
|---|---|---|---|
| Conflict | 409 | `retryable` | A create collided with an existing document `id` in the partition. Outbox delivery treats this as success (section 4.2). |
| TooManyRequests | 429 | `retryable` | Rate limited, retry after backoff |
| RequestTimeout | 408 | `retryable` | Transient timeout |
| ServiceUnavailable | 503 | `retryable` | Transient availability issue |
| NotFound | 404 | `permanent` | Document doesn't exist |
| PreconditionFailed | 412 | `retryable` | ETag mismatch on conditional write |
| BadRequest | 400 | `permanent` | Malformed query or document |
| RequestEntityTooLarge | 413 | `permanent` | Batch too large |
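The mapping in this table can be expressed as a match on the HTTP status code. A minimal sketch with a hypothetical `ErrorClass` enum; treating unlisted status codes as permanent is an assumption, since the table does not cover them:

```rust
#[derive(Debug, PartialEq)]
enum ErrorClass {
    Retryable,
    Permanent,
}

/// Classifies a CosmosDB HTTP status code according to the table above.
fn classify_status(status: u16) -> ErrorClass {
    match status {
        408 | 409 | 412 | 429 | 503 => ErrorClass::Retryable,
        400 | 404 | 413 => ErrorClass::Permanent,
        // Assumption: anything not listed in the table is not retried.
        _ => ErrorClass::Permanent,
    }
}
```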
### 7.2 Retry Strategy
```rust
use std::future::Future;
use std::time::Duration;

const MAX_RETRIES: u32 = 3;
const BASE_RETRY_DELAY_MS: u64 = 100;

// `Future` is a trait, so the closure's return type needs its own generic parameter.
async fn with_retry<F, Fut, T>(operation: &str, f: F) -> Result<T, ProviderError>
where
    F: Fn() -> Fut,
    Fut: Future<Output = Result<T, CosmosDBError>>,
{
    for attempt in 0..=MAX_RETRIES {
        match f().await {
            Ok(result) => return Ok(result),
            Err(e) => {
                let provider_err = map_cosmosdb_error(operation, &e);
                if provider_err.is_retryable() && attempt < MAX_RETRIES {
                    // Linear backoff: 100 ms, 200 ms, 300 ms.
                    let delay = BASE_RETRY_DELAY_MS * (attempt as u64 + 1);
                    tokio::time::sleep(Duration::from_millis(delay)).await;
                    continue;
                }
                return Err(provider_err);
            }
        }
    }
    unreachable!()
}
```
429 responses carry the server's suggested wait in the `x-ms-retry-after-ms` header. The retry logic should use that value when present instead of the computed backoff.
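One way to let a server-provided hint take precedence over the computed backoff is sketched below. `retry_delay` and its `retry_after_ms` parameter are hypothetical; extracting the hint from the 429 response is left out because it depends on the SDK's error type.

```rust
use std::time::Duration;

const BASE_RETRY_DELAY_MS: u64 = 100;

/// Picks the wait before the next attempt. `attempt` is zero-based.
/// A server-provided hint, when present, replaces the linear backoff.
fn retry_delay(attempt: u32, retry_after_ms: Option<u64>) -> Duration {
    let backoff_ms = BASE_RETRY_DELAY_MS * (attempt as u64 + 1);
    Duration::from_millis(retry_after_ms.unwrap_or(backoff_ms))
}
```

With no hint, attempts 0, 1, and 2 wait 100 ms, 200 ms, and 300 ms, matching the linear backoff used in `with_retry`. A hint of 750 ms yields a 750 ms wait regardless of the attempt number.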
---
## 8. Crate Structure
```
duroxide-cdb/
├── Cargo.toml
├── SPEC.md                        # This document
├── README.md
├── CHANGELOG.md
├── .env.example                   # COSMOSDB_ENDPOINT, COSMOSDB_KEY, COSMOSDB_DATABASE
├── src/
│   ├── lib.rs                     # Re-export CosmosDBProvider
│   ├── provider.rs                # Provider + ProviderAdmin trait implementations
│   ├── client.rs                  # CosmosDBClient wrapper, connection config
│   ├── containers.rs              # Container initialization, indexing policy setup
│   ├── models.rs                  # Document types (serde structs for all 6 doc types)
│   ├── batch.rs                   # Transactional batch builder helpers
│   ├── query.rs                   # Query builders (cross-partition, single-partition)
│   ├── outbox.rs                  # Outbox intent delivery + reconciler
│   ├── leases.rs                  # LeaseProvider trait + InMemoryLeaseProvider
│   └── errors.rs                  # CosmosDB error → ProviderError mapping
├── tests/
│   ├── common/
│   │   └── mod.rs                 # Test helpers: create_cosmosdb_store, cleanup
│   ├── cosmosdb_provider_test.rs  # Provider validation tests (157 tests)
│   ├── e2e_samples.rs             # Ported e2e sample tests
│   ├── stress_tests.rs            # Stress tests
│   └── session_e2e_tests.rs       # Session affinity e2e tests
└── cdb-stress/                    # Stress test binary
    ├── Cargo.toml
    └── src/
        ├── lib.rs
        └── bin/
            └── cdb-stress.rs
```
### Dependencies
```toml
[package]
name = "duroxide-cdb"
version = "0.1.0"
edition = "2021"
[dependencies]
duroxide = { version = "0.1.20" }
azure_data_cosmos = "0.22"
azure_core = "0.22"
async-trait = "0.1"
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
uuid = { version = "1", features = ["v4"] }
chrono = "0.4"
tracing = "0.1"
anyhow = "1"
dashmap = "6"
tokio-util = "0.7"
[dev-dependencies]
duroxide = { version = "0.1.20", features = ["provider-test"] }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
dotenvy = "0.15"
semver = "1"
```
---
## 9. Configuration
```rust
use std::time::Duration;

pub struct CosmosDBProviderConfig {
    /// CosmosDB endpoint URL (required, no default)
    pub endpoint: String,
    /// CosmosDB primary key (required, no default)
    pub key: String,
    /// Database name (default: "duroxide")
    pub database: String,
    /// Container name (default: "duroxide")
    pub container: String,
    /// Number of orchestration dispatchers (for lease partitioning, default: 1)
    pub orch_concurrency: u32,
    /// Number of worker dispatchers (for lease partitioning, default: 1)
    pub worker_concurrency: u32,
    /// Outbox reconciler interval (default: 2s)
    pub reconciler_interval: Duration,
    /// Outbox intent age threshold before reconciler picks it up (default: 2s)
    pub reconciler_age_threshold: Duration,
}

impl Default for CosmosDBProviderConfig {
    fn default() -> Self {
        Self {
            // Empty placeholders: the caller must supply endpoint and key.
            endpoint: String::new(),
            key: String::new(),
            database: "duroxide".to_string(),
            container: "duroxide".to_string(),
            orch_concurrency: 1,
            worker_concurrency: 1,
            reconciler_interval: Duration::from_secs(2),
            reconciler_age_threshold: Duration::from_secs(2),
        }
    }
}
```
### Environment Variables
| Variable | Description | Default |
|---|---|---|
| `COSMOSDB_ENDPOINT` | CosmosDB account endpoint | required |
| `COSMOSDB_KEY` | CosmosDB account key | required |
| `COSMOSDB_DATABASE` | Database name | `duroxide` |
| `COSMOSDB_CONTAINER` | Container name | `duroxide` |
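The following is a minimal sketch of how the four variables could be resolved with their defaults. `ConnectionSettings`, `settings_from_lookup`, and `settings_from_env` are hypothetical names, and rejecting empty values for the required variables is an assumption. The lookup is passed in as a closure so the logic can be tested without touching the process environment.

```rust
/// Hypothetical holder for the values read from the environment.
struct ConnectionSettings {
    endpoint: String,
    key: String,
    database: String,
    container: String,
}

/// Resolves the settings through `lookup`, applying the documented defaults.
fn settings_from_lookup<F>(lookup: F) -> Result<ConnectionSettings, String>
where
    F: Fn(&str) -> Option<String>,
{
    // Required variables must be present and non-empty (assumption).
    let required = |name: &str| -> Result<String, String> {
        lookup(name)
            .filter(|value| !value.is_empty())
            .ok_or_else(|| format!("{name} is required"))
    };

    Ok(ConnectionSettings {
        endpoint: required("COSMOSDB_ENDPOINT")?,
        key: required("COSMOSDB_KEY")?,
        database: lookup("COSMOSDB_DATABASE").unwrap_or_else(|| "duroxide".to_string()),
        container: lookup("COSMOSDB_CONTAINER").unwrap_or_else(|| "duroxide".to_string()),
    })
}

/// Reads the settings from the real process environment.
fn settings_from_env() -> Result<ConnectionSettings, String> {
    settings_from_lookup(|name| std::env::var(name).ok())
}
```

The struct deliberately does not derive `Debug`, so the account key cannot end up in logs through a stray `{:?}`.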
---
## 10. Testing Strategy
### 10.1 Provider Validation Tests (157 tests)
Implement `ProviderFactory` for `CosmosDBProvider`. Each test gets an isolated set of instances (unique instance ID prefixes). Cleanup deletes all documents with matching prefixes.
### 10.2 E2E Sample Tests
Port all 17 e2e samples from duroxide-pg. Replace `create_postgres_store()` with `create_cosmosdb_store()`. Adjust timeouts for CosmosDB latency (use 30-60s instead of 5-10s).
### 10.3 Stress Tests
Implement `ProviderStressFactory`. Start with conservative config (max_concurrent: 5, duration: 10s). Target 100% success rate before optimizing throughput.
### 10.4 Test Infrastructure
- **Local:** CosmosDB Linux emulator (Docker)
- **CI:** CosmosDB Linux emulator (Docker) or dedicated test account (serverless tier for cost)
- **Isolation:** Each test generates unique instance ID prefixes. Cleanup queries and deletes all documents with that prefix.
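The isolation scheme can be sketched as a prefix generator plus a cleanup query. This is an illustration under stated assumptions: `unique_instance_prefix`, `instance_id`, and `CLEANUP_QUERY` are hypothetical names, and the sketch uses only the standard library, whereas the real helpers could use the `uuid` dependency for stronger uniqueness across runs.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Process-wide counter so tests running in parallel never share a prefix.
static NEXT_TEST_ID: AtomicU64 = AtomicU64::new(0);

/// Builds a prefix that is unique within this process and distinct from
/// concurrently running test processes (via the process id).
fn unique_instance_prefix(test_name: &str) -> String {
    let seq = NEXT_TEST_ID.fetch_add(1, Ordering::Relaxed);
    format!("{test_name}-{}-{seq}-", std::process::id())
}

/// Derives an instance id that belongs to the given test prefix.
fn instance_id(prefix: &str, name: &str) -> String {
    format!("{prefix}{name}")
}

/// Cross-partition query that finds every document created under a prefix.
/// `@prefix` is a query parameter supplied by the cleanup helper.
const CLEANUP_QUERY: &str =
    "SELECT c.id, c.instanceId FROM c WHERE STARTSWITH(c.instanceId, @prefix)";
```

Because the partition key is `/instanceId`, the cleanup query fans out across partitions, and each returned document then has to be deleted within its own partition.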
---
## 11. Implementation Phases
### Phase 1: Core Provider (MVP)
Goal: Pass all 157 provider validation tests.
| Step | Task | Estimate |
|---|---|---|
| 1.1 | Scaffolding: Cargo.toml, models.rs, containers.rs, lib.rs | 1 day |
| 1.2 | client.rs, errors.rs, leases.rs (InMemoryLeaseProvider) | 1 day |
| 1.3 | Read path: `read`, `read_with_execution`, `append_with_execution` | 1 day |
| 1.4 | Enqueue: `enqueue_for_orchestrator`, `enqueue_for_worker` | 1 day |
| 1.5 | Fetch: `fetch_orchestration_item`, `fetch_work_item` | 2-3 days |
| 1.6 | Ack: `ack_orchestration_item`, `ack_work_item`, batch.rs | 2-3 days |
| 1.7 | Outbox: outbox.rs, reconciler background task | 1-2 days |
| 1.8 | Lock management: abandon, renew (orch + worker) | 1-2 days |
| 1.9 | Sessions: session routing, renew, cleanup | 1-2 days |
| 1.10 | Custom status: `get_custom_status` | 0.5 day |
| 1.11 | Validation tests: wire up all 157, debug & fix | 3-5 days |
### Phase 2: ProviderAdmin + E2E
| Step | Task | Estimate |
|---|---|---|
| 2.1 | ProviderAdmin: list, get_info, metrics, hierarchy | 2-3 days |
| 2.2 | Deletion: atomic, bulk, cascade | 2 days |
| 2.3 | Pruning: prune_executions, prune_bulk | 1 day |
| 2.4 | E2E sample tests | 2-3 days |
| 2.5 | Session E2E tests | 1 day |
### Phase 3: Stress & Polish
| Step | Task | Estimate |
|---|---|---|
| 3.1 | Stress tests + cdb-stress binary | 1-2 days |
| 3.2 | README, CHANGELOG, CI/CD workflow | 1 day |
| 3.3 | Performance tuning, RU optimization | 1-2 days |
### Phase 4: Multi-Runtime Leases (Future)
| Step | Task | Estimate |
|---|---|---|
| 4.1 | CosmosDBLeaseProvider implementation | 2-3 days |
| 4.2 | Lease heartbeat, expiry, takeover | 2 days |
| 4.3 | Integration testing with multiple runtimes | 2 days |
**Total Phase 1-3: ~5-7 weeks** (the step estimates above sum to 25.5-36.5 working days)