Skip to main content

duroxide_cdb/
provider.rs

1use crate::batch;
2use crate::client::{BatchOperation, CosmosDBClient};
3use crate::containers;
4use crate::errors;
5use crate::leases::{InMemoryLeaseProvider, LeaseProvider};
6use crate::models::*;
7use crate::outbox;
8use crate::outbox::OutboxFaultInjector;
9use crate::query;
10use duroxide::providers::*;
11use duroxide::{Event, EventKind, TagFilter};
12use std::sync::Arc;
13use std::time::Duration;
14use tokio_util::sync::CancellationToken;
15
/// Upper bound on how many candidate instances `fetch_orchestration_item`
/// attempts to lock in a single call before giving up and returning `Ok(None)`.
const MAX_LOCK_RETRIES: usize = 20;
17
/// Configuration for the CosmosDB provider.
///
/// `Debug` is implemented by hand (not derived) so that the account `key`
/// never ends up in logs, panic messages, or `{:?}` output; every other
/// field is printed as-is.
#[derive(Clone)]
pub struct CosmosDBProviderConfig {
    /// Account endpoint handed to `CosmosDBClient::new`.
    pub endpoint: String,
    /// Account key handed to `CosmosDBClient::new`. Treated as a secret.
    pub key: String,
    /// Database name handed to `CosmosDBClient::new`.
    pub database: String,
    /// Container name handed to `CosmosDBClient::new`.
    pub container: String,
    /// Value passed to the orchestration `InMemoryLeaseProvider`.
    pub orch_concurrency: u32,
    /// Value passed to the worker `InMemoryLeaseProvider`.
    pub worker_concurrency: u32,
    /// Interval passed to the outbox reconciler.
    pub reconciler_interval: Duration,
    /// Age threshold passed to the outbox reconciler.
    pub reconciler_age_threshold: Duration,
}

impl std::fmt::Debug for CosmosDBProviderConfig {
    /// Formats every field except `key`, which is replaced by a placeholder.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CosmosDBProviderConfig")
            .field("endpoint", &self.endpoint)
            .field("key", &"<redacted>")
            .field("database", &self.database)
            .field("container", &self.container)
            .field("orch_concurrency", &self.orch_concurrency)
            .field("worker_concurrency", &self.worker_concurrency)
            .field("reconciler_interval", &self.reconciler_interval)
            .field("reconciler_age_threshold", &self.reconciler_age_threshold)
            .finish()
    }
}

