duroxide-cdb 0.1.10

A CosmosDB-based provider implementation for Duroxide, a durable task orchestration framework
# duroxide-cdb: CosmosDB Provider Specification

**Status:** Draft  
**Date:** 2026-02-23  
**Authors:** @affandar

---

## 1. Overview

`duroxide-cdb` is a CosmosDB NoSQL API provider for [duroxide](https://github.com/microsoft/duroxide), implementing the `Provider` and `ProviderAdmin` traits. It stores orchestration state, event history, and work queues in a single CosmosDB container using document-type discrimination.

### Design Principles

1. **Single container, partition-per-instance.** All documents for an orchestration instance live in one logical partition. Operations within an instance use CosmosDB transactional batch.
2. **Transactional outbox for cross-partition writes.** Sub-orchestration starts, completions, and detached orchestration starts use an intent log with best-effort delivery and background reconciliation.
3. **Optimistic concurrency via ETags.** Instance locking and queue item locking use CosmosDB conditional writes instead of database-level locks.
4. **Lease-based dispatcher partitioning.** Concurrent dispatchers within a runtime partition the keyspace to avoid contention. Phase 1: in-memory. Phase 2: CosmosDB-backed leases for multi-runtime coordination.
5. **Short polling once per second.** Phase 1 uses neither long polling nor the change feed.

---

## 2. Data Model

### 2.1 Container

A single CosmosDB container named `duroxide` (configurable).

- **Partition key:** `/instanceId`
- **Unique key policy:** default only; CosmosDB already enforces a unique `id` within each logical partition

All document types coexist in this container, discriminated by the `type` field.

### 2.2 Document Types

#### 2.2.1 Instance Document

One per orchestration instance. Holds metadata, lock state, and custom status.

```json
{
  "id": "<instanceId>:instance",
  "instanceId": "<instanceId>",
  "type": "instance",

  "orchestrationName": "MyOrchestration",
  "orchestrationVersion": "1.0.0",
  "currentExecutionId": 1,
  "status": "Running",
  "output": null,
  "parentInstanceId": null,
  "pinnedDuroxideVersion": null,

  "customStatus": null,
  "customStatusVersion": 0,

  "lockToken": null,
  "lockedUntil": null,

  "createdAt": 1740000000000,
  "updatedAt": 1740000000000
}
```

**Fields:**

| Field | Type | Description |
|-------|------|-------------|
| `id` | string | `<instanceId>:instance` — deterministic, one per instance |
| `instanceId` | string | Partition key. The orchestration instance identifier. |
| `type` | string | Always `"instance"` |
| `orchestrationName` | string | Registered orchestration name |
| `orchestrationVersion` | string | Semver version |
| `currentExecutionId` | i64 | Latest execution ID (increments on ContinueAsNew) |
| `status` | string | `"Running"`, `"Completed"`, `"Failed"`, `"ContinuedAsNew"` |
| `output` | string? | Serialized orchestration output (on completion/failure) |
| `parentInstanceId` | string? | Parent instance ID for sub-orchestrations |
| `pinnedDuroxideVersion` | object? | `{ major, minor, patch }` — capability filtering |
| `customStatus` | string? | User-set custom status |
| `customStatusVersion` | i64 | Monotonic version for custom status polling |
| `lockToken` | string? | UUID held by the dispatcher processing this instance |
| `lockedUntil` | i64? | Epoch ms when the lock expires |
| `createdAt` | i64 | Epoch ms |
| `updatedAt` | i64 | Epoch ms |

#### 2.2.2 History Event Document

One per event in the orchestration history. Append-only.

```json
{
  "id": "<instanceId>:history:<executionId>:<eventId>",
  "instanceId": "<instanceId>",
  "type": "history",

  "executionId": 1,
  "eventId": 1,
  "eventData": "{...serialized duroxide::Event...}"
}
```

**Fields:**

| Field | Type | Description |
|-------|------|-------------|
| `id` | string | Deterministic composite key |
| `instanceId` | string | Partition key |
| `type` | string | Always `"history"` |
| `executionId` | i64 | Execution this event belongs to |
| `eventId` | i64 | Monotonically increasing within an execution |
| `eventData` | string | JSON-serialized `duroxide::Event` |

#### 2.2.3 Orchestrator Queue Item

Work items destined for the orchestration dispatcher. Includes `StartOrchestration`, `ActivityCompleted`, `ActivityFailed`, `TimerFired`, `ExternalRaised`, `SubOrchCompleted`, `SubOrchFailed`, `CancelInstance`, `ContinueAsNew`, `QueueMessage`.

```json
{
  "id": "<uuid>",
  "instanceId": "<instanceId>",
  "type": "orch_queue",

  "workItem": "{...serialized duroxide::WorkItem...}",
  "dispatchSlot": 42,

  "visibleAt": 1740000000000,
  "enqueuedAt": 1740000000000,

  "lockToken": null,
  "lockedUntil": null,
  "attemptCount": 0
}
```

**Fields:**

| Field | Type | Description |
|-------|------|-------------|
| `id` | string | Random UUID — multiple queue items per instance |
| `instanceId` | string | Partition key |
| `type` | string | Always `"orch_queue"` |
| `workItem` | string | JSON-serialized `duroxide::WorkItem` |
| `dispatchSlot` | u8 | `hash(instanceId) % 256` — precomputed for dispatcher partitioning |
| `visibleAt` | i64 | Epoch ms. Item not fetchable before this time. Used for timers and delayed visibility. |
| `enqueuedAt` | i64 | Epoch ms. Determines FIFO ordering. |
| `lockToken` | string? | Set when a dispatcher locks this message batch |
| `lockedUntil` | i64? | Epoch ms. Lock expires after this time. |
| `attemptCount` | i32 | Incremented each time the item is fetched. For poison message detection. |
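
The `visibleAt`, `lockedUntil`, and `attemptCount` fields together decide whether a dispatcher may pick an item up. As a minimal sketch, the eligibility rule could be expressed as below. The `QueueItemView` struct, the function names, and the `max_attempts` threshold are illustrative assumptions, not part of the provider's API.

```rust
/// Minimal in-memory view of the queue-item fields that gate visibility.
/// Field names mirror the JSON document; the struct itself is illustrative.
#[derive(Debug, Clone)]
pub struct QueueItemView {
    pub visible_at: i64,           // epoch ms
    pub locked_until: Option<i64>, // epoch ms; None models JSON null
    pub attempt_count: i32,
}

/// True when the item may be fetched at `now_ms`: it is visible and either
/// unlocked or its lock has expired.
pub fn is_fetchable(item: &QueueItemView, now_ms: i64) -> bool {
    let visible = item.visible_at <= now_ms;
    let unlocked = item.locked_until.map_or(true, |until| until <= now_ms);
    visible && unlocked
}

/// Hypothetical poison check: the spec only says `attemptCount` supports
/// poison detection, so the threshold is an assumed parameter.
pub fn is_poison(item: &QueueItemView, max_attempts: i32) -> bool {
    item.attempt_count > max_attempts
}
```

An expired lock is treated the same as no lock, which is what lets a crashed dispatcher's items become fetchable again without any cleanup step.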

#### 2.2.4 Worker Queue Item

Work items destined for the activity worker dispatcher. Includes `ActivityExecute`.

```json
{
  "id": "<uuid>",
  "instanceId": "<instanceId>",
  "type": "worker_queue",

  "workItem": "{...serialized duroxide::WorkItem...}",
  "dispatchSlot": 42,

  "visibleAt": 1740000000000,
  "enqueuedAt": 1740000000000,

  "lockToken": null,
  "lockedUntil": null,
  "attemptCount": 0,

  "executionId": 1,
  "activityId": 3,
  "sessionId": null
}
```

**Additional fields over orch_queue:**

| Field | Type | Description |
|-------|------|-------------|
| `executionId` | i64? | Execution that scheduled this activity |
| `activityId` | i64? | Activity ID within the execution |
| `sessionId` | string? | Session ID for session affinity routing |

#### 2.2.5 Outbox Intent Document

Written in the same partition as the source instance. Represents a cross-partition write that needs delivery.

```json
{
  "id": "intent:<deterministic-key>",
  "instanceId": "<source-instance-id>",
  "type": "outbox_intent",

  "targetInstanceId": "<target-instance-id>",
  "targetDocumentType": "orch_queue",
  "payload": "{...the full document to create in the target partition...}",
  "idempotencyKey": "<deterministic-key>",

  "status": "pending",
  "createdAt": 1740000000000,
  "attemptCount": 0,
  "lastAttemptAt": null
}
```

**Fields:**

| Field | Type | Description |
|-------|------|-------------|
| `id` | string | `intent:<idempotencyKey>` — deterministic for dedup |
| `instanceId` | string | Partition key = source instance (same partition as the batch that created it) |
| `type` | string | Always `"outbox_intent"` |
| `targetInstanceId` | string | Destination partition |
| `targetDocumentType` | string | `"orch_queue"` or `"worker_queue"` |
| `payload` | string | The complete JSON document to be created in the target partition |
| `idempotencyKey` | string | Deterministic key derived from source context (e.g., `<sourceInstance>:<executionId>:<eventSequence>`) for idempotent delivery |
| `status` | string | `"pending"` or `"delivered"` |
| `createdAt` | i64 | Epoch ms |
| `attemptCount` | i32 | Delivery attempts so far |
| `lastAttemptAt` | i64? | Epoch ms of last delivery attempt |

#### 2.2.6 Session Document

Tracks session affinity: which worker owns a session.

```json
{
  "id": "<instanceId>:session:<sessionId>",
  "instanceId": "<instanceId>",
  "type": "session",

  "sessionId": "session-abc",
  "ownerId": "worker-1",
  "lockedUntil": 1740000030000,
  "lastActivity": 1740000000000,
  "createdAt": 1740000000000
}
```
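
The instance, history, outbox intent, and session documents all use deterministic `id` values. The helpers below are a small sketch of how those formats could be built in one place. The function names are hypothetical; only the string formats come from the document definitions above.

```rust
/// `<instanceId>:instance`
pub fn instance_doc_id(instance_id: &str) -> String {
    format!("{instance_id}:instance")
}

/// `<instanceId>:history:<executionId>:<eventId>`
pub fn history_doc_id(instance_id: &str, execution_id: i64, event_id: i64) -> String {
    format!("{instance_id}:history:{execution_id}:{event_id}")
}

/// `intent:<idempotencyKey>`
pub fn intent_doc_id(idempotency_key: &str) -> String {
    format!("intent:{idempotency_key}")
}

/// `<instanceId>:session:<sessionId>`
pub fn session_doc_id(instance_id: &str, session_id: &str) -> String {
    format!("{instance_id}:session:{session_id}")
}
```

Because these ids are derived from known values, the corresponding documents can be fetched with a point read (id plus partition key) rather than a query.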

### 2.3 Indexing Policy

```json
{
  "indexingMode": "consistent",
  "automatic": true,
  "includedPaths": [
    { "path": "/type/?" },
    { "path": "/instanceId/?" },
    { "path": "/dispatchSlot/?" },
    { "path": "/visibleAt/?" },
    { "path": "/enqueuedAt/?" },
    { "path": "/lockedUntil/?" },
    { "path": "/executionId/?" },
    { "path": "/eventId/?" },
    { "path": "/status/?" },
    { "path": "/sessionId/?" },
    { "path": "/ownerId/?" },
    { "path": "/parentInstanceId/?" },
    { "path": "/createdAt/?" },
    { "path": "/customStatusVersion/?" }
  ],
  "excludedPaths": [
    { "path": "/eventData/*" },
    { "path": "/workItem/*" },
    { "path": "/payload/*" },
    { "path": "/output/*" },
    { "path": "/customStatus/*" },
    { "path": "/*" }
  ],
  "compositeIndexes": [
    [
      { "path": "/type", "order": "ascending" },
      { "path": "/dispatchSlot", "order": "ascending" },
      { "path": "/visibleAt", "order": "ascending" },
      { "path": "/enqueuedAt", "order": "ascending" }
    ],
    [
      { "path": "/type", "order": "ascending" },
      { "path": "/executionId", "order": "ascending" },
      { "path": "/eventId", "order": "ascending" }
    ],
    [
      { "path": "/type", "order": "ascending" },
      { "path": "/status", "order": "ascending" }
    ]
  ]
}
```

**Rationale:**
- Exclude `eventData`, `workItem`, `payload`, `output`, `customStatus` — large blobs never queried by content.
- First composite index: serves `fetch_orchestration_item` and `fetch_work_item` cross-partition queries with slot-based partitioning.
- Second composite index: serves history reads within a partition (ordered by executionId, eventId).
- Third composite index: serves `list_instances_by_status` cross-partition queries.

### 2.4 Dispatch Slot Computation

Every document written to `orch_queue` or `worker_queue` includes a precomputed `dispatchSlot`:

```rust
fn dispatch_slot(instance_id: &str) -> u8 {
    use std::hash::{Hash, Hasher};
    let mut hasher = std::collections::hash_map::DefaultHasher::new();
    instance_id.hash(&mut hasher);
    (hasher.finish() % 256) as u8
}
```

256 slots are distributed across dispatchers via the lease provider. Each dispatcher only queries for items in its assigned slots, eliminating intra-runtime contention.
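
One caveat on the hash: the Rust standard library documents that the algorithm behind `DefaultHasher` is unspecified and may change between releases, so slots persisted by one build might not match slots computed by another. The sketch below shows one possible alternative, 64-bit FNV-1a, together with a round-robin split of the 256 slots. Both `stable_dispatch_slot` and `slots_for` are hypothetical names, and the round-robin policy is an assumption; the lease provider may assign slots differently.

```rust
/// 64-bit FNV-1a. Used here only as an example of a hash whose output is
/// fixed by its definition; the spec itself uses `DefaultHasher`.
pub fn fnv1a64(bytes: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;
    let mut hash = OFFSET_BASIS;
    for &b in bytes {
        hash ^= u64::from(b);
        hash = hash.wrapping_mul(PRIME);
    }
    hash
}

/// Slot in 0..=255 derived from the instance id.
pub fn stable_dispatch_slot(instance_id: &str) -> u8 {
    (fnv1a64(instance_id.as_bytes()) % 256) as u8
}

/// Round-robin split of the 256 slots across `count` dispatchers.
/// Hypothetical policy, shown only to illustrate disjoint coverage.
pub fn slots_for(index: usize, count: usize) -> Vec<u8> {
    assert!(count > 0 && index < count);
    (0..=255u8)
        .filter(|slot| usize::from(*slot) % count == index)
        .collect()
}
```

With three dispatchers, `slots_for(0, 3)`, `slots_for(1, 3)`, and `slots_for(2, 3)` are disjoint and together cover all 256 slots.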

---

## 3. Algorithms

### 3.1 `fetch_orchestration_item`

```
fetch_orchestration_item(lock_timeout, poll_timeout, capability_filter)
│
├── 0. GET MY SLOTS
│   caller_id = tokio::task::id()
│   my_slots = orch_lease_provider.acquire_slots(caller_id)
│
├── 1. FIND CANDIDATE (cross-partition query)
│   SELECT TOP 1 c.id, c.instanceId, c._etag
│   FROM c
│   WHERE c.type = 'orch_queue'
│     AND c.visibleAt <= @now
│     AND (NOT IS_DEFINED(c.lockedUntil) OR IS_NULL(c.lockedUntil) OR c.lockedUntil <= @now)
│     AND c.dispatchSlot IN (@my_slots)
│     -- Capability filter (if provided):
│     AND (NOT IS_DEFINED(c.pinnedVersionPacked)
│          OR c.pinnedVersionPacked BETWEEN @minPacked AND @maxPacked)
│   ORDER BY c.enqueuedAt
│
│   → No results? Return Ok(None). Runtime sleeps 1s, calls again.
│
├── 2. LOCK THE INSTANCE (point read + conditional patch)
│   │
│   │  Read: GET /instanceId:instance (partition: instanceId)
│   │        → instance doc with _etag
│   │
│   │  Guard: if instance.lockedUntil > now → instance locked by another
│   │         dispatcher (possible cross-runtime contention).
│   │         Exclude this instanceId, retry step 1.
│   │
│   │  Conditional patch (If-Match: instance._etag):
│   │    SET lockToken = new_uuid()
│   │    SET lockedUntil = now + lock_timeout
│   │
│   │  On 412 Precondition Failed:
│   │    → ETag race. Exclude instanceId, retry step 1.
│   │
│   │  Max 3 retries with exclusion list. After 3 → return Ok(None).
│   │
│   └── On success: we own this instance.
│
├── 3. COLLECT ALL PENDING MESSAGES (single-partition query)
│   │
│   │  SELECT *
│   │  FROM c
│   │  WHERE c.instanceId = @instanceId
│   │    AND c.type = 'orch_queue'
│   │    AND c.visibleAt <= @now
│   │    AND (NOT IS_DEFINED(c.lockedUntil) OR IS_NULL(c.lockedUntil) OR c.lockedUntil <= @now)
│   │
│   │  If zero messages:
│   │    → Messages were consumed between step 1 and 3.
│   │    → Unlock instance (patch lockedUntil = null, lockToken = null).
│   │    → Retry step 1.
│   │
│   │  Tag all collected messages with the lock:
│   │    Transactional batch (same partition):
│   │      For each message:
│   │        PATCH c.lockToken = @lockToken
│   │        PATCH c.lockedUntil = now + lock_timeout
│   │        PATCH c.attemptCount = c.attemptCount + 1
│   │
│   │  Record MAX(attemptCount) across batch → poison detection.
│   │
│   └── Deserialize each message.workItem → Vec<WorkItem>
│
├── 4. FETCH HISTORY (single-partition query)
│   │
│   │  SELECT c.eventData
│   │  FROM c
│   │  WHERE c.instanceId = @instanceId
│   │    AND c.type = 'history'
│   │    AND c.executionId = @currentExecutionId
│   │  ORDER BY c.eventId
│   │
│   │  Deserialize → Vec<Event>
│   │    On success: history = events, history_error = None
│   │    On failure: history = vec![], history_error = Some(error_msg)
│   │
│   └── Cost: proportional to history size.
│
├── 5. BUILD OrchestrationItem
│   │  { instance, orchestration_name, execution_id, version,
│   │    history, messages, history_error }
│   │
│   └── Read orchestration_name, version, currentExecutionId from instance doc.
│
└── 6. RETURN Ok(Some((item, lockToken, attemptCount)))
```
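
Step 2 relies on optimistic concurrency: the patch succeeds only if the instance document is unchanged since the point read. The following in-memory sketch models that behavior under stated assumptions. The numeric `etag` stands in for CosmosDB's opaque `_etag` string, and `InstanceDoc`, `LockError`, and `try_lock_instance` are illustrative names rather than the provider's types.

```rust
#[derive(Debug, Clone, PartialEq)]
pub struct InstanceDoc {
    pub etag: u64,                 // stand-in for CosmosDB's opaque `_etag`
    pub lock_token: Option<String>,
    pub locked_until: Option<i64>, // epoch ms
}

#[derive(Debug, PartialEq)]
pub enum LockError {
    /// Step 2 guard: the snapshot shows an unexpired lock.
    AlreadyLocked,
    /// Models HTTP 412: the stored document changed since it was read.
    PreconditionFailed,
}

/// Applies the guard to the caller's `snapshot`, then performs the
/// conditional patch against the stored document `server`.
pub fn try_lock_instance(
    server: &mut InstanceDoc,
    snapshot: &InstanceDoc,
    token: &str,
    now_ms: i64,
    lock_timeout_ms: i64,
) -> Result<(), LockError> {
    if snapshot.locked_until.map_or(false, |until| until > now_ms) {
        return Err(LockError::AlreadyLocked);
    }
    if server.etag != snapshot.etag {
        return Err(LockError::PreconditionFailed);
    }
    server.lock_token = Some(token.to_owned());
    server.locked_until = Some(now_ms + lock_timeout_ms);
    server.etag += 1; // every successful write produces a new ETag
    Ok(())
}
```

When two dispatchers read the same snapshot, only the first patch succeeds; the second sees the 412-style failure and moves on to another candidate, as the retry rule in step 2 describes.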

### 3.2 `fetch_work_item`

Same structure as `fetch_orchestration_item` but simpler — no instance-level lock, no history fetch, no message batching.

```
fetch_work_item(lock_timeout, poll_timeout, session_config)
│
├── 0. GET MY SLOTS
│   caller_id = tokio::task::id()
│   my_slots = worker_lease_provider.acquire_slots(caller_id)
│
├── 1. FIND AND LOCK A WORK ITEM (cross-partition query)
│   │
│   │  Build query based on session_config:
│   │
│   │  If session_config = None:
│   │    → Fetch any non-session item:
│   │    SELECT TOP 1 *
│   │    FROM c
│   │    WHERE c.type = 'worker_queue'
│   │      AND c.visibleAt <= @now
│   │      AND (NOT IS_DEFINED(c.lockedUntil) OR IS_NULL(c.lockedUntil) OR c.lockedUntil <= @now)
│   │      AND (NOT IS_DEFINED(c.sessionId) OR c.sessionId = null)
│   │      AND c.dispatchSlot IN (@my_slots)
│   │    ORDER BY c.enqueuedAt
│   │
│   │  If session_config = Some({ owner_id, lock_timeout }):
│   │    → Prefer items from sessions owned by this worker,
│   │      or items with unclaimed/expired sessions:
│   │    SELECT TOP 1 *
│   │    FROM c
│   │    WHERE c.type = 'worker_queue'
│   │      AND c.visibleAt <= @now
│   │      AND (NOT IS_DEFINED(c.lockedUntil) OR IS_NULL(c.lockedUntil) OR c.lockedUntil <= @now)
│   │      AND c.dispatchSlot IN (@my_slots)
│   │    ORDER BY c.enqueuedAt
│   │    (Then check session ownership in application code after fetch)
│   │
│   └── No results? Return Ok(None).
│
├── 2. SESSION CHECK (if session_config provided and item has sessionId)
│   │
│   │  Read session doc: <instanceId>:session:<sessionId>
│   │    If exists and session.ownerId != my_owner_id and session.lockedUntil > now:
│   │      → Session owned by another worker. Skip this item, retry step 1.
│   │    If not exists or session expired:
│   │      → Claim session: upsert session doc with ownerId = my_owner_id,
│   │        lockedUntil = now + session_lock_timeout (conditional on ETag)
│   │      → On 412: another worker claimed it. Skip, retry step 1.
│   │
│   └── Non-session items skip this step entirely.
│
├── 3. LOCK THE WORK ITEM (conditional patch)
│   │
│   │  Patch (If-Match: item._etag):
│   │    SET lockToken = new_uuid()
│   │    SET lockedUntil = now + lock_timeout
│   │    SET attemptCount = attemptCount + 1
│   │
│   │  On 412: another worker beat us. Retry step 1.
│   │
│   └── On success: we own this work item.
│
└── 4. RETURN Ok(Some((workItem, lockToken, attemptCount)))
```
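
The session check in step 2 reduces to a three-way decision. A minimal sketch of that decision as a pure function follows; `SessionDoc`, `SessionDecision`, and `decide_session` are hypothetical names, and the ETag-conditional upsert that performs the actual claim is left out.

```rust
#[derive(Debug, Clone)]
pub struct SessionDoc {
    pub owner_id: String,
    pub locked_until: i64, // epoch ms
}

#[derive(Debug, PartialEq)]
pub enum SessionDecision {
    /// This worker already owns a live session: go on to lock the item.
    Proceed,
    /// No session document, or the session expired: try to claim it.
    Claim,
    /// Another worker holds a live session: skip the item.
    Skip,
}

pub fn decide_session(
    session: Option<&SessionDoc>,
    my_owner_id: &str,
    now_ms: i64,
) -> SessionDecision {
    match session {
        None => SessionDecision::Claim,
        Some(s) if s.locked_until <= now_ms => SessionDecision::Claim,
        Some(s) if s.owner_id == my_owner_id => SessionDecision::Proceed,
        Some(_) => SessionDecision::Skip,
    }
}
```

Expiry is checked before ownership, so a worker whose own session has lapsed must claim it again rather than assume it still holds it.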

### 3.3 `ack_orchestration_item`

The most complex operation. Atomically commits a completed orchestration turn.

```
ack_orchestration_item(lock_token, execution_id, history_delta, worker_items,
                       orchestrator_items, metadata, cancelled_activities)
│
├── 1. VALIDATE LOCK
│   │  Read instance doc. Verify lockToken matches.
│   │  If mismatch → ProviderError::permanent("Invalid lock token")
│   │
│   └── This is a point read: ~1 RU.
│
├── 2. CLASSIFY ITEMS BY PARTITION
│   │
│   │  same_partition_worker_items: worker_items where instance matches lock_token's instance
│   │  same_partition_orch_items: orchestrator_items targeting this instance
│   │  cross_partition_items: orchestrator_items targeting OTHER instances
│   │    (sub-orch starts, sub-orch completions, detached orch starts)
│   │
│   └── Build outbox intents for cross_partition_items.
│
├── 3. TRANSACTIONAL BATCH (single partition = this instance)
│   │
│   │  All operations target instanceId partition:
│   │
│   │  a. DELETE locked orch_queue messages (by id, partition key)
│   │     For each message collected during fetch:
│   │       Delete { id: message.id }
│   │
│   │  b. CREATE history event documents
│   │     For each event in history_delta:
│   │       Create { id: "<instanceId>:history:<executionId>:<eventId>", ... }
│   │
│   │  c. CREATE worker_queue items (activities scheduled by this turn)
│   │     For each item in same_partition_worker_items:
│   │       Create { id: uuid(), type: "worker_queue", dispatchSlot: ..., ... }
│   │
│   │  d. CREATE orch_queue items targeting this instance
│   │     For each item in same_partition_orch_items:
│   │       Create { id: uuid(), type: "orch_queue", dispatchSlot: ..., ... }
│   │
│   │  e. CREATE outbox_intent documents for cross-partition items
│   │     For each item in cross_partition_items:
│   │       Create { id: "intent:<idempotencyKey>", type: "outbox_intent", ... }
│   │
│   │  f. DELETE cancelled activity entries from worker_queue
│   │     For each cancelled activity identifier:
│   │       Delete matching worker_queue item (by deterministic id)
│   │
│   │  g. UPSERT instance document
│   │     Update status, output, orchestrationName, orchestrationVersion,
│   │     currentExecutionId, updatedAt, customStatus, customStatusVersion,
│   │     pinnedDuroxideVersion.
│   │     CLEAR lockToken and lockedUntil (release lock).
│   │
│   │  If batch exceeds 100 operations:
│   │    Split into multiple sequential batches. First batch includes
│   │    instance upsert (releases lock). Subsequent batches are
│   │    best-effort (reconciler catches failures).
│   │
│   └── If batch fails → lock expires naturally. Turn will be retried.
│
├── 4. DELIVER OUTBOX INTENTS (best-effort, outside transaction)
│   │
│   │  For each outbox intent created in step 3e:
│   │    Create the target document in the target partition.
│   │    On success: delete the intent document from source partition.
│   │    On 409 Conflict: idempotent — target already exists. Delete intent.
│   │    On transient failure: leave intent as "pending" for reconciler.
│   │
│   └── Fire-and-forget. Reconciler handles stragglers.
│
└── 5. RETURN Ok(())
```

**Transactional batch 100-operation limit:**

Typical turn produces:
- 1-10 orch_queue deletes
- 1-20 history creates
- 0-10 worker_queue creates
- 0-5 orch_queue creates
- 0-3 outbox intents
- 0-5 worker_queue deletes (cancellations)
- 1 instance upsert

Total: roughly 3-54 operations per turn, well within the 100-operation limit for normal workloads.
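
As a quick check of that arithmetic, the sketch below sums the per-category counts and computes how many sequential batches a turn would need. `TurnOps` and its methods are illustrative names; the low and high ends of the ranges listed above sum to 3 and 54 operations.

```rust
/// CosmosDB transactional batches are limited to 100 operations.
pub const MAX_BATCH_OPS: usize = 100;

/// Operation counts for one orchestration turn (the instance upsert is
/// always exactly one operation and is added in `total`).
#[derive(Debug, Clone, Copy)]
pub struct TurnOps {
    pub orch_queue_deletes: usize,
    pub history_creates: usize,
    pub worker_queue_creates: usize,
    pub orch_queue_creates: usize,
    pub outbox_intents: usize,
    pub worker_queue_deletes: usize,
}

impl TurnOps {
    /// Total operations, including the single instance upsert.
    pub fn total(&self) -> usize {
        self.orch_queue_deletes
            + self.history_creates
            + self.worker_queue_creates
            + self.orch_queue_creates
            + self.outbox_intents
            + self.worker_queue_deletes
            + 1
    }

    /// Number of sequential batches needed under the 100-operation limit.
    pub fn batches_needed(&self) -> usize {
        (self.total() + MAX_BATCH_OPS - 1) / MAX_BATCH_OPS
    }
}
```

A turn that appends 250 history events, for example, needs three batches, which is where the split rule in step 3 applies.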

### 3.4 `ack_work_item`

```
ack_work_item(token, completion)
│
├── 1. VALIDATE LOCK
│   │  Read worker_queue item by lockToken.
│   │  If not found or lockToken mismatch → ProviderError::permanent
│   │
│   └── Point read within the item's partition.
│
├── 2. If completion is None (cancelled activity):
│   │  Delete the worker_queue item.
│   └── Return Ok(())
│
├── 3. TRANSACTIONAL BATCH (same partition as the activity's instance)
│   │
│   │  a. DELETE the locked worker_queue item
│   │  b. CREATE orch_queue item with the completion WorkItem
│   │     (ActivityCompleted or ActivityFailed targeting same instanceId)
│   │  c. UPDATE session.lastActivity if session-based
│   │
│   └── Same partition → transactional batch works.
│
└── 4. RETURN Ok(())
```

Note: `ack_work_item` is always same-partition because the activity completion targets the same instance that scheduled it. No outbox needed.

### 3.5 `abandon_orchestration_item`

```
abandon_orchestration_item(lock_token, delay, ignore_attempt)
│
├── 1. Find instance by lock_token (query: type='instance' AND lockToken=@token)
│
├── 2. Patch instance document:
│     SET lockToken = null
│     SET lockedUntil = null
│
├── 3. For each orch_queue item tagged with this lockToken:
│     PATCH:
│       SET lockToken = null
│       SET lockedUntil = null
│       SET visibleAt = now + delay (if delay provided, else now)
│       SET attemptCount = attemptCount - 1 (if ignore_attempt = true)
│
└── 4. RETURN Ok(())
```

### 3.6 `abandon_work_item`

```
abandon_work_item(token, delay, ignore_attempt)
│
├── 1. Find worker_queue item by lockToken
│
├── 2. Patch worker_queue item:
│     SET lockToken = null
│     SET lockedUntil = null
│     SET visibleAt = now + delay (if delay provided, else now)
│     SET attemptCount = attemptCount - 1 (if ignore_attempt = true)
│
└── 3. RETURN Ok(())
```
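
Both abandon operations apply the same field changes to a queue item. A minimal sketch of that patch on an in-memory item follows. `LockedItem` and `abandon` are hypothetical names, and clamping `attempt_count` at zero is an assumption, since the steps above do not say what happens when the count is already 0.

```rust
#[derive(Debug, Clone)]
pub struct LockedItem {
    pub lock_token: Option<String>,
    pub locked_until: Option<i64>, // epoch ms
    pub visible_at: i64,           // epoch ms
    pub attempt_count: i32,
}

/// Releases the lock and reschedules the item.
/// `delay_ms = None` makes the item visible immediately.
pub fn abandon(item: &mut LockedItem, now_ms: i64, delay_ms: Option<i64>, ignore_attempt: bool) {
    item.lock_token = None;
    item.locked_until = None;
    item.visible_at = now_ms + delay_ms.unwrap_or(0);
    if ignore_attempt {
        // Undo the increment applied at fetch time; clamped at zero (assumption).
        item.attempt_count = (item.attempt_count - 1).max(0);
    }
}
```

Passing `ignore_attempt = true` keeps an abandoned fetch from counting toward poison detection.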

### 3.7 `renew_orchestration_item_lock`

```
renew_orchestration_item_lock(lock_token, extend_for)
│
├── 1. Find instance by lockToken
│     If not found or lockedUntil <= now → ProviderError::permanent("Lock expired")
│
├── 2. Conditional patch (If-Match: _etag):
│     SET lockedUntil = now + extend_for
│
├── 3. For each orch_queue item tagged with this lockToken:
│     PATCH SET lockedUntil = now + extend_for
│
└── 4. RETURN Ok(())
```

### 3.8 `renew_work_item_lock`

```
renew_work_item_lock(token, extend_for)
│
├── 1. Find worker_queue item by lockToken
│     If not found or lockedUntil <= now → ProviderError::permanent("Lock expired")
│
├── 2. Conditional patch (If-Match: _etag):
│     SET lockedUntil = now + extend_for
│
└── 3. RETURN Ok(())
```
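
The renewal rule shared by both lock types can be sketched as a pure function: renewal succeeds only while the presented token matches and the lock has not yet lapsed. `renew_lock` and `RenewError` are illustrative names, and the function returns the new `lockedUntil` value instead of patching a document.

```rust
#[derive(Debug, PartialEq)]
pub enum RenewError {
    /// Token not found or the lock already lapsed; both map to "Lock expired".
    LockExpired,
}

/// Returns the new `lockedUntil` (epoch ms) when renewal is allowed.
pub fn renew_lock(
    held_token: Option<&str>,
    locked_until_ms: Option<i64>,
    presented_token: &str,
    now_ms: i64,
    extend_for_ms: i64,
) -> Result<i64, RenewError> {
    let token_matches = held_token == Some(presented_token);
    let still_held = locked_until_ms.map_or(false, |until| until > now_ms);
    if token_matches && still_held {
        Ok(now_ms + extend_for_ms)
    } else {
        Err(RenewError::LockExpired)
    }
}
```

Note that the new expiry is measured from `now`, not from the previous `lockedUntil`, matching `SET lockedUntil = now + extend_for` in the steps above.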

### 3.9 `enqueue_for_orchestrator`

```
enqueue_for_orchestrator(item, delay)
│
├── 1. Extract instanceId from WorkItem
│
├── 2. Compute visibleAt = now + delay (or now if no delay)
│
├── 3. Compute dispatchSlot = hash(instanceId) % 256
│
├── 4. Create document in container:
│     { id: uuid(), instanceId, type: "orch_queue",
│       workItem: serialize(item), dispatchSlot,
│       visibleAt, enqueuedAt: now,
│       lockToken: null, lockedUntil: null, attemptCount: 0 }
│
│     NOTE: Do NOT create instance metadata here.
│     Instance creation happens via ack_orchestration_item metadata.
│
└── 5. RETURN Ok(())
```

### 3.10 `enqueue_for_worker`

```
enqueue_for_worker(item)
│
├── 1. Extract instanceId, executionId, activityId, sessionId from WorkItem
│
├── 2. Compute dispatchSlot = hash(instanceId) % 256
│
├── 3. Create document in container:
│     { id: uuid(), instanceId, type: "worker_queue",
│       workItem: serialize(item), dispatchSlot,
│       visibleAt: now, enqueuedAt: now,
│       lockToken: null, lockedUntil: null, attemptCount: 0,
│       executionId, activityId, sessionId }
│
└── 4. RETURN Ok(())
```

### 3.11 `read` / `read_with_execution`

```
read(instance)
│
├── 1. Get currentExecutionId from instance doc (point read)
└── 2. → read_with_execution(instance, currentExecutionId)

read_with_execution(instance, execution_id)
│
├── 1. Single-partition query:
│     SELECT c.eventData FROM c
│     WHERE c.instanceId = @instance
│       AND c.type = 'history'
│       AND c.executionId = @execution_id
│     ORDER BY c.eventId
│
├── 2. Deserialize each eventData → Event
│     (Skip events that fail to deserialize)
│
└── 3. RETURN Ok(Vec<Event>)
```

---

## 4. Transactional Outbox

### 4.1 When the Outbox Is Used

The outbox is needed when `ack_orchestration_item` produces work items that target a **different instance** than the one being acked. These cases are:

| Scenario | Source | Target | Work Item Type |
|----------|--------|--------|---------------|
| Sub-orchestration start | Parent instance | Child instance | `StartOrchestration` |
| Sub-orchestration completion | Child instance | Parent instance | `SubOrchCompleted` / `SubOrchFailed` |
| Detached orchestration start | Coordinator instance | New instance | `StartOrchestration` |
| Cancel cascade | Parent instance | Child instance | `CancelInstance` |
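
In code, deciding whether an item needs the outbox comes down to comparing its target instance with the instance being acked. The sketch below assumes a simplified `OrchItem` type with a `target_instance` field; the real `duroxide::WorkItem` carries this information differently.

```rust
/// Simplified stand-in for an orchestrator work item (illustrative only).
#[derive(Debug, Clone, PartialEq)]
pub struct OrchItem {
    pub target_instance: String,
    pub kind: &'static str,
}

/// Splits items into (same-partition, cross-partition) relative to the
/// instance being acked. Cross-partition items become outbox intents.
pub fn classify(acked_instance: &str, items: Vec<OrchItem>) -> (Vec<OrchItem>, Vec<OrchItem>) {
    items
        .into_iter()
        .partition(|item| item.target_instance == acked_instance)
}
```

Items in the first list can join the transactional batch directly; items in the second list are written as intents in the source partition and delivered afterwards.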

### 4.2 Intent Lifecycle

```
                 ┌──────────────────────────────────────────────┐
                 │ ack_orchestration_item (transactional batch)  │
                 │                                              │
                 │  ... history, queue ops, metadata ...        │
                 │  CREATE outbox_intent (status: "pending")    │
                 └──────────────────┬───────────────────────────┘
                    ── transaction boundary ──
                 ┌──────────────────────────────────────────────┐
                 │ Best-effort delivery (immediate, same call)  │
                 │                                              │
                 │  Create target doc in target partition        │
                 │  On success → delete intent                  │
                 │  On 409    → delete intent (already exists)  │
                 │  On error  → leave as "pending"              │
                 └──────────────────┬───────────────────────────┘
                        (if delivery failed)
                 ┌──────────────────────────────────────────────┐
                 │ Background reconciler (every 2 seconds)      │
                 │                                              │
                 │  Query: type = "outbox_intent"               │
                 │         AND status = "pending"               │
                 │         AND createdAt < now - 2000           │
                 │                                              │
                 │  For each:                                   │
                 │    Create target doc in target partition      │
                 │    On success/409 → delete intent            │
                 │    On transient error → increment attempt,   │
                 │                        leave for next cycle  │
                 └──────────────────────────────────────────────┘
```
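
The lifecycle above can be read as a small state machine. The sketch below captures its two decisions as pure functions: which intents the reconciler picks up, and what happens to an intent after a delivery attempt. All names are hypothetical. The `PermanentError` arm is an assumption drawn from the reconciler described in section 4.4, which logs such failures and leaves the intent in place.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum DeliveryResult {
    Created,
    Conflict, // HTTP 409: target document already exists
    TransientError,
    PermanentError,
}

#[derive(Debug, PartialEq)]
pub enum IntentAction {
    DeleteIntent,
    RetryNextCycle,
    LogAndLeave,
}

/// Minimum age before the reconciler picks an intent up (2 seconds).
pub const RECONCILE_AGE_MS: i64 = 2_000;

/// Mirrors the reconciler query: pending and older than two seconds.
pub fn is_due_for_reconcile(status: &str, created_at_ms: i64, now_ms: i64) -> bool {
    status == "pending" && created_at_ms < now_ms - RECONCILE_AGE_MS
}

/// Maps a delivery outcome to the follow-up action on the intent document.
pub fn next_action(result: DeliveryResult) -> IntentAction {
    match result {
        DeliveryResult::Created | DeliveryResult::Conflict => IntentAction::DeleteIntent,
        DeliveryResult::TransientError => IntentAction::RetryNextCycle,
        DeliveryResult::PermanentError => IntentAction::LogAndLeave,
    }
}
```

The two-second age filter keeps the reconciler from racing the immediate best-effort delivery that runs in the same call as the ack.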

### 4.3 Idempotency Key Generation

The idempotency key must be deterministic so that a duplicate delivery attempt fails with a 409 Conflict instead of creating a duplicate work item:

```rust
fn idempotency_key(
    source_instance: &str,
    execution_id: u64,
    event_sequence: u64,  // position in the history_delta or orchestrator_items list
) -> String {
    format!("{}:{}:{}", source_instance, execution_id, event_sequence)
}
```

The target document's `id` is set to a deterministic value derived from the idempotency key:

```rust
fn target_doc_id(idempotency_key: &str) -> String {
    format!("outbox:{}", idempotency_key)
}
```

This ensures that creating the same target document twice results in a 409 Conflict (CosmosDB enforces unique `id` within a partition), which we treat as success.
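
To illustrate why a deterministic `id` makes redelivery safe, here is a minimal sketch that models one target partition as an in-memory map. `FakePartition`, `CreateError`, `Outcome`, and `deliver` are hypothetical stand-ins for illustration, not CosmosDB SDK types; only the key and id formats come from the functions above.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a create failure; only the 409 case is modeled.
#[derive(Debug, PartialEq)]
enum CreateError {
    Conflict,
}

/// Hypothetical result of one delivery attempt.
#[derive(Debug, PartialEq)]
enum Outcome {
    Created,
    AlreadyDelivered,
}

/// Hypothetical in-memory model of one target partition, keyed by document id.
#[derive(Default)]
struct FakePartition {
    docs: HashMap<String, String>,
}

impl FakePartition {
    /// Mimics a create call that rejects a duplicate `id` within the partition.
    fn create(&mut self, id: &str, body: &str) -> Result<(), CreateError> {
        if self.docs.contains_key(id) {
            return Err(CreateError::Conflict);
        }
        self.docs.insert(id.to_string(), body.to_string());
        Ok(())
    }
}

fn idempotency_key(source_instance: &str, execution_id: u64, event_sequence: u64) -> String {
    format!("{}:{}:{}", source_instance, execution_id, event_sequence)
}

fn target_doc_id(idempotency_key: &str) -> String {
    format!("outbox:{}", idempotency_key)
}

/// Delivers one item. A conflict means an earlier attempt already created the
/// document, so both outcomes allow the intent to be deleted.
fn deliver(partition: &mut FakePartition, key: &str, body: &str) -> Outcome {
    match partition.create(&target_doc_id(key), body) {
        Ok(()) => Outcome::Created,
        Err(CreateError::Conflict) => Outcome::AlreadyDelivered,
    }
}
```

Calling `deliver` twice with the same key leaves exactly one document in the partition: the first call creates it and the second reports it as already delivered.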

### 4.4 Reconciler

The reconciler is a background tokio task spawned by `CosmosDBProvider::new()`:

```rust
async fn reconciler_loop(inner: Arc<CosmosDBProviderInner>) {
    let mut interval = tokio::time::interval(Duration::from_secs(2));
    loop {
        interval.tick().await;

        // Cross-partition query for pending intents older than 2s
        let pending = query_pending_intents(&inner, Duration::from_secs(2)).await;

        for intent in pending {
            match deliver_intent(&inner, &intent).await {
                Ok(()) | Err(Conflict) => {
                    delete_intent(&inner, &intent).await.ok();
                }
                Err(e) if is_transient(&e) => {
                    // Leave for next cycle. Increment attemptCount.
                    increment_intent_attempt(&inner, &intent).await.ok();
                }
                Err(e) => {
                    tracing::warn!(
                        target: "duroxide::providers::cosmosdb",
                        intent_id = %intent.id,
                        error = %e,
                        "Permanent failure delivering outbox intent"
                    );
                }
            }
        }
    }
}
```

The reconciler query is cross-partition (intent documents are spread across source instance partitions). At low volume this is cheap (~5-20 RUs). Under load, most intents are delivered immediately and the reconciler query returns empty.
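
The selection rule behind that query (pending status plus an age cutoff) can be sketched in plain Rust. This is only an illustration of the filter, not the query code itself: the `Intent` struct and `due_intents` function are hypothetical, and timestamps are assumed to be milliseconds since the epoch, matching the `now - 2000` cutoff in the diagram.

```rust
/// Hypothetical, trimmed-down view of an outbox intent document.
#[derive(Debug, Clone, PartialEq)]
struct Intent {
    id: String,
    status: String,
    created_at_ms: u64,
}

/// Returns the intents the reconciler would pick up: status "pending" and
/// created strictly before `now_ms - age_threshold_ms`.
fn due_intents(all: &[Intent], now_ms: u64, age_threshold_ms: u64) -> Vec<&Intent> {
    // saturating_sub avoids underflow if the clock value is smaller than the threshold.
    let cutoff = now_ms.saturating_sub(age_threshold_ms);
    all.iter()
        .filter(|i| i.status == "pending" && i.created_at_ms < cutoff)
        .collect()
}
```

The age threshold presumably keeps the reconciler from racing with the immediate best-effort delivery of a freshly written intent.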

### 4.5 Shutdown

On `CosmosDBProvider::drop` or explicit shutdown, the reconciler task is cancelled via a `CancellationToken`. Pending intents remain in CosmosDB and will be delivered when the provider restarts.

---

## 5. Lease Provider

### 5.1 Trait

```rust
/// Assigns dispatch slots to concurrent dispatcher tasks.
/// Each dispatcher calls acquire_slots() to get its partition of the keyspace.
#[async_trait]
pub trait LeaseProvider: Send + Sync {
    /// Get the dispatch slots assigned to this caller.
    /// On first call, assigns slots. On subsequent calls, returns cached assignment.
    /// caller_id: unique identifier for the calling task (e.g., tokio task ID).
    async fn acquire_slots(&self, caller_id: u64) -> Vec<u8>;

    /// Release slots when a dispatcher shuts down.
    async fn release_slots(&self, caller_id: u64);
}
```

### 5.2 Phase 1: In-Memory Implementation

```rust
pub struct InMemoryLeaseProvider {
    total: u32,
    next_index: AtomicU32,
    assignments: DashMap<u64, Vec<u8>>,
}

impl InMemoryLeaseProvider {
    pub fn new(total_dispatchers: u32) -> Self {
        Self {
            total: total_dispatchers,
            next_index: AtomicU32::new(0),
            assignments: DashMap::new(),
        }
    }
}

#[async_trait]
impl LeaseProvider for InMemoryLeaseProvider {
    async fn acquire_slots(&self, caller_id: u64) -> Vec<u8> {
        self.assignments
            .entry(caller_id)
            .or_insert_with(|| {
                let index = self.next_index.fetch_add(1, Ordering::SeqCst);
                (0u16..256)
                    .filter(|s| (*s as u32) % self.total == index)
                    .map(|s| s as u8)
                    .collect()
            })
            .clone()
    }

    async fn release_slots(&self, caller_id: u64) {
        self.assignments.remove(&caller_id);
    }
}
```

**Slot distribution example** with `total_dispatchers = 3`:

| Dispatcher | Index | Slots (256 total) | Count |
|------------|-------|-------------------|-------|
| 0 | 0 | 0, 3, 6, 9, ..., 255 | 86 |
| 1 | 1 | 1, 4, 7, 10, ..., 253 | 85 |
| 2 | 2 | 2, 5, 8, 11, ..., 254 | 85 |
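
The table can be reproduced with a small standalone function that mirrors the filter in `acquire_slots`. `slots_for` is a hypothetical helper written for this illustration, not part of the provider.

```rust
/// Slots owned by dispatcher `index` when the 256 slots are split
/// round-robin across `total` dispatchers.
fn slots_for(index: u32, total: u32) -> Vec<u8> {
    assert!(total > 0, "total dispatchers must be non-zero");
    (0u16..256)
        .filter(|s| u32::from(*s) % total == index)
        .map(|s| s as u8)
        .collect()
}
```

With `total = 3` this yields 86, 85, and 85 slots for indices 0, 1, and 2. Note that an index equal to or greater than `total` matches no slot, so with the in-memory implementation as written, a caller that registers after all `total` indices have been handed out receives an empty assignment.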

### 5.3 Phase 2: CosmosDB-Backed Implementation (Future)

Same `LeaseProvider` trait, backed by lease documents in a `__leases__` partition for cross-runtime coordination. Each runtime's dispatchers claim non-overlapping slots via optimistic concurrency. Leases are renewed by heartbeat, and an expired lease can be taken over by another runtime for failover.

### 5.4 Usage in Provider

```rust
struct CosmosDBProviderInner {
    // ...
    orch_leases: Box<dyn LeaseProvider>,
    worker_leases: Box<dyn LeaseProvider>,
}

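// Note: `tokio::task::Id` does not expose its inner integer, so
// `tokio::task::id().0` does not compile. `current_caller_id()` is a
// hypothetical helper that returns a stable u64 per dispatcher task
// (for example, a hash of `tokio::task::id()`).

// In fetch_orchestration_item:
let caller_id = current_caller_id();
let my_slots = self.inner.orch_leases.acquire_slots(caller_id).await;
// Use my_slots in query: AND c.dispatchSlot IN (@my_slots)

// In fetch_work_item:
let caller_id = current_caller_id();
let my_slots = self.inner.worker_leases.acquire_slots(caller_id).await;
// Use my_slots in query: AND c.dispatchSlot IN (@my_slots)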
```

---

## 6. ProviderAdmin Implementation

### 6.1 Instance Management

| Method | Implementation |
|--------|---------------|
| `list_instances()` | Cross-partition query: `SELECT c.instanceId FROM c WHERE c.type = 'instance'` |
| `list_instances_by_status(status)` | Cross-partition query: `... AND c.status = @status` |
| `get_instance_info(instance)` | Point read: `<instanceId>:instance` |
| `latest_execution_id(instance)` | Point read instance doc → `currentExecutionId` |

### 6.2 Execution Management

| Method | Implementation |
|--------|---------------|
| `list_executions(instance)` | Single-partition query: `SELECT DISTINCT c.executionId FROM c WHERE c.instanceId = @instance AND c.type = 'history'` |
| `read_history(instance)` | `read_history_with_execution_id(instance, latest_execution_id)` |
| `read_history_with_execution_id(instance, exec_id)` | Single-partition query ordered by eventId |
| `get_execution_info(instance, exec_id)` | Derived from history events: the first event gives `started_at`, the last completed/failed event gives the status, and the number of events gives the event count |

### 6.3 Metrics

| Method | Implementation |
|--------|---------------|
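| `get_system_metrics()` | Cross-partition aggregation: `SELECT VALUE { total: COUNT(1), running: SUM(c.status = 'Running' ? 1 : 0), ... } FROM c WHERE c.type = 'instance'` (a conditional sum is used because `COUNT(expr)` counts every defined value, including `false`) |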
| `get_queue_depths()` | Cross-partition count: `SELECT COUNT(1) FROM c WHERE c.type = 'orch_queue' AND c.visibleAt <= @now` + same for worker_queue |

### 6.4 Hierarchy

| Method | Implementation |
|--------|---------------|
| `list_children(instance)` | Cross-partition query: `SELECT c.instanceId FROM c WHERE c.type = 'instance' AND c.parentInstanceId = @instance` |
| `get_parent_id(instance)` | Point read instance doc → `parentInstanceId` |

### 6.5 Deletion

```
delete_instances_atomic(ids, force)
│
├── 1. For each id: read instance doc
│     If status = "Running" and force = false → ProviderError::permanent
│
├── 2. Build instance tree (all descendants via list_children recursively)
│
├── 3. Check for orphans: if deleting a parent without all children → error
│
├── 4. For each instance in the tree (leaf-first):
│     Transactional batch (single partition per instance):
│       Delete all documents where instanceId = @id
│       (instance, history, orch_queue, worker_queue, outbox_intent, session)
│
│     Note: each instance is its own partition, so a single transactional
│     batch deletes everything for that instance atomically.
│
└── 5. RETURN DeleteInstanceResult { counts }
```
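
Step 4 requires a leaf-first ordering of the instance tree. A minimal sketch of that ordering is shown below, assuming the tree has already been loaded into a parent-to-children map (for example via repeated `list_children` calls) and contains no cycles. `leaf_first_order` and the map shape are hypothetical, chosen for this illustration.

```rust
use std::collections::HashMap;

/// Returns `root` and all of its descendants, ordered so that every child
/// appears before its parent (iterative post-order traversal).
fn leaf_first_order(root: &str, children: &HashMap<String, Vec<String>>) -> Vec<String> {
    let mut order = Vec::new();
    // Each stack entry is (instance id, whether its children were already pushed).
    let mut stack = vec![(root.to_string(), false)];
    while let Some((id, expanded)) = stack.pop() {
        if expanded {
            // All descendants of `id` have been emitted, so `id` can follow.
            order.push(id);
            continue;
        }
        // Revisit this node after its children have been processed.
        stack.push((id.clone(), true));
        if let Some(kids) = children.get(&id) {
            for kid in kids {
                stack.push((kid.clone(), false));
            }
        }
    }
    order
}
```

Deleting in this order means a parent's documents are removed only after all of its descendants are gone, so a failure partway through never leaves a child without its parent still present.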

### 6.6 Pruning

```
prune_executions(instance, options)
│
├── 1. List all executionIds for instance
├── 2. Determine which to keep (latest + keep_last + completed_before filter)
├── 3. For each execution to prune:
│     Delete all history docs for that executionId (single-partition batch)
├── 4. If the instance doc references a pruned executionId, update it
└── 5. RETURN PruneResult { counts }
```
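
Step 2 can be sketched as a pure function over execution IDs. The sketch assumes that `keep_last` counts the most recent executions including the latest one, and that the latest execution is never pruned; the `completed_before` filter is left out. `PruneOptions` here is a hypothetical subset of the real options type, and `select_for_pruning` is a hypothetical helper.

```rust
/// Hypothetical subset of the prune options; `completed_before` is omitted.
struct PruneOptions {
    keep_last: usize,
}

/// Splits execution ids into `(keep, prune)`, both in ascending order.
/// The highest (latest) execution id always ends up in `keep`.
fn select_for_pruning(mut execution_ids: Vec<u64>, options: &PruneOptions) -> (Vec<u64>, Vec<u64>) {
    execution_ids.sort_unstable();
    execution_ids.dedup();
    // Keep at least one execution so the latest one is never pruned.
    let keep_count = options.keep_last.max(1).min(execution_ids.len());
    let split_at = execution_ids.len() - keep_count;
    // split_off leaves the older ids in `execution_ids` and returns the newest ones.
    let keep = execution_ids.split_off(split_at);
    (keep, execution_ids)
}
```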

### 6.7 Custom Status

```
get_custom_status(instance, last_seen_version)
│
├── 1. Point read instance doc
├── 2. If customStatusVersion > last_seen_version:
│     Return Some((customStatus, customStatusVersion))
└── 3. Else: Return None (no change since last poll)
```
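
The version comparison in steps 2 and 3 amounts to the following sketch. `InstanceDoc` is a hypothetical subset of the instance document, and the field types (an optional string status and a `u64` version) are assumptions made for this illustration.

```rust
/// Hypothetical subset of the instance document used for custom status polling.
struct InstanceDoc {
    custom_status: Option<String>,
    custom_status_version: u64,
}

/// Returns the status and its version only if the version advanced past
/// `last_seen_version`; otherwise the caller has nothing new to process.
fn custom_status_if_changed(
    doc: &InstanceDoc,
    last_seen_version: u64,
) -> Option<(Option<String>, u64)> {
    if doc.custom_status_version > last_seen_version {
        Some((doc.custom_status.clone(), doc.custom_status_version))
    } else {
        None
    }
}
```

A poller would feed the returned version back in as `last_seen_version` on its next call, so an unchanged status costs one point read and returns `None`.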

---

## 7. Error Handling

### 7.1 CosmosDB Error → ProviderError Mapping

| CosmosDB Error | HTTP Status | ProviderError | Rationale |
|----------------|-------------|---------------|-----------|
| Conflict | 409 | `retryable` | A document with the same `id` already exists in the partition (outbox delivery treats this as success) |
| TooManyRequests | 429 | `retryable` | Rate limited, retry after backoff |
| RequestTimeout | 408 | `retryable` | Transient timeout |
| ServiceUnavailable | 503 | `retryable` | Transient availability issue |
| NotFound | 404 | `permanent` | Document doesn't exist |
| PreconditionFailed | 412 | `retryable` | ETag mismatch on conditional write |
| BadRequest | 400 | `permanent` | Malformed query or document |
| RequestEntityTooLarge | 413 | `permanent` | Batch too large |
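
As a compact illustration of the table, the classification can be written as a match on the HTTP status code. `ErrorClass` and `classify_status` are hypothetical names used only for this sketch, and treating unlisted status codes as permanent is an assumption, since the table does not cover them.

```rust
/// Hypothetical two-way classification mirroring the ProviderError column.
#[derive(Debug, PartialEq, Clone, Copy)]
enum ErrorClass {
    Retryable,
    Permanent,
}

/// Classifies an HTTP status code according to the mapping table.
fn classify_status(status: u16) -> ErrorClass {
    match status {
        // Timeout, conflict, precondition failed, rate limited, unavailable.
        408 | 409 | 412 | 429 | 503 => ErrorClass::Retryable,
        // Bad request, not found, payload too large.
        400 | 404 | 413 => ErrorClass::Permanent,
        // Assumption: anything not listed in the table is treated as permanent.
        _ => ErrorClass::Permanent,
    }
}
```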

### 7.2 Retry Strategy

```rust
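use std::future::Future;
use std::time::Duration;

const MAX_RETRIES: u32 = 3;
const BASE_RETRY_DELAY_MS: u64 = 100;

/// Runs `f`, retrying retryable failures up to MAX_RETRIES times with a
/// linearly increasing delay (100 ms, 200 ms, 300 ms).
async fn with_retry<F, Fut, T>(operation: &str, f: F) -> Result<T, ProviderError>
where
    // `Future` is a trait, so the closure's return type needs its own type parameter.
    F: Fn() -> Fut,
    Fut: Future<Output = Result<T, CosmosDBError>>,
{
    for attempt in 0..=MAX_RETRIES {
        match f().await {
            Ok(result) => return Ok(result),
            Err(e) => {
                let provider_err = map_cosmosdb_error(operation, &e);
                if provider_err.is_retryable() && attempt < MAX_RETRIES {
                    let delay = BASE_RETRY_DELAY_MS * (attempt as u64 + 1);
                    tokio::time::sleep(Duration::from_millis(delay)).await;
                    continue;
                }
                return Err(provider_err);
            }
        }
    }
    // Every iteration either returns or continues, and the final attempt always returns.
    unreachable!()
}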
```

CosmosDB 429 responses carry a retry hint in the `x-ms-retry-after-ms` header. The retry logic should wait at least that long when the header is present.
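
One way to combine the linear backoff with a server-provided retry hint is sketched below. `retry_delay` is a hypothetical helper, and taking the larger of the two values is a design assumption: it never retries sooner than the server asked and never sooner than the local backoff.

```rust
use std::time::Duration;

const BASE_RETRY_DELAY_MS: u64 = 100;

/// Delay before the next attempt. `attempt` is zero-based, and `server_hint`
/// is the retry hint from a 429 response, if one was present.
fn retry_delay(attempt: u32, server_hint: Option<Duration>) -> Duration {
    let backoff = Duration::from_millis(BASE_RETRY_DELAY_MS * (u64::from(attempt) + 1));
    // Use whichever is longer: the local backoff or the server's hint.
    server_hint.map_or(backoff, |hint| hint.max(backoff))
}
```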

---

## 8. Crate Structure

```
duroxide-cdb/
├── Cargo.toml
├── SPEC.md                         ← this document
├── README.md
├── CHANGELOG.md
├── .env.example                    # COSMOSDB_ENDPOINT, COSMOSDB_KEY, COSMOSDB_DATABASE
├── src/
│   ├── lib.rs                      # Re-export CosmosDBProvider
│   ├── provider.rs                 # Provider + ProviderAdmin trait implementations
│   ├── client.rs                   # CosmosDBClient wrapper, connection config
│   ├── containers.rs               # Container initialization, indexing policy setup
│   ├── models.rs                   # Document types (serde structs for all 6 doc types)
│   ├── batch.rs                    # Transactional batch builder helpers
│   ├── query.rs                    # Query builders (cross-partition, single-partition)
│   ├── outbox.rs                   # Outbox intent delivery + reconciler
│   ├── leases.rs                   # LeaseProvider trait + InMemoryLeaseProvider
│   └── errors.rs                   # CosmosDB error → ProviderError mapping
├── tests/
│   ├── common/
│   │   └── mod.rs                  # Test helpers: create_cosmosdb_store, cleanup
│   ├── cosmosdb_provider_test.rs   # Provider validation tests (157 tests)
│   ├── e2e_samples.rs              # Ported e2e sample tests
│   ├── stress_tests.rs             # Stress tests
│   └── session_e2e_tests.rs        # Session affinity e2e tests
└── cdb-stress/                     # Stress test binary
    ├── Cargo.toml
    └── src/
        ├── lib.rs
        └── bin/
            └── cdb-stress.rs
```

### Dependencies

```toml
[package]
name = "duroxide-cdb"
version = "0.1.0"
edition = "2021"

[dependencies]
duroxide = { version = "0.1.20" }
azure_data_cosmos = "0.22"
azure_core = "0.22"
async-trait = "0.1"
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
uuid = { version = "1", features = ["v4"] }
chrono = "0.4"
tracing = "0.1"
anyhow = "1"
dashmap = "6"
tokio-util = "0.7"

[dev-dependencies]
duroxide = { version = "0.1.20", features = ["provider-test"] }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
dotenvy = "0.15"
semver = "1"
```

---

## 9. Configuration

```rust
pub struct CosmosDBProviderConfig {
    /// CosmosDB endpoint URL
    pub endpoint: String,

    /// CosmosDB primary key
    pub key: String,

    /// Database name
    pub database: String,

    /// Container name (default: "duroxide")
    pub container: String,

    /// Number of orchestration dispatchers (for lease partitioning)
    pub orch_concurrency: u32,

    /// Number of worker dispatchers (for lease partitioning)
    pub worker_concurrency: u32,

    /// Outbox reconciler interval (default: 2s)
    pub reconciler_interval: Duration,

    /// Outbox intent age threshold before reconciler picks it up (default: 2s)
    pub reconciler_age_threshold: Duration,
}

impl Default for CosmosDBProviderConfig {
    fn default() -> Self {
        Self {
            endpoint: String::new(),
            key: String::new(),
            database: "duroxide".to_string(),
            container: "duroxide".to_string(),
            orch_concurrency: 1,
            worker_concurrency: 1,
            reconciler_interval: Duration::from_secs(2),
            reconciler_age_threshold: Duration::from_secs(2),
        }
    }
}
```

### Environment Variables

| Variable | Description | Default |
|----------|-------------|---------|
| `COSMOSDB_ENDPOINT` | CosmosDB account endpoint | required |
| `COSMOSDB_KEY` | CosmosDB account key | required |
| `COSMOSDB_DATABASE` | Database name | `duroxide` |
| `COSMOSDB_CONTAINER` | Container name | `duroxide` |
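
A minimal sketch of resolving these variables with their defaults is shown below. `EnvSettings`, `settings_from`, and `settings_from_env` are hypothetical names, not part of the crate's API; the lookup is passed in as a closure so the logic can be exercised without modifying the process environment.

```rust
/// Hypothetical subset of the connection settings resolved from the environment.
#[derive(Debug, PartialEq)]
struct EnvSettings {
    endpoint: String,
    key: String,
    database: String,
    container: String,
}

/// Resolves settings through `lookup`, which returns the value of a variable if set.
fn settings_from<F>(lookup: F) -> Result<EnvSettings, String>
where
    F: Fn(&str) -> Option<String>,
{
    // Required variables must be present and non-empty.
    let required = |name: &str| {
        lookup(name)
            .filter(|v| !v.is_empty())
            .ok_or_else(|| format!("{} is required", name))
    };
    Ok(EnvSettings {
        endpoint: required("COSMOSDB_ENDPOINT")?,
        key: required("COSMOSDB_KEY")?,
        database: lookup("COSMOSDB_DATABASE").unwrap_or_else(|| "duroxide".to_string()),
        container: lookup("COSMOSDB_CONTAINER").unwrap_or_else(|| "duroxide".to_string()),
    })
}

/// Reads the settings from the real process environment.
fn settings_from_env() -> Result<EnvSettings, String> {
    settings_from(|name| std::env::var(name).ok())
}
```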

---

## 10. Testing Strategy

### 10.1 Provider Validation Tests (157 tests)

Implement `ProviderFactory` for `CosmosDBProvider`. Each test gets an isolated set of instances (unique instance ID prefixes). Cleanup deletes all documents with matching prefixes.

### 10.2 E2E Sample Tests

Port all 17 e2e samples from duroxide-pg. Replace `create_postgres_store()` with `create_cosmosdb_store()`. Adjust timeouts for CosmosDB latency (use 30-60s instead of 5-10s).

### 10.3 Stress Tests

Implement `ProviderStressFactory`. Start with conservative config (max_concurrent: 5, duration: 10s). Target 100% success rate before optimizing throughput.

### 10.4 Test Infrastructure

- **Local:** CosmosDB Linux emulator (Docker)
- **CI:** CosmosDB Linux emulator (Docker) or dedicated test account (serverless tier for cost)
- **Isolation:** Each test generates unique instance ID prefixes. Cleanup queries and deletes all documents with that prefix.

---

## 11. Implementation Phases

### Phase 1: Core Provider (MVP)

Goal: Pass all 157 provider validation tests.

| Step | Deliverable | Est. |
|------|------------|------|
| 1.1 | Scaffolding: Cargo.toml, models.rs, containers.rs, lib.rs | 1 day |
| 1.2 | client.rs, errors.rs, leases.rs (InMemoryLeaseProvider) | 1 day |
| 1.3 | History path: `read`, `read_with_execution`, `append_with_execution` | 1 day |
| 1.4 | Enqueue: `enqueue_for_orchestrator`, `enqueue_for_worker` | 1 day |
| 1.5 | Fetch: `fetch_orchestration_item`, `fetch_work_item` | 2-3 days |
| 1.6 | Ack: `ack_orchestration_item`, `ack_work_item`, batch.rs | 2-3 days |
| 1.7 | Outbox: outbox.rs, reconciler background task | 1-2 days |
| 1.8 | Lock management: abandon, renew (orch + worker) | 1-2 days |
| 1.9 | Sessions: session routing, renew, cleanup | 1-2 days |
| 1.10 | Custom status: `get_custom_status` | 0.5 day |
| 1.11 | Validation tests: wire up all 157, debug & fix | 3-5 days |

### Phase 2: ProviderAdmin + E2E

| Step | Deliverable | Est. |
|------|------------|------|
| 2.1 | ProviderAdmin: list, get_info, metrics, hierarchy | 2-3 days |
| 2.2 | Deletion: atomic, bulk, cascade | 2 days |
| 2.3 | Pruning: prune_executions, prune_bulk | 1 day |
| 2.4 | E2E sample tests | 2-3 days |
| 2.5 | Session E2E tests | 1 day |

### Phase 3: Stress & Polish

| Step | Deliverable | Est. |
|------|------------|------|
| 3.1 | Stress tests + cdb-stress binary | 1-2 days |
| 3.2 | README, CHANGELOG, CI/CD workflow | 1 day |
| 3.3 | Performance tuning, RU optimization | 1-2 days |

### Phase 4: Multi-Runtime Leases (Future)

| Step | Deliverable | Est. |
|------|------------|------|
| 4.1 | CosmosDBLeaseProvider implementation | 2-3 days |
| 4.2 | Lease heartbeat, expiry, takeover | 2 days |
| 4.3 | Integration testing with multiple runtimes | 2 days |

**Total Phase 1-3: ~5-7 weeks** (the step estimates above sum to 25.5-36.5 working days)