impl Default for CosmosDBProviderConfig {
    /// Empty endpoint and key, database and container both named `"duroxide"`,
    /// a concurrency of 1 for each lease provider, and 2-second reconciler
    /// interval and age threshold.
    fn default() -> Self {
        Self {
            endpoint: String::new(),
            key: String::new(),
            database: "duroxide".to_string(),
            container: "duroxide".to_string(),
            orch_concurrency: 1,
            worker_concurrency: 1,
            reconciler_interval: Duration::from_secs(2),
            reconciler_age_threshold: Duration::from_secs(2),
        }
    }
}
45
/// CosmosDB provider for duroxide.
/// Implements both Provider and ProviderAdmin traits.
///
/// Cloning is cheap: every clone shares the same inner state through an `Arc`.
#[derive(Clone)]
pub struct CosmosDBProvider {
    // Shared state; its `Drop` impl cancels the provider's cancellation token.
    inner: Arc<CosmosDBProviderInner>,
}
52
/// State shared by all clones of a `CosmosDBProvider`.
struct CosmosDBProviderInner {
    // Client used for document reads/writes; a clone is handed to the outbox reconciler.
    client: CosmosDBClient,
    // Lease provider queried by `fetch_orchestration_item` for this caller's slots.
    orch_leases: Box<dyn LeaseProvider>,
    // Lease provider built from `worker_concurrency`
    // (presumably the worker-queue counterpart of `orch_leases` — usage is not visible here).
    worker_leases: Box<dyn LeaseProvider>,
    // Token shared with the outbox reconciler; cancelled by `cleanup` and on drop.
    cancel: CancellationToken,
    // Join handle returned by `outbox::start_reconciler`; only stored, never awaited here.
    _reconciler_handle: Option<tokio::task::JoinHandle<()>>,
    // Optional test hook installed via `set_outbox_fault_injector`.
    outbox_fault_injector: Option<OutboxFaultInjector>,
}
61
62impl CosmosDBProvider {
63    /// Create a new CosmosDB provider with default settings.
64    pub async fn new(endpoint: &str, key: &str, database: &str) -> Result<Self, ProviderError> {
65        let config = CosmosDBProviderConfig {
66            endpoint: endpoint.to_string(),
67            key: key.to_string(),
68            database: database.to_string(),
69            ..Default::default()
70        };
71        Self::new_with_config(config).await
72    }
73
74    /// Create a new CosmosDB provider with custom configuration.
75    pub async fn new_with_config(config: CosmosDBProviderConfig) -> Result<Self, ProviderError> {
76        let client = CosmosDBClient::new(
77            &config.endpoint,
78            &config.key,
79            &config.database,
80            &config.container,
81        )
82        .map_err(|e| ProviderError::permanent("new", e))?;
83
84        // Ensure database and container exist
85        containers::ensure_infrastructure(&client).await?;
86
87        let cancel = CancellationToken::new();
88
89        // Start outbox reconciler
90        let reconciler_handle = outbox::start_reconciler(
91            client.clone(),
92            config.reconciler_interval,
93            config.reconciler_age_threshold,
94            cancel.clone(),
95        );
96
97        let orch_leases = Box::new(InMemoryLeaseProvider::new(config.orch_concurrency));
98        let worker_leases = Box::new(InMemoryLeaseProvider::new(config.worker_concurrency));
99
100        Ok(Self {
101            inner: Arc::new(CosmosDBProviderInner {
102                client,
103                orch_leases,
104                worker_leases,
105                cancel,
106                _reconciler_handle: Some(reconciler_handle),
107                outbox_fault_injector: None,
108            }),
109        })
110    }
111
112    /// Create with a specific container name (for test isolation).
113    pub async fn new_with_container(
114        endpoint: &str,
115        key: &str,
116        database: &str,
117        container: &str,
118    ) -> Result<Self, ProviderError> {
119        let config = CosmosDBProviderConfig {
120            endpoint: endpoint.to_string(),
121            key: key.to_string(),
122            database: database.to_string(),
123            container: container.to_string(),
124            ..Default::default()
125        };
126        Self::new_with_config(config).await
127    }
128
    /// Borrow the shared Cosmos DB client held in the inner state.
    fn client(&self) -> &CosmosDBClient {
        &self.inner.client
    }
132
133    /// Set a fault injector for outbox delivery testing.
134    /// When set, the next N best-effort deliveries will be skipped,
135    /// forcing the background reconciler to deliver the intents.
136    pub fn set_outbox_fault_injector(&mut self, injector: OutboxFaultInjector) {
137        // Safety: we need interior mutability here. The fault injector is
138        // Arc<AtomicU32> internally, so it's safe to set before use.
139        let inner = Arc::get_mut(&mut self.inner)
140            .expect("Cannot set fault injector after provider has been cloned");
141        inner.outbox_fault_injector = Some(injector);
142    }
143
    /// Cleanup: delete the container. Used in tests.
    ///
    /// Cancels the provider's cancellation token first (the same token that was
    /// handed to the outbox reconciler), then issues the container delete.
    /// The reconciler task is not joined here.
    ///
    /// # Errors
    /// Propagates whatever error `delete_container` returns.
    pub async fn cleanup(&self) -> Result<(), ProviderError> {
        self.inner.cancel.cancel();
        self.client().delete_container().await
    }
149
150    async fn load_kv_store_documents(
151        &self,
152        instance_id: &str,
153    ) -> Result<Vec<KeyValueDocument>, ProviderError> {
154        let docs =
155            query::query_by_type_in_partition(self.client(), instance_id, DOC_TYPE_KV).await?;
156        docs.into_iter()
157            .map(|doc| {
158                serde_json::from_value(doc).map_err(|e| {
159                    ProviderError::permanent(
160                        "load_kv_store_documents",
161                        format!("Deserialize error: {e}"),
162                    )
163                })
164            })
165            .collect()
166    }
167
168    async fn load_kv_delta_documents(
169        &self,
170        instance_id: &str,
171    ) -> Result<Vec<KeyValueDeltaDocument>, ProviderError> {
172        let docs = query::query_by_type_in_partition(self.client(), instance_id, DOC_TYPE_KV_DELTA)
173            .await?;
174        docs.into_iter()
175            .map(|doc| {
176                serde_json::from_value(doc).map_err(|e| {
177                    ProviderError::permanent(
178                        "load_kv_delta_documents",
179                        format!("Deserialize error: {e}"),
180                    )
181                })
182            })
183            .collect()
184    }
185
186    fn apply_kv_history_delta(
187        instance_id: &str,
188        execution_id: u64,
189        now: u64,
190        history_delta: &[Event],
191        store_docs: &[KeyValueDocument],
192        existing_delta_docs: &[KeyValueDeltaDocument],
193    ) -> std::collections::HashMap<String, KeyValueDeltaDocument> {
194        let mut delta_docs: std::collections::HashMap<String, KeyValueDeltaDocument> =
195            existing_delta_docs
196                .iter()
197                .cloned()
198                .map(|doc| (doc.key.clone(), doc))
199                .collect();
200        let store_keys: Vec<String> = store_docs.iter().map(|doc| doc.key.clone()).collect();
201
202        for event in history_delta {
203            match &event.kind {
204                EventKind::KeyValueSet {
205                    key,
206                    value,
207                    last_updated_at_ms,
208                } => {
209                    delta_docs.insert(
210                        key.clone(),
211                        KeyValueDeltaDocument::new(
212                            instance_id,
213                            key,
214                            Some(value.clone()),
215                            execution_id,
216                            *last_updated_at_ms,
217                        ),
218                    );
219                }
220                EventKind::KeyValueCleared { key } => {
221                    delta_docs.insert(
222                        key.clone(),
223                        KeyValueDeltaDocument::new(instance_id, key, None, execution_id, now),
224                    );
225                }
226                EventKind::KeyValuesCleared => {
227                    for doc in delta_docs.values_mut() {
228                        doc.value = None;
229                        doc.execution_id = execution_id;
230                        doc.last_updated_at_ms = now;
231                    }
232                    for key in &store_keys {
233                        delta_docs.entry(key.clone()).or_insert_with(|| {
234                            KeyValueDeltaDocument::new(instance_id, key, None, execution_id, now)
235                        });
236                    }
237                }
238                _ => {}
239            }
240        }
241
242        delta_docs
243    }
244
245    fn kv_delta_changed_keys(
246        history_delta: &[Event],
247        store_docs: &[KeyValueDocument],
248        existing_delta_docs: &[KeyValueDeltaDocument],
249    ) -> (std::collections::HashSet<String>, bool) {
250        let mut changed_keys = std::collections::HashSet::new();
251        let mut clear_all = false;
252
253        for event in history_delta {
254            match &event.kind {
255                EventKind::KeyValueSet { key, .. } | EventKind::KeyValueCleared { key } => {
256                    changed_keys.insert(key.clone());
257                }
258                EventKind::KeyValuesCleared => {
259                    clear_all = true;
260                }
261                _ => {}
262            }
263        }
264
265        if clear_all {
266            changed_keys.extend(store_docs.iter().map(|doc| doc.key.clone()));
267            changed_keys.extend(existing_delta_docs.iter().map(|doc| doc.key.clone()));
268        }
269
270        (changed_keys, clear_all)
271    }
272
273    /// Read the instance document. Returns None if not found.
274    async fn read_instance(
275        &self,
276        instance_id: &str,
277    ) -> Result<Option<InstanceDocument>, ProviderError> {
278        let doc_id = InstanceDocument::doc_id(instance_id);
279        let resp = self.client().read_document(&doc_id, instance_id).await?;
280
281        if errors::is_not_found(resp.status) {
282            return Ok(None);
283        }
284        if !resp.is_success() {
285            return Err(errors::map_cosmosdb_error(
286                "read_instance",
287                resp.status,
288                &resp.body,
289            ));
290        }
291
292        let mut inst: InstanceDocument = serde_json::from_str(&resp.body).map_err(|e| {
293            ProviderError::permanent("read_instance", format!("Deserialize error: {e}"))
294        })?;
295        inst.etag = resp.etag;
296        Ok(Some(inst))
297    }
298
299    /// Try to lock an instance by conditional replace.
300    /// If no instance document exists yet (first StartOrchestration), creates one.
301    /// `work_item_json` is used to extract orchestration name/version for new instances.
302    async fn try_lock_instance(
303        &self,
304        instance_id: &str,
305        lock_timeout: Duration,
306        now: u64,
307        work_item_json: Option<&str>,
308    ) -> Result<Option<(InstanceDocument, String)>, ProviderError> {
309        let inst = match self.read_instance(instance_id).await? {
310            Some(i) => i,
311            None => {
312                // Extract orchestration name/version from the work item if available
313                let (orch_name, orch_version) = if let Some(json) = work_item_json {
314                    match serde_json::from_str::<WorkItem>(json) {
315                        Ok(WorkItem::StartOrchestration {
316                            orchestration,
317                            version,
318                            ..
319                        }) => (orchestration, version.unwrap_or_default()),
320                        _ => (String::new(), String::new()),
321                    }
322                } else {
323                    (String::new(), String::new())
324                };
325
326                // Lazy instance creation: create a skeleton instance doc
327                let lock_token = uuid::Uuid::new_v4().to_string();
328                let locked_until = now + lock_timeout.as_millis() as u64;
329
330                let mut new_inst =
331                    InstanceDocument::new(instance_id, &orch_name, &orch_version, 1, None, now);
332                new_inst.lock_token = Some(lock_token.clone());
333                new_inst.locked_until = Some(locked_until);
334
335                let doc_json = serde_json::to_value(&new_inst).map_err(|e| {
336                    ProviderError::permanent("try_lock_instance", format!("Serialize error: {e}"))
337                })?;
338
339                let resp = self
340                    .client()
341                    .create_document(instance_id, &doc_json)
342                    .await?;
343                if resp.is_success() {
344                    new_inst.etag = resp.etag;
345                    return Ok(Some((new_inst, lock_token)));
346                } else if errors::is_conflict(resp.status) {
347                    // Another dispatcher created it first; re-read
348                    match self.read_instance(instance_id).await? {
349                        Some(i) => i,
350                        None => return Ok(None),
351                    }
352                } else {
353                    return Err(errors::map_cosmosdb_error(
354                        "try_lock_instance",
355                        resp.status,
356                        &resp.body,
357                    ));
358                }
359            }
360        };
361
362        // Check if already locked
363        if let Some(locked_until) = inst.locked_until {
364            if locked_until > now {
365                return Ok(None); // Locked by another dispatcher
366            }
367        }
368
369        let etag = inst.etag.clone().unwrap_or_default();
370        let lock_token = uuid::Uuid::new_v4().to_string();
371        let locked_until = now + lock_timeout.as_millis() as u64;
372
373        let mut updated = inst.clone();
374        updated.lock_token = Some(lock_token.clone());
375        updated.locked_until = Some(locked_until);
376        updated.updated_at = now;
377
378        let doc_json = serde_json::to_value(&updated).map_err(|e| {
379            ProviderError::permanent("try_lock_instance", format!("Serialize error: {e}"))
380        })?;
381
382        let resp = self
383            .client()
384            .replace_document(&updated.id, instance_id, &doc_json, Some(&etag))
385            .await?;
386
387        if resp.is_success() {
388            Ok(Some((updated, lock_token)))
389        } else if errors::is_precondition_failed(resp.status) || errors::is_conflict(resp.status) {
390            Ok(None) // ETag race
391        } else {
392            Err(errors::map_cosmosdb_error(
393                "try_lock_instance",
394                resp.status,
395                &resp.body,
396            ))
397        }
398    }
399
400    /// Unlock an instance.
401    async fn unlock_instance(&self, instance_id: &str) -> Result<(), ProviderError> {
402        let Some(inst) = self.read_instance(instance_id).await? else {
403            return Ok(());
404        };
405
406        let etag = inst.etag.clone().unwrap_or_default();
407        let mut updated = inst;
408        updated.lock_token = None;
409        updated.locked_until = None;
410
411        let doc_json = serde_json::to_value(&updated).map_err(|e| {
412            ProviderError::permanent("unlock_instance", format!("Serialize error: {e}"))
413        })?;
414
415        let resp = self
416            .client()
417            .replace_document(&updated.id, instance_id, &doc_json, Some(&etag))
418            .await?;
419
420        if resp.is_success() || errors::is_precondition_failed(resp.status) {
421            Ok(())
422        } else {
423            Err(errors::map_cosmosdb_error(
424                "unlock_instance",
425                resp.status,
426                &resp.body,
427            ))
428        }
429    }
430}
431
/// Cancels the shared cancellation token when the inner state is dropped,
/// i.e. once the last `CosmosDBProvider` clone goes away. The same token is
/// handed to the outbox reconciler at construction.
impl Drop for CosmosDBProviderInner {
    fn drop(&mut self) {
        self.cancel.cancel();
    }
}
437
438// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
439// Provider trait implementation
440// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
441
442#[async_trait::async_trait]
443impl Provider for CosmosDBProvider {
    /// Returns the fixed provider name, `"duroxide-cdb"`.
    fn name(&self) -> &str {
        "duroxide-cdb"
    }
447
    /// Returns this crate's version, captured from `CARGO_PKG_VERSION` at compile time.
    fn version(&self) -> &str {
        env!("CARGO_PKG_VERSION")
    }
451
452    // ─── fetch_orchestration_item ────────────────────────────────
453
454    async fn fetch_orchestration_item(
455        &self,
456        lock_timeout: Duration,
457        _poll_timeout: Duration,
458        filter: Option<&DispatcherCapabilityFilter>,
459    ) -> Result<Option<(OrchestrationItem, String, u32)>, ProviderError> {
460        let caller_id = task_id_u64();
461        let my_slots = self.inner.orch_leases.acquire_slots(caller_id).await;
462        let now = now_ms();
463
464        // Capability filter: compute packed version bounds for initial query filtering
465        let (min_packed, max_packed) = if let Some(f) = filter {
466            if f.supported_duroxide_versions.is_empty() {
467                return Ok(None);
468            }
469            let min = f
470                .supported_duroxide_versions
471                .iter()
472                .map(|r| pack_semver(&r.min))
473                .min()
474                .unwrap();
475            let max = f
476                .supported_duroxide_versions
477                .iter()
478                .map(|r| pack_semver(&r.max))
479                .max()
480                .unwrap();
481            (Some(min), Some(max))
482        } else {
483            (None, None)
484        };
485
486        let mut excluded = Vec::new();
487
488        for _attempt in 0..MAX_LOCK_RETRIES {
489            // Step 1: Find candidate (no version filtering on queue items)
490            let candidate = query::find_candidate_orch_item(
491                self.client(),
492                now,
493                &my_slots,
494                None, // Don't filter by version in queue query
495                None,
496                &excluded,
497            )
498            .await?;
499
500            let Some(candidate) = candidate else {
501                return Ok(None);
502            };
503
504            let instance_id = &candidate.instance_id;
505
506            // Step 2: Try to lock the instance
507            let lock_result = self
508                .try_lock_instance(instance_id, lock_timeout, now, Some(&candidate.work_item))
509                .await?;
510
511            let Some((locked_instance, lock_token)) = lock_result else {
512                excluded.push(instance_id.to_string());
513                continue;
514            };
515
516            // Step 2b: Check capability filter against instance's pinned version
517            if let (Some(min_v), Some(max_v)) = (min_packed, max_packed) {
518                if let Some(pinned) = locked_instance.pinned_duroxide_version_packed {
519                    if pinned < min_v || pinned > max_v {
520                        // Incompatible version - unlock and skip
521                        self.unlock_instance(instance_id).await?;
522                        excluded.push(instance_id.to_string());
523                        continue;
524                    }
525                }
526                // If pinned is None, it's compatible (unpinned = legacy)
527            }
528
529            // Step 3: Collect all pending messages
530            let messages = query::collect_orch_messages(self.client(), instance_id, now).await?;
531
532            if messages.is_empty() {
533                // Messages were consumed between step 1 and 3
534                self.unlock_instance(instance_id).await?;
535                excluded.push(instance_id.to_string());
536                continue;
537            }
538
539            // Tag all messages with lock
540            let max_attempt = messages.iter().map(|m| m.attempt_count).max().unwrap_or(0);
541            for msg in &messages {
542                let mut updated_msg = msg.clone();
543                updated_msg.lock_token = Some(lock_token.clone());
544                updated_msg.locked_until = Some(now + lock_timeout.as_millis() as u64);
545                updated_msg.attempt_count += 1;
546
547                let doc_json = serde_json::to_value(&updated_msg).map_err(|e| {
548                    ProviderError::permanent(
549                        "fetch_orchestration_item",
550                        format!("Serialize error: {e}"),
551                    )
552                })?;
553
554                let _ = self
555                    .client()
556                    .replace_document(&msg.id, instance_id, &doc_json, msg.etag.as_deref())
557                    .await?;
558            }
559
560            let attempt_count = (max_attempt + 1) as u32;
561
562            // Deserialize work items
563            let work_items: Vec<WorkItem> = messages
564                .iter()
565                .map(|m| {
566                    serde_json::from_str(&m.work_item).map_err(|e| {
567                        ProviderError::permanent(
568                            "fetch_orchestration_item",
569                            format!("Failed to deserialize work item: {e}"),
570                        )
571                    })
572                })
573                .collect::<Result<Vec<_>, _>>()?;
574
575            // Step 4: Fetch history
576            let execution_id = locked_instance.current_execution_id;
577            let history_docs =
578                query::fetch_history(self.client(), instance_id, execution_id).await?;
579
580            let (history, history_error) = {
581                let mut events = Vec::new();
582                let mut error = None;
583                for doc in &history_docs {
584                    match serde_json::from_str::<Event>(&doc.event_data) {
585                        Ok(event) => events.push(event),
586                        Err(e) => {
587                            error = Some(format!(
588                                "Failed to deserialize history event {}: {e}",
589                                doc.event_id
590                            ));
591                            events.clear();
592                            break;
593                        }
594                    }
595                }
596                (events, error)
597            };
598
599            // Step 5: Load KV snapshot from kv_store only. Current-execution
600            // mutations remain in kv_delta until the execution reaches a terminal state.
601            let kv_snapshot: std::collections::HashMap<String, duroxide::providers::KvEntry> = self
602                .load_kv_store_documents(instance_id)
603                .await?
604                .into_iter()
605                .map(|doc| {
606                    (
607                        doc.key,
608                        duroxide::providers::KvEntry {
609                            value: doc.value,
610                            last_updated_at_ms: doc.last_updated_at_ms,
611                        },
612                    )
613                })
614                .collect();
615
616            // Step 6: Build OrchestrationItem
617
618            // Orphan queue messages: if orchestration_name is empty (no StartOrchestration
619            // seen), there's no history, and ALL messages are QueueMessage items, these are
620            // orphan events enqueued before the orchestration started. Drop them by acking
621            // with empty deltas. Other work items (CancelInstance, etc.) may legitimately
622            // race with StartOrchestration and must not be dropped.
623            if locked_instance.orchestration_name.is_empty()
624                && history.is_empty()
625                && work_items
626                    .iter()
627                    .all(|m| matches!(m, WorkItem::QueueMessage { .. }))
628            {
629                let message_count = work_items.len();
630                tracing::warn!(
631                    target = "duroxide::providers::cosmosdb",
632                    instance = %instance_id,
633                    message_count,
634                    "Dropping orphan queue messages — events enqueued before orchestration started are not supported"
635                );
636                self.ack_orchestration_item(
637                    &lock_token,
638                    execution_id,
639                    vec![],
640                    vec![],
641                    vec![],
642                    ExecutionMetadata::default(),
643                    vec![],
644                )
645                .await?;
646                return Ok(None);
647            }
648
649            let item = OrchestrationItem {
650                instance: instance_id.to_string(),
651                orchestration_name: locked_instance.orchestration_name.clone(),
652                execution_id,
653                version: locked_instance.orchestration_version.clone(),
654                history,
655                messages: work_items,
656                history_error,
657                kv_snapshot,
658            };
659
660            return Ok(Some((item, lock_token, attempt_count)));
661        }
662
663        // All retries exhausted
664        Ok(None)
665    }
666
667    // ─── ack_orchestration_item ──────────────────────────────────
668
669    async fn ack_orchestration_item(
670        &self,
671        lock_token: &str,
672        execution_id: u64,
673        history_delta: Vec<Event>,
674        worker_items: Vec<WorkItem>,
675        orchestrator_items: Vec<WorkItem>,
676        metadata: ExecutionMetadata,
677        cancelled_activities: Vec<ScheduledActivityIdentifier>,
678    ) -> Result<(), ProviderError> {
679        // Find the locked instance
680        let instance = query::find_instance_by_lock_token(self.client(), lock_token)
681            .await?
682            .ok_or_else(|| {
683                ProviderError::permanent(
684                    "ack_orchestration_item",
685                    format!("Invalid lock token or lock expired: {lock_token}"),
686                )
687            })?;
688
689        let instance_id = &instance.instance_id;
690        let now = now_ms();
691
692        // Verify lock hasn't expired
693        if let Some(locked_until) = instance.locked_until {
694            if locked_until <= now {
695                return Err(ProviderError::permanent(
696                    "ack_orchestration_item",
697                    format!("Lock has expired for instance {instance_id}"),
698                ));
699            }
700        }
701
702        // Classify items by partition
703        let mut same_partition_worker = Vec::new();
704        let mut same_partition_orch = Vec::new();
705        let mut cross_partition_intents = Vec::new();
706
707        // Build a set of cancelled activity identities for fast lookup
708        let cancelled_set: std::collections::HashSet<(String, u64, u64)> = cancelled_activities
709            .iter()
710            .map(|c| (c.instance.clone(), c.execution_id, c.activity_id))
711            .collect();
712
713        // Process worker items, filtering out any that match a cancelled activity
714        for (seq, item) in worker_items.iter().enumerate() {
715            // Check if this worker item is being cancelled in the same ack
716            let is_cancelled = match item {
717                WorkItem::ActivityExecute {
718                    instance,
719                    execution_id,
720                    id,
721                    ..
722                } => cancelled_set.contains(&(instance.clone(), *execution_id, *id)),
723                _ => false,
724            };
725            if is_cancelled {
726                continue; // Skip: this activity is both scheduled and cancelled
727            }
728
729            let target_instance = work_item_instance(item);
730            let item_json = serde_json::to_string(item).map_err(|e| {
731                ProviderError::permanent("ack_orchestration_item", format!("Serialize error: {e}"))
732            })?;
733
734            if target_instance == instance_id {
735                // Same partition
736                let (exec_id, activity_id, session_id, tag) = match item {
737                    WorkItem::ActivityExecute {
738                        execution_id,
739                        id,
740                        session_id,
741                        tag,
742                        ..
743                    } => (
744                        Some(*execution_id),
745                        Some(*id),
746                        session_id.clone(),
747                        tag.clone(),
748                    ),
749                    _ => (None, None, None, None),
750                };
751                let doc = QueueItemDocument::new_worker_queue(
752                    instance_id,
753                    item_json,
754                    exec_id,
755                    activity_id,
756                    session_id,
757                    tag,
758                    now,
759                );
760                same_partition_worker.push(serde_json::to_value(&doc).unwrap());
761            } else {
762                // Cross partition: create outbox intent
763                let (exec_id, activity_id, session_id, tag) = match item {
764                    WorkItem::ActivityExecute {
765                        execution_id,
766                        id,
767                        session_id,
768                        tag,
769                        ..
770                    } => (
771                        Some(*execution_id),
772                        Some(*id),
773                        session_id.clone(),
774                        tag.clone(),
775                    ),
776                    _ => (None, None, None, None),
777                };
778                let target_doc = QueueItemDocument::new_worker_queue(
779                    target_instance,
780                    item_json,
781                    exec_id,
782                    activity_id,
783                    session_id,
784                    tag,
785                    now,
786                );
787                let target_json = serde_json::to_string(&target_doc).unwrap();
788                let idem_key = idempotency_key(instance_id, execution_id, seq as u64);
789                let intent = OutboxIntentDocument::new(
790                    instance_id,
791                    target_instance,
792                    DOC_TYPE_WORKER_QUEUE,
793                    target_json,
794                    idem_key,
795                    now,
796                );
797                cross_partition_intents.push(intent);
798            }
799        }
800
801        // Process orchestrator items
802        for (seq, item) in orchestrator_items.iter().enumerate() {
803            let target_instance = work_item_instance(item);
804            let item_json = serde_json::to_string(item).map_err(|e| {
805                ProviderError::permanent("ack_orchestration_item", format!("Serialize error: {e}"))
806            })?;
807
808            let delay = match item {
809                WorkItem::TimerFired { fire_at_ms, .. } => {
810                    let fire_at = *fire_at_ms;
811                    if fire_at > now {
812                        fire_at
813                    } else {
814                        now
815                    }
816                }
817                _ => now,
818            };
819
820            if target_instance == instance_id {
821                let doc = QueueItemDocument::new_orch_queue(instance_id, item_json, delay, now);
822                same_partition_orch.push(serde_json::to_value(&doc).unwrap());
823            } else {
824                let target_doc =
825                    QueueItemDocument::new_orch_queue(target_instance, item_json, delay, now);
826                let target_json = serde_json::to_string(&target_doc).unwrap();
827                let idem_key =
828                    idempotency_key(instance_id, execution_id, (worker_items.len() + seq) as u64);
829                let intent = OutboxIntentDocument::new(
830                    instance_id,
831                    target_instance,
832                    DOC_TYPE_ORCH_QUEUE,
833                    target_json,
834                    idem_key,
835                    now,
836                );
837                cross_partition_intents.push(intent);
838            }
839        }
840
841        // Find messages to delete (locked by our token)
842        let locked_messages =
843            query::find_items_by_lock_token(self.client(), lock_token, DOC_TYPE_ORCH_QUEUE).await?;
844        let messages_to_delete: Vec<String> =
845            locked_messages.iter().map(|m| m.id.clone()).collect();
846
847        // Find cancelled activity doc IDs
848        let mut cancelled_doc_ids = Vec::new();
849        for cancelled in &cancelled_activities {
850            // Query worker queue for matching activity
851            let sql = format!(
852                "SELECT c.id FROM c WHERE c.instanceId = @instanceId AND c.type = '{}' \
853                 AND c.executionId = @execId AND c.activityId = @activityId",
854                DOC_TYPE_WORKER_QUEUE
855            );
856            let params = vec![
857                crate::client::QueryParameter::new(
858                    "@instanceId",
859                    serde_json::json!(&cancelled.instance),
860                ),
861                crate::client::QueryParameter::new(
862                    "@execId",
863                    serde_json::json!(cancelled.execution_id),
864                ),
865                crate::client::QueryParameter::new(
866                    "@activityId",
867                    serde_json::json!(cancelled.activity_id),
868                ),
869            ];
870            let results = self
871                .client()
872                .query(&sql, params, Some(&cancelled.instance))
873                .await?;
874            for doc in results {
875                if let Some(id) = doc.get("id").and_then(|v| v.as_str()) {
876                    cancelled_doc_ids.push(id.to_string());
877                }
878            }
879        }
880
881        // Build history delta entries using event_id from each Event
882        let history_entries: Vec<(u64, String)> = history_delta
883            .iter()
884            .map(|event| {
885                let event_id = event.event_id;
886                let event_json = serde_json::to_string(event).unwrap();
887                (event_id, event_json)
888            })
889            .collect();
890
891        // Build instance update document
892        let mut updated_instance = instance.clone();
893        updated_instance.current_execution_id = execution_id;
894        updated_instance.updated_at = now;
895        updated_instance.lock_token = None;
896        updated_instance.locked_until = None;
897
898        if let Some(status) = &metadata.status {
899            updated_instance.status = status.clone();
900        }
901        if let Some(output) = &metadata.output {
902            updated_instance.output = Some(output.clone());
903        }
904        if let Some(name) = &metadata.orchestration_name {
905            updated_instance.orchestration_name = name.clone();
906        }
907        if let Some(version) = &metadata.orchestration_version {
908            updated_instance.orchestration_version = version.clone();
909        }
910        if let Some(parent) = &metadata.parent_instance_id {
911            updated_instance.parent_instance_id = Some(parent.clone());
912        }
913        if let Some(pinned) = &metadata.pinned_duroxide_version {
914            updated_instance.pinned_duroxide_version_packed = Some(pack_semver(pinned));
915        }
916
917        // Derive custom_status from history_delta events.
918        // Scan reverse to find the last CustomStatusUpdated event.
919        let custom_status_from_delta = history_delta.iter().rev().find_map(|e| match &e.kind {
920            EventKind::CustomStatusUpdated { status } => Some(status.clone()),
921            _ => None,
922        });
923
924        match custom_status_from_delta {
925            Some(Some(custom_status)) => {
926                updated_instance.custom_status = Some(custom_status);
927                updated_instance.custom_status_version += 1;
928            }
929            Some(None) => {
930                updated_instance.custom_status = None;
931                updated_instance.custom_status_version += 1;
932            }
933            None => {
934                // No CustomStatusUpdated in delta — preserve existing value
935            }
936        }
937
938        let instance_json = serde_json::to_value(&updated_instance).map_err(|e| {
939            ProviderError::permanent("ack_orchestration_item", format!("Serialize error: {e}"))
940        })?;
941
942        // Build outbox intent JSON values
943        let outbox_json: Vec<serde_json::Value> = cross_partition_intents
944            .iter()
945            .map(|intent| serde_json::to_value(intent).unwrap())
946            .collect();
947
948        let existing_store_docs = self.load_kv_store_documents(instance_id).await?;
949        let existing_delta_docs = self.load_kv_delta_documents(instance_id).await?;
950        let delta_state = Self::apply_kv_history_delta(
951            instance_id,
952            execution_id,
953            now,
954            &history_delta,
955            &existing_store_docs,
956            &existing_delta_docs,
957        );
958        let (changed_delta_keys, clear_all) =
959            Self::kv_delta_changed_keys(&history_delta, &existing_store_docs, &existing_delta_docs);
960        let is_terminal = metadata
961            .status
962            .as_deref()
963            .is_some_and(|status| matches!(status, "Completed" | "ContinuedAsNew" | "Failed"));
964
965        // Build KV batch operations using the kv_store + kv_delta model.
966        let existing_store_keys: std::collections::HashSet<String> = existing_store_docs
967            .iter()
968            .map(|doc| doc.key.clone())
969            .collect();
970        let mut kv_ops: Vec<BatchOperation> = Vec::new();
971
972        if is_terminal {
973            let mut delta_docs: Vec<KeyValueDeltaDocument> = delta_state.into_values().collect();
974            delta_docs.sort_by(|a, b| a.key.cmp(&b.key));
975
976            for delta_doc in delta_docs {
977                match delta_doc.value.as_deref() {
978                    Some(value) => {
979                        let store_doc = KeyValueDocument::new(
980                            instance_id,
981                            &delta_doc.key,
982                            value,
983                            delta_doc.execution_id,
984                            delta_doc.last_updated_at_ms,
985                        );
986                        let json = serde_json::to_value(&store_doc).unwrap();
987                        kv_ops.push(BatchOperation::Upsert { body: json });
988                    }
989                    None => {
990                        if existing_store_keys.contains(&delta_doc.key) {
991                            kv_ops.push(BatchOperation::Delete {
992                                id: KeyValueDocument::doc_id(instance_id, &delta_doc.key),
993                            });
994                        }
995                    }
996                }
997            }
998
999            for delta_doc in &existing_delta_docs {
1000                kv_ops.push(BatchOperation::Delete {
1001                    id: delta_doc.id.clone(),
1002                });
1003            }
1004        } else {
1005            let mut keys_to_upsert: Vec<String> = if clear_all {
1006                delta_state.keys().cloned().collect()
1007            } else {
1008                changed_delta_keys.into_iter().collect()
1009            };
1010            keys_to_upsert.sort();
1011            keys_to_upsert.dedup();
1012
1013            for key in keys_to_upsert {
1014                if let Some(delta_doc) = delta_state.get(&key) {
1015                    let json = serde_json::to_value(delta_doc).unwrap();
1016                    kv_ops.push(BatchOperation::Upsert { body: json });
1017                }
1018            }
1019        }
1020
1021        // Build and execute transactional batch
1022        // Note: cancelled_doc_ids are NOT included in the batch because the
1023        // worker dispatcher may have already fetched and deleted them. A batch
1024        // delete of a non-existent doc returns 404, which fails the entire
1025        // transactional batch (424). Instead, we delete them best-effort after.
1026        let ops = batch::build_ack_batch(
1027            instance_id,
1028            execution_id,
1029            lock_token,
1030            &messages_to_delete,
1031            &history_entries,
1032            same_partition_worker,
1033            same_partition_orch,
1034            outbox_json,
1035            kv_ops,
1036            &[], // no cancelled deletes in the batch
1037            instance_json,
1038        );
1039
1040        batch::execute_batch(self.client(), instance_id, ops).await?;
1041
1042        // Best-effort delete of cancelled activity worker_queue docs.
1043        // These may already be gone if the worker consumed them before
1044        // the orchestration decided to cancel.
1045        for doc_id in &cancelled_doc_ids {
1046            let _ = self.client().delete_document(doc_id, instance_id).await;
1047        }
1048
1049        // Best-effort delivery of cross-partition intents
1050        outbox::deliver_intents_best_effort(
1051            self.client(),
1052            &cross_partition_intents,
1053            self.inner.outbox_fault_injector.as_ref(),
1054        )
1055        .await;
1056
1057        Ok(())
1058    }
1059
1060    // ─── abandon_orchestration_item ──────────────────────────────
1061
1062    async fn abandon_orchestration_item(
1063        &self,
1064        lock_token: &str,
1065        delay: Option<Duration>,
1066        ignore_attempt: bool,
1067    ) -> Result<(), ProviderError> {
1068        let now = now_ms();
1069
1070        // Find and unlock instance
1071        let instance = query::find_instance_by_lock_token(self.client(), lock_token).await?;
1072        let inst = instance.ok_or_else(|| {
1073            ProviderError::permanent("abandon_orchestration_item", "Invalid lock token")
1074        })?;
1075
1076        let mut updated = inst.clone();
1077        updated.lock_token = None;
1078        updated.locked_until = None;
1079        updated.updated_at = now;
1080        let doc_json = serde_json::to_value(&updated).unwrap();
1081        let _ = self
1082            .client()
1083            .replace_document(
1084                &updated.id,
1085                &inst.instance_id,
1086                &doc_json,
1087                inst.etag.as_deref(),
1088            )
1089            .await;
1090
1091        // Find and unlock queue messages
1092        let messages =
1093            query::find_items_by_lock_token(self.client(), lock_token, DOC_TYPE_ORCH_QUEUE).await?;
1094        for msg in &messages {
1095            let mut updated = msg.clone();
1096            updated.lock_token = None;
1097            updated.locked_until = None;
1098            if let Some(d) = delay {
1099                updated.visible_at = now + d.as_millis() as u64;
1100            }
1101            if ignore_attempt && updated.attempt_count > 0 {
1102                updated.attempt_count -= 1;
1103            }
1104
1105            let doc_json = serde_json::to_value(&updated).unwrap();
1106            let _ = self
1107                .client()
1108                .replace_document(&msg.id, &msg.instance_id, &doc_json, msg.etag.as_deref())
1109                .await;
1110        }
1111
1112        Ok(())
1113    }
1114
1115    // ─── read ────────────────────────────────────────────────────
1116
1117    async fn read(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
1118        let inst = self.read_instance(instance).await?;
1119        let execution_id = inst.map(|i| i.current_execution_id).unwrap_or(1);
1120        self.read_with_execution(instance, execution_id).await
1121    }
1122
1123    async fn read_with_execution(
1124        &self,
1125        instance: &str,
1126        execution_id: u64,
1127    ) -> Result<Vec<Event>, ProviderError> {
1128        let docs = query::fetch_history(self.client(), instance, execution_id).await?;
1129        let mut events = Vec::new();
1130        for doc in docs {
1131            let event: Event = serde_json::from_str(&doc.event_data).map_err(|e| {
1132                ProviderError::permanent(
1133                    "read_with_execution",
1134                    format!("Failed to deserialize event: {e}"),
1135                )
1136            })?;
1137            events.push(event);
1138        }
1139        Ok(events)
1140    }
1141
1142    async fn append_with_execution(
1143        &self,
1144        instance: &str,
1145        execution_id: u64,
1146        new_events: Vec<Event>,
1147    ) -> Result<(), ProviderError> {
1148        // Get next event ID
1149        let existing = query::fetch_history(self.client(), instance, execution_id).await?;
1150        let next_id = existing
1151            .iter()
1152            .map(|h| h.event_id)
1153            .max()
1154            .map(|m| m + 1)
1155            .unwrap_or(0);
1156
1157        for (i, event) in new_events.iter().enumerate() {
1158            let event_id = next_id + i as u64;
1159            let event_json = serde_json::to_string(event).map_err(|e| {
1160                ProviderError::permanent("append_with_execution", format!("Serialize error: {e}"))
1161            })?;
1162            let doc = HistoryDocument::new(instance, execution_id, event_id, event_json);
1163            let doc_json = serde_json::to_value(&doc).unwrap();
1164
1165            let resp = self.client().create_document(instance, &doc_json).await?;
1166            if !resp.is_success() && !errors::is_conflict(resp.status) {
1167                return Err(errors::map_cosmosdb_error(
1168                    "append_with_execution",
1169                    resp.status,
1170                    &resp.body,
1171                ));
1172            }
1173        }
1174
1175        Ok(())
1176    }
1177
1178    // ─── Worker queue operations ─────────────────────────────────
1179
1180    async fn enqueue_for_worker(&self, item: WorkItem) -> Result<(), ProviderError> {
1181        let instance_id = work_item_instance(&item).to_string();
1182        let item_json = serde_json::to_string(&item).map_err(|e| {
1183            ProviderError::permanent("enqueue_for_worker", format!("Serialize error: {e}"))
1184        })?;
1185
1186        let (exec_id, activity_id, session_id, tag) = match &item {
1187            WorkItem::ActivityExecute {
1188                execution_id,
1189                id,
1190                session_id,
1191                tag,
1192                ..
1193            } => (
1194                Some(*execution_id),
1195                Some(*id),
1196                session_id.clone(),
1197                tag.clone(),
1198            ),
1199            _ => (None, None, None, None),
1200        };
1201
1202        let now = now_ms();
1203        let doc = QueueItemDocument::new_worker_queue(
1204            &instance_id,
1205            item_json,
1206            exec_id,
1207            activity_id,
1208            session_id,
1209            tag,
1210            now,
1211        );
1212        let doc_json = serde_json::to_value(&doc).unwrap();
1213
1214        let resp = self
1215            .client()
1216            .create_document(&instance_id, &doc_json)
1217            .await?;
1218        if !resp.is_success() {
1219            return Err(errors::map_cosmosdb_error(
1220                "enqueue_for_worker",
1221                resp.status,
1222                &resp.body,
1223            ));
1224        }
1225
1226        Ok(())
1227    }
1228
1229    async fn fetch_work_item(
1230        &self,
1231        lock_timeout: Duration,
1232        _poll_timeout: Duration,
1233        session: Option<&SessionFetchConfig>,
1234        tag_filter: &TagFilter,
1235    ) -> Result<Option<(WorkItem, String, u32)>, ProviderError> {
1236        // TagFilter::None means this worker doesn't process any activities
1237        if matches!(tag_filter, TagFilter::None) {
1238            return Ok(None);
1239        }
1240
1241        let caller_id = task_id_u64();
1242        let my_slots = self.inner.worker_leases.acquire_slots(caller_id).await;
1243        let now = now_ms();
1244
1245        let mut excluded = Vec::new();
1246
1247        for _attempt in 0..10 {
1248            let candidate = query::find_candidate_work_item(
1249                self.client(),
1250                now,
1251                &my_slots,
1252                session.map(|s| s.owner_id.as_str()),
1253                &excluded,
1254                tag_filter,
1255            )
1256            .await?;
1257
1258            let Some(candidate) = candidate else {
1259                return Ok(None);
1260            };
1261
1262            // Session routing check before locking
1263            if let Some(ref sid) = candidate.session_id {
1264                if let Some(config) = session {
1265                    // Check existing session ownership
1266                    let session_doc_id = SessionDocument::doc_id(&candidate.instance_id, sid);
1267                    if let Ok(resp) = self
1268                        .client()
1269                        .read_document(&session_doc_id, &candidate.instance_id)
1270                        .await
1271                    {
1272                        if resp.is_success() {
1273                            if let Ok(session_doc) =
1274                                serde_json::from_str::<SessionDocument>(&resp.body)
1275                            {
1276                                // Session exists - check if it's still locked by another worker
1277                                if session_doc.locked_until > now
1278                                    && session_doc.owner_id != config.owner_id
1279                                {
1280                                    // Session owned by another worker, skip this item
1281                                    excluded.push(candidate.id.clone());
1282                                    continue;
1283                                }
1284                            }
1285                        }
1286                    }
1287                } else {
1288                    // No session config but item has session - skip
1289                    excluded.push(candidate.id.clone());
1290                    continue;
1291                }
1292            }
1293
1294            // Try to lock the work item via conditional replace
1295            let etag = candidate.etag.clone().unwrap_or_default();
1296            let lock_token = uuid::Uuid::new_v4().to_string();
1297
1298            let mut updated = candidate.clone();
1299            updated.lock_token = Some(lock_token.clone());
1300            updated.locked_until = Some(now + lock_timeout.as_millis() as u64);
1301            updated.attempt_count += 1;
1302
1303            let doc_json = serde_json::to_value(&updated).map_err(|e| {
1304                ProviderError::permanent("fetch_work_item", format!("Serialize error: {e}"))
1305            })?;
1306
1307            let resp = self
1308                .client()
1309                .replace_document(
1310                    &candidate.id,
1311                    &candidate.instance_id,
1312                    &doc_json,
1313                    Some(&etag),
1314                )
1315                .await?;
1316
1317            if resp.is_success() {
1318                // If session-bound, atomically upsert session
1319                if let (Some(ref sid), Some(config)) = (&candidate.session_id, session) {
1320                    // Use fresh timestamp for session lock — network latency
1321                    // since the start of fetch_work_item can be significant
1322                    let session_now = now_ms();
1323                    let session_locked_until = session_now + config.lock_timeout.as_millis() as u64;
1324                    let session_doc_id = SessionDocument::doc_id(&candidate.instance_id, sid);
1325
1326                    // Try to read existing session
1327                    let existing_session = self
1328                        .client()
1329                        .read_document(&session_doc_id, &candidate.instance_id)
1330                        .await;
1331
1332                    let session_claimed = match existing_session {
1333                        Ok(resp) if resp.is_success() => {
1334                            // Session exists, check ownership
1335                            match serde_json::from_str::<SessionDocument>(&resp.body) {
1336                                Ok(mut existing) => {
1337                                    if existing.locked_until <= session_now
1338                                        || existing.owner_id == config.owner_id
1339                                    {
1340                                        // Expired or we own it - update
1341                                        existing.owner_id = config.owner_id.clone();
1342                                        existing.locked_until = session_locked_until;
1343                                        existing.last_activity = session_now;
1344                                        let session_json = serde_json::to_value(&existing).unwrap();
1345                                        let update_resp = self
1346                                            .client()
1347                                            .replace_document(
1348                                                &session_doc_id,
1349                                                &candidate.instance_id,
1350                                                &session_json,
1351                                                resp.etag.as_deref(),
1352                                            )
1353                                            .await;
1354                                        update_resp.map(|r| r.is_success()).unwrap_or(false)
1355                                    } else {
1356                                        // Another worker owns this session
1357                                        false
1358                                    }
1359                                }
1360                                Err(_) => false,
1361                            }
1362                        }
1363                        _ => {
1364                            // Session doesn't exist, create it
1365                            let new_session = SessionDocument {
1366                                id: session_doc_id.clone(),
1367                                instance_id: candidate.instance_id.clone(),
1368                                doc_type: DOC_TYPE_SESSION.to_string(),
1369                                session_id: sid.clone(),
1370                                owner_id: config.owner_id.clone(),
1371                                locked_until: session_locked_until,
1372                                last_activity: session_now,
1373                                created_at: session_now,
1374                                etag: None,
1375                                rid: None,
1376                                self_link: None,
1377                                ts: None,
1378                                attachments: None,
1379                            };
1380                            let session_json = serde_json::to_value(&new_session).unwrap();
1381                            let create_resp = self
1382                                .client()
1383                                .create_document(&candidate.instance_id, &session_json)
1384                                .await;
1385                            match create_resp {
1386                                Ok(r) => r.is_success(),
1387                                Err(_) => false,
1388                            }
1389                        }
1390                    };
1391
1392                    if !session_claimed {
1393                        // Failed to claim session - unlock work item and skip
1394                        let mut rollback = updated.clone();
1395                        rollback.lock_token = None;
1396                        rollback.locked_until = None;
1397                        rollback.attempt_count -= 1;
1398                        let rollback_json = serde_json::to_value(&rollback).unwrap();
1399                        let _ = self
1400                            .client()
1401                            .replace_document(
1402                                &candidate.id,
1403                                &candidate.instance_id,
1404                                &rollback_json,
1405                                None, // no etag check for rollback
1406                            )
1407                            .await;
1408                        excluded.push(candidate.id.clone());
1409                        continue;
1410                    }
1411                }
1412
1413                // Parse the work item
1414                let work_item: WorkItem =
1415                    serde_json::from_str(&candidate.work_item).map_err(|e| {
1416                        ProviderError::permanent(
1417                            "fetch_work_item",
1418                            format!("Failed to deserialize work item: {e}"),
1419                        )
1420                    })?;
1421
1422                return Ok(Some((work_item, lock_token, updated.attempt_count as u32)));
1423            } else if errors::is_precondition_failed(resp.status)
1424                || errors::is_conflict(resp.status)
1425            {
1426                excluded.push(candidate.id.clone());
1427                continue;
1428            } else {
1429                return Err(errors::map_cosmosdb_error(
1430                    "fetch_work_item",
1431                    resp.status,
1432                    &resp.body,
1433                ));
1434            }
1435        }
1436
1437        Ok(None)
1438    }
1439
1440    async fn ack_work_item(
1441        &self,
1442        token: &str,
1443        completion: Option<WorkItem>,
1444    ) -> Result<(), ProviderError> {
1445        let now = now_ms();
1446
1447        // Find the locked worker item
1448        let items =
1449            query::find_items_by_lock_token(self.client(), token, DOC_TYPE_WORKER_QUEUE).await?;
1450
1451        let item = items.first().ok_or_else(|| {
1452            ProviderError::permanent(
1453                "ack_work_item",
1454                "Activity was cancelled or lock expired (worker queue row not found or lock invalid)",
1455            )
1456        })?;
1457
1458        // Verify lock hasn't expired
1459        if let Some(locked_until) = item.locked_until {
1460            if locked_until <= now {
1461                return Err(ProviderError::permanent(
1462                    "ack_work_item",
1463                    "Activity was cancelled or lock expired (worker queue row not found or lock invalid)",
1464                ));
1465            }
1466        }
1467
1468        let instance_id = &item.instance_id;
1469        let session_id = item.session_id.clone();
1470
1471        if let Some(completion_item) = completion {
1472            // Create completion in orch_queue + delete worker item in same partition
1473            let target_instance = work_item_instance(&completion_item).to_string();
1474            let item_json = serde_json::to_string(&completion_item).map_err(|e| {
1475                ProviderError::permanent("ack_work_item", format!("Serialize error: {e}"))
1476            })?;
1477
1478            if target_instance == *instance_id {
1479                // Same partition: transactional batch
1480                let orch_doc =
1481                    QueueItemDocument::new_orch_queue(&target_instance, item_json, now, now);
1482                let orch_json = serde_json::to_value(&orch_doc).unwrap();
1483
1484                let ops = vec![
1485                    BatchOperation::Delete {
1486                        id: item.id.clone(),
1487                    },
1488                    BatchOperation::Create { body: orch_json },
1489                ];
1490                batch::execute_batch(self.client(), instance_id, ops).await?;
1491            } else {
1492                // Different partition: delete worker item, then create orch item separately
1493                let _ = self.client().delete_document(&item.id, instance_id).await?;
1494
1495                let orch_doc =
1496                    QueueItemDocument::new_orch_queue(&target_instance, item_json, now, now);
1497                let orch_json = serde_json::to_value(&orch_doc).unwrap();
1498
1499                let resp = self
1500                    .client()
1501                    .create_document(&target_instance, &orch_json)
1502                    .await?;
1503                if !resp.is_success() {
1504                    return Err(errors::map_cosmosdb_error(
1505                        "ack_work_item",
1506                        resp.status,
1507                        &resp.body,
1508                    ));
1509                }
1510            }
1511        } else {
1512            // No completion: just delete the worker item (cancelled)
1513            let _ = self.client().delete_document(&item.id, instance_id).await?;
1514        }
1515
1516        // Piggyback: update last_activity_at for session-bound items
1517        if let Some(ref sid) = session_id {
1518            let session_doc_id = SessionDocument::doc_id(instance_id, sid);
1519            let piggyback_now = now_ms(); // Fresh timestamp for accurate idle tracking
1520            if let Ok(resp) = self
1521                .client()
1522                .read_document(&session_doc_id, instance_id)
1523                .await
1524            {
1525                if resp.is_success() {
1526                    if let Ok(mut session_doc) = serde_json::from_str::<SessionDocument>(&resp.body)
1527                    {
1528                        if session_doc.locked_until > piggyback_now {
1529                            session_doc.last_activity = piggyback_now;
1530                            session_doc.etag = resp.etag;
1531                            let doc_json = serde_json::to_value(&session_doc).unwrap();
1532                            let _ = self
1533                                .client()
1534                                .replace_document(
1535                                    &session_doc_id,
1536                                    instance_id,
1537                                    &doc_json,
1538                                    session_doc.etag.as_deref(),
1539                                )
1540                                .await;
1541                        }
1542                    }
1543                }
1544            }
1545        }
1546
1547        Ok(())
1548    }
1549
1550    async fn renew_work_item_lock(
1551        &self,
1552        token: &str,
1553        extend_for: Duration,
1554    ) -> Result<(), ProviderError> {
1555        let now = now_ms();
1556        let items =
1557            query::find_items_by_lock_token(self.client(), token, DOC_TYPE_WORKER_QUEUE).await?;
1558
1559        let item = items.first().ok_or_else(|| {
1560            ProviderError::permanent(
1561                "renew_work_item_lock",
1562                format!("No worker item found with lock token {token}"),
1563            )
1564        })?;
1565
1566        // Check if expired
1567        if let Some(locked_until) = item.locked_until {
1568            if locked_until <= now {
1569                return Err(ProviderError::permanent(
1570                    "renew_work_item_lock",
1571                    "Lock has expired".to_string(),
1572                ));
1573            }
1574        }
1575
1576        let mut updated = item.clone();
1577        updated.locked_until = Some(now + extend_for.as_millis() as u64);
1578
1579        let doc_json = serde_json::to_value(&updated).unwrap();
1580        let resp = self
1581            .client()
1582            .replace_document(&item.id, &item.instance_id, &doc_json, item.etag.as_deref())
1583            .await?;
1584
1585        if resp.is_success() {
1586            // Piggyback: update last_activity_at for session-bound items
1587            if let Some(ref sid) = item.session_id {
1588                let piggyback_now = now_ms(); // Fresh timestamp for accurate idle tracking
1589                let session_doc_id = SessionDocument::doc_id(&item.instance_id, sid);
1590                if let Ok(sess_resp) = self
1591                    .client()
1592                    .read_document(&session_doc_id, &item.instance_id)
1593                    .await
1594                {
1595                    if sess_resp.is_success() {
1596                        if let Ok(mut session_doc) =
1597                            serde_json::from_str::<SessionDocument>(&sess_resp.body)
1598                        {
1599                            if session_doc.locked_until > piggyback_now {
1600                                session_doc.last_activity = piggyback_now;
1601                                let sess_json = serde_json::to_value(&session_doc).unwrap();
1602                                let _ = self
1603                                    .client()
1604                                    .replace_document(
1605                                        &session_doc_id,
1606                                        &item.instance_id,
1607                                        &sess_json,
1608                                        sess_resp.etag.as_deref(),
1609                                    )
1610                                    .await;
1611                            }
1612                        }
1613                    }
1614                }
1615            }
1616            Ok(())
1617        } else {
1618            Err(errors::map_cosmosdb_error(
1619                "renew_work_item_lock",
1620                resp.status,
1621                &resp.body,
1622            ))
1623        }
1624    }
1625
1626    async fn renew_session_lock(
1627        &self,
1628        owner_ids: &[&str],
1629        extend_for: Duration,
1630        idle_timeout: Duration,
1631    ) -> Result<usize, ProviderError> {
1632        if owner_ids.is_empty() {
1633            return Ok(0);
1634        }
1635
1636        let now = now_ms();
1637        let locked_until = now + extend_for.as_millis() as u64;
1638        let idle_cutoff = now.saturating_sub(idle_timeout.as_millis() as u64);
1639
1640        // Query all session documents
1641        let sql = format!("SELECT * FROM c WHERE c.type = '{}'", DOC_TYPE_SESSION);
1642        let results = self.client().query(&sql, vec![], None).await?;
1643
1644        let mut count = 0usize;
1645        for doc in results {
1646            if let Ok(session) = serde_json::from_value::<SessionDocument>(doc) {
1647                // Only renew sessions that:
1648                // 1. Are owned by one of our worker IDs
1649                // 2. Have not expired yet
1650                // 3. Have recent activity (not idle)
1651                if owner_ids.contains(&session.owner_id.as_str())
1652                    && session.locked_until > now
1653                    && session.last_activity > idle_cutoff
1654                {
1655                    let mut updated = session.clone();
1656                    updated.locked_until = locked_until;
1657                    let doc_json = serde_json::to_value(&updated).unwrap();
1658                    if let Ok(resp) = self
1659                        .client()
1660                        .replace_document(
1661                            &session.id,
1662                            &session.instance_id,
1663                            &doc_json,
1664                            session.etag.as_deref(),
1665                        )
1666                        .await
1667                    {
1668                        if resp.is_success() {
1669                            count += 1;
1670                        }
1671                    }
1672                }
1673            }
1674        }
1675
1676        Ok(count)
1677    }
1678
1679    async fn cleanup_orphaned_sessions(
1680        &self,
1681        _idle_timeout: Duration,
1682    ) -> Result<usize, ProviderError> {
1683        let now = now_ms();
1684
1685        // Query all expired sessions
1686        let sql = format!(
1687            "SELECT * FROM c WHERE c.type = '{}' AND c.lockedUntil < @now",
1688            DOC_TYPE_SESSION
1689        );
1690        let params = vec![crate::client::QueryParameter::new(
1691            "@now",
1692            serde_json::json!(now),
1693        )];
1694        let results = self.client().query(&sql, params, None).await?;
1695
1696        let mut count = 0usize;
1697        for doc in results {
1698            if let Ok(session) = serde_json::from_value::<SessionDocument>(doc) {
1699                // Check if there are any pending worker queue items for this session
1700                let check_sql = format!(
1701                    "SELECT VALUE COUNT(1) FROM c WHERE c.type = '{}' AND c.sessionId = @sid AND c.instanceId = @iid",
1702                    DOC_TYPE_WORKER_QUEUE
1703                );
1704                let check_params = vec![
1705                    crate::client::QueryParameter::new(
1706                        "@sid",
1707                        serde_json::json!(&session.session_id),
1708                    ),
1709                    crate::client::QueryParameter::new(
1710                        "@iid",
1711                        serde_json::json!(&session.instance_id),
1712                    ),
1713                ];
1714                let worker_count = self
1715                    .client()
1716                    .query(&check_sql, check_params, Some(&session.instance_id))
1717                    .await?;
1718                let has_items = worker_count
1719                    .into_iter()
1720                    .next()
1721                    .and_then(|v| v.as_u64())
1722                    .unwrap_or(0)
1723                    > 0;
1724
1725                if !has_items {
1726                    // No pending work items - delete the session
1727                    let _ = self
1728                        .client()
1729                        .delete_document(&session.id, &session.instance_id)
1730                        .await;
1731                    count += 1;
1732                }
1733            }
1734        }
1735
1736        Ok(count)
1737    }
1738
1739    async fn abandon_work_item(
1740        &self,
1741        token: &str,
1742        delay: Option<Duration>,
1743        ignore_attempt: bool,
1744    ) -> Result<(), ProviderError> {
1745        let now = now_ms();
1746        let items =
1747            query::find_items_by_lock_token(self.client(), token, DOC_TYPE_WORKER_QUEUE).await?;
1748
1749        for item in &items {
1750            let mut updated = item.clone();
1751            updated.lock_token = None;
1752            updated.locked_until = None;
1753            if let Some(d) = delay {
1754                updated.visible_at = now + d.as_millis() as u64;
1755            }
1756            if ignore_attempt && updated.attempt_count > 0 {
1757                updated.attempt_count -= 1;
1758            }
1759
1760            let doc_json = serde_json::to_value(&updated).unwrap();
1761            let _ = self
1762                .client()
1763                .replace_document(&item.id, &item.instance_id, &doc_json, item.etag.as_deref())
1764                .await;
1765        }
1766
1767        Ok(())
1768    }
1769
1770    async fn renew_orchestration_item_lock(
1771        &self,
1772        token: &str,
1773        extend_for: Duration,
1774    ) -> Result<(), ProviderError> {
1775        let now = now_ms();
1776
1777        // Renew instance lock
1778        let instance = query::find_instance_by_lock_token(self.client(), token).await?;
1779        let inst = instance.ok_or_else(|| {
1780            ProviderError::permanent(
1781                "renew_orchestration_item_lock",
1782                format!("No instance found with lock token {token}"),
1783            )
1784        })?;
1785
1786        // Check if expired
1787        if let Some(locked_until) = inst.locked_until {
1788            if locked_until <= now {
1789                return Err(ProviderError::permanent(
1790                    "renew_orchestration_item_lock",
1791                    "Lock has expired".to_string(),
1792                ));
1793            }
1794        }
1795
1796        let mut updated_inst = inst.clone();
1797        updated_inst.locked_until = Some(now + extend_for.as_millis() as u64);
1798        let doc_json = serde_json::to_value(&updated_inst).unwrap();
1799        let resp = self
1800            .client()
1801            .replace_document(&inst.id, &inst.instance_id, &doc_json, inst.etag.as_deref())
1802            .await?;
1803
1804        if !resp.is_success() {
1805            return Err(errors::map_cosmosdb_error(
1806                "renew_orchestration_item_lock",
1807                resp.status,
1808                &resp.body,
1809            ));
1810        }
1811
1812        // Renew locks on all tagged queue messages
1813        let messages =
1814            query::find_items_by_lock_token(self.client(), token, DOC_TYPE_ORCH_QUEUE).await?;
1815        for msg in &messages {
1816            let mut updated = msg.clone();
1817            updated.locked_until = Some(now + extend_for.as_millis() as u64);
1818            let doc_json = serde_json::to_value(&updated).unwrap();
1819            let _ = self
1820                .client()
1821                .replace_document(&msg.id, &msg.instance_id, &doc_json, msg.etag.as_deref())
1822                .await;
1823        }
1824
1825        Ok(())
1826    }
1827
1828    // ─── Orchestrator queue ──────────────────────────────────────
1829
1830    async fn enqueue_for_orchestrator(
1831        &self,
1832        item: WorkItem,
1833        delay: Option<Duration>,
1834    ) -> Result<(), ProviderError> {
1835        let instance_id = work_item_instance(&item).to_string();
1836        let item_json = serde_json::to_string(&item).map_err(|e| {
1837            ProviderError::permanent("enqueue_for_orchestrator", format!("Serialize error: {e}"))
1838        })?;
1839
1840        let now = now_ms();
1841        let visible_at = delay.map(|d| now + d.as_millis() as u64).unwrap_or(now);
1842
1843        let doc = QueueItemDocument::new_orch_queue(&instance_id, item_json, visible_at, now);
1844        let doc_json = serde_json::to_value(&doc).unwrap();
1845
1846        let resp = self
1847            .client()
1848            .create_document(&instance_id, &doc_json)
1849            .await?;
1850        if !resp.is_success() {
1851            return Err(errors::map_cosmosdb_error(
1852                "enqueue_for_orchestrator",
1853                resp.status,
1854                &resp.body,
1855            ));
1856        }
1857
1858        Ok(())
1859    }
1860
1861    // ─── Management capability ───────────────────────────────────
1862
1863    fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
1864        Some(self)
1865    }
1866
1867    // ─── Custom status ───────────────────────────────────────────
1868
1869    async fn get_custom_status(
1870        &self,
1871        instance: &str,
1872        last_seen_version: u64,
1873    ) -> Result<Option<(Option<String>, u64)>, ProviderError> {
1874        let inst = self.read_instance(instance).await?;
1875        match inst {
1876            Some(i) => {
1877                if i.custom_status_version > last_seen_version {
1878                    Ok(Some((i.custom_status, i.custom_status_version)))
1879                } else {
1880                    Ok(None)
1881                }
1882            }
1883            None => Ok(None),
1884        }
1885    }
1886
1887    async fn get_kv_value(
1888        &self,
1889        instance_id: &str,
1890        key: &str,
1891    ) -> Result<Option<String>, ProviderError> {
1892        let delta_doc_id = KeyValueDeltaDocument::doc_id(instance_id, key);
1893        let delta_resp = self
1894            .client()
1895            .read_document(&delta_doc_id, instance_id)
1896            .await?;
1897
1898        if delta_resp.is_success() {
1899            let doc: KeyValueDeltaDocument =
1900                serde_json::from_str(&delta_resp.body).map_err(|e| {
1901                    ProviderError::permanent("get_kv_value", format!("Deserialize error: {e}"))
1902                })?;
1903            return Ok(doc.value);
1904        }
1905        if !errors::is_not_found(delta_resp.status) {
1906            return Err(errors::map_cosmosdb_error(
1907                "get_kv_value",
1908                delta_resp.status,
1909                &delta_resp.body,
1910            ));
1911        }
1912
1913        let store_doc_id = KeyValueDocument::doc_id(instance_id, key);
1914        let store_resp = self
1915            .client()
1916            .read_document(&store_doc_id, instance_id)
1917            .await?;
1918
1919        if errors::is_not_found(store_resp.status) {
1920            return Ok(None);
1921        }
1922        if !store_resp.is_success() {
1923            return Err(errors::map_cosmosdb_error(
1924                "get_kv_value",
1925                store_resp.status,
1926                &store_resp.body,
1927            ));
1928        }
1929
1930        let doc: KeyValueDocument = serde_json::from_str(&store_resp.body).map_err(|e| {
1931            ProviderError::permanent("get_kv_value", format!("Deserialize error: {e}"))
1932        })?;
1933        Ok(Some(doc.value))
1934    }
1935
1936    async fn get_kv_all_values(
1937        &self,
1938        instance_id: &str,
1939    ) -> Result<std::collections::HashMap<String, String>, ProviderError> {
1940        let mut map: std::collections::HashMap<String, String> = self
1941            .load_kv_store_documents(instance_id)
1942            .await?
1943            .into_iter()
1944            .map(|doc| (doc.key, doc.value))
1945            .collect();
1946
1947        for delta_doc in self.load_kv_delta_documents(instance_id).await? {
1948            match delta_doc.value {
1949                Some(value) => {
1950                    map.insert(delta_doc.key, value);
1951                }
1952                None => {
1953                    map.remove(&delta_doc.key);
1954                }
1955            }
1956        }
1957
1958        Ok(map)
1959    }
1960
1961    async fn get_instance_stats(
1962        &self,
1963        instance_id: &str,
1964    ) -> Result<Option<duroxide::SystemStats>, ProviderError> {
1965        let inst = match self.read_instance(instance_id).await? {
1966            Some(inst) => inst,
1967            None => return Ok(None),
1968        };
1969
1970        let history_docs =
1971            query::query_by_type_in_partition(self.client(), instance_id, DOC_TYPE_HISTORY)
1972                .await?
1973                .into_iter()
1974                .map(|doc| {
1975                    serde_json::from_value::<HistoryDocument>(doc).map_err(|e| {
1976                        ProviderError::permanent(
1977                            "get_instance_stats",
1978                            format!("Deserialize history document error: {e}"),
1979                        )
1980                    })
1981                })
1982                .collect::<Result<Vec<_>, _>>()?;
1983
1984        let (history_event_count, history_size_bytes) = history_docs
1985            .iter()
1986            .filter(|doc| doc.execution_id == inst.current_execution_id)
1987            .fold((0_u64, 0_u64), |(count, size), doc| {
1988                (count + 1, size + doc.event_data.len() as u64)
1989            });
1990
1991        let kv_values = self.get_kv_all_values(instance_id).await?;
1992        let kv_user_key_count = kv_values.len() as u64;
1993        let kv_total_value_bytes = kv_values.values().map(|value| value.len() as u64).sum();
1994
1995        let queue_pending_count = match history_docs
1996            .iter()
1997            .find(|doc| doc.execution_id == inst.current_execution_id && doc.event_id == 1)
1998        {
1999            Some(doc) => {
2000                let event = serde_json::from_str::<Event>(&doc.event_data).map_err(|e| {
2001                    ProviderError::permanent(
2002                        "get_instance_stats",
2003                        format!("Failed to deserialize OrchestrationStarted event: {e}"),
2004                    )
2005                })?;
2006                match event.kind {
2007                    EventKind::OrchestrationStarted {
2008                        carry_forward_events: Some(events),
2009                        ..
2010                    } => events.len() as u64,
2011                    _ => 0,
2012                }
2013            }
2014            None => 0,
2015        };
2016
2017        Ok(Some(duroxide::SystemStats {
2018            history_event_count,
2019            history_size_bytes,
2020            queue_pending_count,
2021            kv_user_key_count,
2022            kv_total_value_bytes,
2023        }))
2024    }
2025}
2026
2027// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2028// ProviderAdmin trait implementation
2029// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2030
2031#[async_trait::async_trait]
2032impl ProviderAdmin for CosmosDBProvider {
2033    async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
2034        let instances = query::query_instances(self.client(), None).await?;
2035        Ok(instances.iter().map(|i| i.instance_id.clone()).collect())
2036    }
2037
2038    async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ProviderError> {
2039        let instances = query::query_instances(self.client(), Some(status)).await?;
2040        Ok(instances.iter().map(|i| i.instance_id.clone()).collect())
2041    }
2042
2043    async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ProviderError> {
2044        let sql = format!(
2045            "SELECT DISTINCT VALUE c.executionId FROM c \
2046             WHERE c.instanceId = @instanceId AND c.type = '{}'",
2047            DOC_TYPE_HISTORY
2048        );
2049        let params = vec![crate::client::QueryParameter::new(
2050            "@instanceId",
2051            serde_json::json!(instance),
2052        )];
2053        let results = self.client().query(&sql, params, Some(instance)).await?;
2054        let mut exec_ids: Vec<u64> = results.into_iter().filter_map(|v| v.as_u64()).collect();
2055        exec_ids.sort();
2056
2057        if exec_ids.is_empty() {
2058            // Check if instance exists
2059            let inst = self.read_instance(instance).await?;
2060            if let Some(i) = inst {
2061                exec_ids.push(i.current_execution_id);
2062            }
2063        }
2064
2065        Ok(exec_ids)
2066    }
2067
2068    async fn read_history_with_execution_id(
2069        &self,
2070        instance: &str,
2071        execution_id: u64,
2072    ) -> Result<Vec<Event>, ProviderError> {
2073        self.read_with_execution(instance, execution_id).await
2074    }
2075
2076    async fn read_history(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
2077        self.read(instance).await
2078    }
2079
2080    async fn latest_execution_id(&self, instance: &str) -> Result<u64, ProviderError> {
2081        let inst = self.read_instance(instance).await?.ok_or_else(|| {
2082            ProviderError::permanent(
2083                "latest_execution_id",
2084                format!("Instance {instance} not found"),
2085            )
2086        })?;
2087        Ok(inst.current_execution_id)
2088    }
2089
2090    async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
2091        let inst = self.read_instance(instance).await?.ok_or_else(|| {
2092            ProviderError::permanent(
2093                "get_instance_info",
2094                format!("Instance {instance} not found"),
2095            )
2096        })?;
2097
2098        Ok(InstanceInfo {
2099            instance_id: inst.instance_id,
2100            orchestration_name: inst.orchestration_name,
2101            orchestration_version: inst.orchestration_version,
2102            current_execution_id: inst.current_execution_id,
2103            status: inst.status,
2104            output: inst.output,
2105            created_at: inst.created_at,
2106            updated_at: inst.updated_at,
2107            parent_instance_id: inst.parent_instance_id,
2108        })
2109    }
2110
2111    async fn get_execution_info(
2112        &self,
2113        instance: &str,
2114        execution_id: u64,
2115    ) -> Result<ExecutionInfo, ProviderError> {
2116        let events = self.read_with_execution(instance, execution_id).await?;
2117        let inst = self.read_instance(instance).await?;
2118
2119        let event_count = events.len();
2120        let started_at = inst.as_ref().map(|i| i.created_at).unwrap_or(0);
2121
2122        let status = if let Some(i) = &inst {
2123            if i.current_execution_id == execution_id {
2124                i.status.clone()
2125            } else {
2126                "ContinuedAsNew".to_string()
2127            }
2128        } else {
2129            "Unknown".to_string()
2130        };
2131
2132        let completed_at = if status == "Running" {
2133            None
2134        } else {
2135            inst.as_ref().map(|i| i.updated_at)
2136        };
2137
2138        Ok(ExecutionInfo {
2139            execution_id,
2140            status,
2141            output: inst.and_then(|i| i.output),
2142            started_at,
2143            completed_at,
2144            event_count,
2145        })
2146    }
2147
2148    async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError> {
2149        let instances = query::query_instances(self.client(), None).await?;
2150
2151        let total = instances.len() as u64;
2152        let running = instances.iter().filter(|i| i.status == "Running").count() as u64;
2153        let completed = instances.iter().filter(|i| i.status == "Completed").count() as u64;
2154        let failed = instances.iter().filter(|i| i.status == "Failed").count() as u64;
2155
2156        // Count executions and events
2157        let total_events =
2158            query::count_by_type(self.client(), DOC_TYPE_HISTORY, None).await? as u64;
2159
2160        Ok(SystemMetrics {
2161            total_instances: total,
2162            total_executions: total, // Approximation — each instance has at least 1
2163            running_instances: running,
2164            completed_instances: completed,
2165            failed_instances: failed,
2166            total_events,
2167        })
2168    }
2169
2170    async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError> {
2171        let now = now_ms();
2172        let now_filter = format!("c.visibleAt <= {now} AND (NOT IS_DEFINED(c.lockedUntil) OR c.lockedUntil = null OR c.lockedUntil <= {now})");
2173
2174        let orch =
2175            query::count_by_type(self.client(), DOC_TYPE_ORCH_QUEUE, Some(&now_filter)).await?;
2176
2177        let worker =
2178            query::count_by_type(self.client(), DOC_TYPE_WORKER_QUEUE, Some(&now_filter)).await?;
2179
2180        // Timer queue: orch queue items with visibleAt > now
2181        let timer_filter = format!("c.visibleAt > {now}");
2182        let timer =
2183            query::count_by_type(self.client(), DOC_TYPE_ORCH_QUEUE, Some(&timer_filter)).await?;
2184
2185        Ok(QueueDepths {
2186            orchestrator_queue: orch,
2187            worker_queue: worker,
2188            timer_queue: timer,
2189        })
2190    }
2191
2192    async fn list_children(&self, instance_id: &str) -> Result<Vec<String>, ProviderError> {
2193        let sql = format!(
2194            "SELECT c.instanceId FROM c WHERE c.type = '{}' AND c.parentInstanceId = @parentId",
2195            DOC_TYPE_INSTANCE
2196        );
2197        let params = vec![crate::client::QueryParameter::new(
2198            "@parentId",
2199            serde_json::json!(instance_id),
2200        )];
2201        let results = self.client().query(&sql, params, None).await?;
2202        Ok(results
2203            .into_iter()
2204            .filter_map(|v| {
2205                v.get("instanceId")
2206                    .and_then(|id| id.as_str())
2207                    .map(|s| s.to_string())
2208            })
2209            .collect())
2210    }
2211
2212    async fn get_parent_id(&self, instance_id: &str) -> Result<Option<String>, ProviderError> {
2213        let inst = self.read_instance(instance_id).await?.ok_or_else(|| {
2214            ProviderError::permanent("get_parent_id", format!("Instance {instance_id} not found"))
2215        })?;
2216        Ok(inst.parent_instance_id)
2217    }
2218
2219    async fn delete_instances_atomic(
2220        &self,
2221        ids: &[String],
2222        force: bool,
2223    ) -> Result<DeleteInstanceResult, ProviderError> {
2224        if ids.is_empty() {
2225            return Ok(DeleteInstanceResult::default());
2226        }
2227
2228        // Phase 1: Pre-checks (before any mutations)
2229        // Check running status for all instances
2230        if !force {
2231            for id in ids {
2232                if let Some(inst) = self.read_instance(id).await? {
2233                    if inst.status == "Running" {
2234                        return Err(ProviderError::permanent(
2235                            "delete_instances_atomic",
2236                            format!("Instance {id} is still running. Use force=true to delete anyway, or cancel first."),
2237                        ));
2238                    }
2239                }
2240            }
2241        }
2242
2243        // Orphan detection: check if any instance not in our delete set
2244        // has a parent in our delete set
2245        let id_set: std::collections::HashSet<&String> = ids.iter().collect();
2246        for id in ids {
2247            let children = self.list_children(id).await.unwrap_or_default();
2248            for child in &children {
2249                if !id_set.contains(child) {
2250                    return Err(ProviderError::permanent(
2251                        "delete_instances_atomic",
2252                        format!(
2253                            "Cannot delete: instance {id} has child {child} that was created after tree traversal. \
2254                             Re-fetch the tree and retry."
2255                        ),
2256                    ));
2257                }
2258            }
2259        }
2260
2261        // Phase 2: Count resources and delete
2262        let mut result = DeleteInstanceResult::default();
2263
2264        for id in ids {
2265            // Count events in this partition
2266            let history_filter = format!("c.instanceId = '{id}'");
2267            let events_count =
2268                query::count_by_type(self.client(), DOC_TYPE_HISTORY, Some(&history_filter))
2269                    .await
2270                    .unwrap_or(0) as u64;
2271
2272            let orch_q_count =
2273                query::count_by_type(self.client(), DOC_TYPE_ORCH_QUEUE, Some(&history_filter))
2274                    .await
2275                    .unwrap_or(0) as u64;
2276
2277            let worker_q_count =
2278                query::count_by_type(self.client(), DOC_TYPE_WORKER_QUEUE, Some(&history_filter))
2279                    .await
2280                    .unwrap_or(0) as u64;
2281
2282            // Count distinct execution IDs
2283            let exec_count = if self.read_instance(id).await?.is_some() {
2284                1u64
2285            } else {
2286                0u64
2287            };
2288
2289            // Get all documents in this partition and delete them
2290            let docs = query::query_all_in_partition(self.client(), id).await?;
2291            for doc in &docs {
2292                if let Some(doc_id) = doc.get("id").and_then(|v| v.as_str()) {
2293                    let _ = self.client().delete_document(doc_id, id).await;
2294                }
2295            }
2296
2297            result.instances_deleted += 1;
2298            result.executions_deleted += exec_count;
2299            result.events_deleted += events_count;
2300            result.queue_messages_deleted += orch_q_count + worker_q_count;
2301        }
2302
2303        Ok(result)
2304    }
2305
2306    async fn delete_instance_bulk(
2307        &self,
2308        filter: InstanceFilter,
2309    ) -> Result<DeleteInstanceResult, ProviderError> {
2310        // Find eligible instances: root instances (no parent) in terminal states
2311        let all_instances = query::query_instances(self.client(), None).await?;
2312
2313        let mut candidates: Vec<InstanceDocument> = all_instances
2314            .into_iter()
2315            .filter(|inst| {
2316                // Only root instances (no parent)
2317                inst.parent_instance_id.is_none()
2318                // Only terminal states
2319                && (inst.status == "Completed" || inst.status == "Failed" || inst.status == "ContinuedAsNew")
2320            })
2321            .collect();
2322
2323        // Apply instance_ids filter if provided
2324        if let Some(ref ids) = filter.instance_ids {
2325            if ids.is_empty() {
2326                return Ok(DeleteInstanceResult::default());
2327            }
2328            candidates.retain(|inst| ids.contains(&inst.instance_id));
2329        }
2330
2331        // Apply completed_before filter
2332        if let Some(before) = filter.completed_before {
2333            candidates.retain(|inst| inst.updated_at < before);
2334        }
2335
2336        // Apply limit
2337        let limit = filter.limit.unwrap_or(1000) as usize;
2338        candidates.truncate(limit);
2339
2340        if candidates.is_empty() {
2341            return Ok(DeleteInstanceResult::default());
2342        }
2343
2344        // Delete each instance (with cascade) using get_instance_tree
2345        let mut result = DeleteInstanceResult::default();
2346        for inst in &candidates {
2347            let tree = self.get_instance_tree(&inst.instance_id).await?;
2348            let delete_result = self.delete_instances_atomic(&tree.all_ids, true).await?;
2349            result.instances_deleted += delete_result.instances_deleted;
2350            result.executions_deleted += delete_result.executions_deleted;
2351            result.events_deleted += delete_result.events_deleted;
2352            result.queue_messages_deleted += delete_result.queue_messages_deleted;
2353        }
2354
2355        Ok(result)
2356    }
2357
2358    async fn prune_executions(
2359        &self,
2360        instance_id: &str,
2361        options: PruneOptions,
2362    ) -> Result<PruneResult, ProviderError> {
2363        let inst = self.read_instance(instance_id).await?.ok_or_else(|| {
2364            ProviderError::permanent(
2365                "prune_executions",
2366                format!("Instance {instance_id} not found"),
2367            )
2368        })?;
2369
2370        let current_exec = inst.current_execution_id;
2371        let mut all_execs = self.list_executions(instance_id).await?;
2372        all_execs.sort();
2373
2374        // Determine which executions to prune:
2375        // 1. Never prune the current execution
2376        // 2. Never prune running executions (current is always running if status == Running)
2377        // 3. keep_last: keep the top N executions (by execution_id), prune the rest
2378        //    (current is always in the top N since it's the highest)
2379        let mut protected: std::collections::HashSet<u64> = std::collections::HashSet::new();
2380        protected.insert(current_exec);
2381
2382        if let Some(keep_last) = options.keep_last {
2383            let keep = keep_last as usize;
2384            // Keep the top N by execution_id (highest N)
2385            let skip = all_execs.len().saturating_sub(keep);
2386            for &exec_id in &all_execs[skip..] {
2387                protected.insert(exec_id);
2388            }
2389        }
2390
2391        let to_prune: Vec<u64> = all_execs
2392            .into_iter()
2393            .filter(|e| !protected.contains(e))
2394            .collect();
2395
2396        let mut events_deleted = 0u64;
2397        let mut execs_deleted = 0u64;
2398
2399        for exec_id in &to_prune {
2400            let docs = query::fetch_history(self.client(), instance_id, *exec_id).await?;
2401            for doc in &docs {
2402                let _ = self.client().delete_document(&doc.id, instance_id).await;
2403                events_deleted += 1;
2404            }
2405
2406            // KV entries are instance-scoped and must survive execution pruning.
2407
2408            execs_deleted += 1;
2409        }
2410
2411        Ok(PruneResult {
2412            instances_processed: 1,
2413            executions_deleted: execs_deleted,
2414            events_deleted,
2415        })
2416    }
2417
2418    async fn prune_executions_bulk(
2419        &self,
2420        filter: InstanceFilter,
2421        options: PruneOptions,
2422    ) -> Result<PruneResult, ProviderError> {
2423        let instances = if let Some(ids) = &filter.instance_ids {
2424            ids.clone()
2425        } else {
2426            self.list_instances().await?
2427        };
2428
2429        let mut total = PruneResult::default();
2430        for id in &instances {
2431            match self.prune_executions(id, options.clone()).await {
2432                Ok(r) => {
2433                    total.instances_processed += r.instances_processed;
2434                    total.executions_deleted += r.executions_deleted;
2435                    total.events_deleted += r.events_deleted;
2436                }
2437                Err(_) => continue,
2438            }
2439        }
2440
2441        Ok(total)
2442    }
2443}