1use crate::batch;
2use crate::client::{BatchOperation, CosmosDBClient};
3use crate::containers;
4use crate::errors;
5use crate::leases::{InMemoryLeaseProvider, LeaseProvider};
6use crate::models::*;
7use crate::outbox;
8use crate::outbox::OutboxFaultInjector;
9use crate::query;
10use duroxide::providers::*;
11use duroxide::{Event, EventKind, TagFilter};
12use std::sync::Arc;
13use std::time::Duration;
14use tokio_util::sync::CancellationToken;
15
/// Maximum number of candidate instances `fetch_orchestration_item` tries to lock in a
/// single call before giving up and returning `Ok(None)`.
const MAX_LOCK_RETRIES: usize = 20;
17
/// Connection settings and tuning knobs for a `CosmosDBProvider`.
///
/// `Debug` is implemented by hand rather than derived so the account `key` is never
/// printed when a configuration is logged or inspected.
#[derive(Clone)]
pub struct CosmosDBProviderConfig {
    /// Cosmos DB account endpoint.
    pub endpoint: String,
    /// Account key used to authenticate requests; redacted in `Debug` output.
    pub key: String,
    /// Database name.
    pub database: String,
    /// Container name.
    pub container: String,
    /// Size passed to the in-memory lease provider that hands out orchestration
    /// dispatch slots.
    pub orch_concurrency: u32,
    /// Size passed to the in-memory lease provider that hands out worker dispatch slots.
    pub worker_concurrency: u32,
    /// Interval passed to the outbox reconciler (presumably its run period — confirm in
    /// `outbox::start_reconciler`).
    pub reconciler_interval: Duration,
    /// Age threshold passed to the outbox reconciler (presumably the minimum age of an
    /// outbox intent before it is reconciled — confirm in `outbox::start_reconciler`).
    pub reconciler_age_threshold: Duration,
}

impl std::fmt::Debug for CosmosDBProviderConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Reveal only whether a key is configured, never its value.
        let key = if self.key.is_empty() { "" } else { "<redacted>" };
        f.debug_struct("CosmosDBProviderConfig")
            .field("endpoint", &self.endpoint)
            .field("key", &key)
            .field("database", &self.database)
            .field("container", &self.container)
            .field("orch_concurrency", &self.orch_concurrency)
            .field("worker_concurrency", &self.worker_concurrency)
            .field("reconciler_interval", &self.reconciler_interval)
            .field("reconciler_age_threshold", &self.reconciler_age_threshold)
            .finish()
    }
}

impl Default for CosmosDBProviderConfig {
    /// Empty endpoint and key, database and container both named `"duroxide"`, a
    /// concurrency of 1 for orchestrations and workers, and 2-second reconciler
    /// interval / age threshold.
    fn default() -> Self {
        Self {
            endpoint: String::new(),
            key: String::new(),
            database: "duroxide".to_string(),
            container: "duroxide".to_string(),
            orch_concurrency: 1,
            worker_concurrency: 1,
            reconciler_interval: Duration::from_secs(2),
            reconciler_age_threshold: Duration::from_secs(2),
        }
    }
}
45
/// Duroxide `Provider` implementation backed by a single Azure Cosmos DB container.
///
/// Cloning is cheap: every clone shares the same `CosmosDBProviderInner` through an `Arc`.
#[derive(Clone)]
pub struct CosmosDBProvider {
    inner: Arc<CosmosDBProviderInner>,
}

/// State shared by all clones of a `CosmosDBProvider`.
struct CosmosDBProviderInner {
    /// Client for the configured database/container.
    client: CosmosDBClient,
    /// Slot leases acquired when fetching orchestration items.
    orch_leases: Box<dyn LeaseProvider>,
    /// Slot leases acquired when fetching worker items.
    worker_leases: Box<dyn LeaseProvider>,
    /// Token handed to the outbox reconciler; cancelled by `cleanup` and on drop.
    cancel: CancellationToken,
    /// Handle of the reconciler task; kept alive but never awaited.
    _reconciler_handle: Option<tokio::task::JoinHandle<()>>,
    /// Optional fault injector passed to best-effort outbox delivery (a testing aid).
    outbox_fault_injector: Option<OutboxFaultInjector>,
}
61
62impl CosmosDBProvider {
63 pub async fn new(endpoint: &str, key: &str, database: &str) -> Result<Self, ProviderError> {
65 let config = CosmosDBProviderConfig {
66 endpoint: endpoint.to_string(),
67 key: key.to_string(),
68 database: database.to_string(),
69 ..Default::default()
70 };
71 Self::new_with_config(config).await
72 }
73
74 pub async fn new_with_config(config: CosmosDBProviderConfig) -> Result<Self, ProviderError> {
76 let client = CosmosDBClient::new(
77 &config.endpoint,
78 &config.key,
79 &config.database,
80 &config.container,
81 )
82 .map_err(|e| ProviderError::permanent("new", e))?;
83
84 containers::ensure_infrastructure(&client).await?;
86
87 let cancel = CancellationToken::new();
88
89 let reconciler_handle = outbox::start_reconciler(
91 client.clone(),
92 config.reconciler_interval,
93 config.reconciler_age_threshold,
94 cancel.clone(),
95 );
96
97 let orch_leases = Box::new(InMemoryLeaseProvider::new(config.orch_concurrency));
98 let worker_leases = Box::new(InMemoryLeaseProvider::new(config.worker_concurrency));
99
100 Ok(Self {
101 inner: Arc::new(CosmosDBProviderInner {
102 client,
103 orch_leases,
104 worker_leases,
105 cancel,
106 _reconciler_handle: Some(reconciler_handle),
107 outbox_fault_injector: None,
108 }),
109 })
110 }
111
112 pub async fn new_with_container(
114 endpoint: &str,
115 key: &str,
116 database: &str,
117 container: &str,
118 ) -> Result<Self, ProviderError> {
119 let config = CosmosDBProviderConfig {
120 endpoint: endpoint.to_string(),
121 key: key.to_string(),
122 database: database.to_string(),
123 container: container.to_string(),
124 ..Default::default()
125 };
126 Self::new_with_config(config).await
127 }
128
129 fn client(&self) -> &CosmosDBClient {
130 &self.inner.client
131 }
132
133 pub fn set_outbox_fault_injector(&mut self, injector: OutboxFaultInjector) {
137 let inner = Arc::get_mut(&mut self.inner)
140 .expect("Cannot set fault injector after provider has been cloned");
141 inner.outbox_fault_injector = Some(injector);
142 }
143
144 pub async fn cleanup(&self) -> Result<(), ProviderError> {
146 self.inner.cancel.cancel();
147 self.client().delete_container().await
148 }
149
150 async fn load_kv_store_documents(
151 &self,
152 instance_id: &str,
153 ) -> Result<Vec<KeyValueDocument>, ProviderError> {
154 let docs =
155 query::query_by_type_in_partition(self.client(), instance_id, DOC_TYPE_KV).await?;
156 docs.into_iter()
157 .map(|doc| {
158 serde_json::from_value(doc).map_err(|e| {
159 ProviderError::permanent(
160 "load_kv_store_documents",
161 format!("Deserialize error: {e}"),
162 )
163 })
164 })
165 .collect()
166 }
167
168 async fn load_kv_delta_documents(
169 &self,
170 instance_id: &str,
171 ) -> Result<Vec<KeyValueDeltaDocument>, ProviderError> {
172 let docs = query::query_by_type_in_partition(self.client(), instance_id, DOC_TYPE_KV_DELTA)
173 .await?;
174 docs.into_iter()
175 .map(|doc| {
176 serde_json::from_value(doc).map_err(|e| {
177 ProviderError::permanent(
178 "load_kv_delta_documents",
179 format!("Deserialize error: {e}"),
180 )
181 })
182 })
183 .collect()
184 }
185
186 fn apply_kv_history_delta(
187 instance_id: &str,
188 execution_id: u64,
189 now: u64,
190 history_delta: &[Event],
191 store_docs: &[KeyValueDocument],
192 existing_delta_docs: &[KeyValueDeltaDocument],
193 ) -> std::collections::HashMap<String, KeyValueDeltaDocument> {
194 let mut delta_docs: std::collections::HashMap<String, KeyValueDeltaDocument> =
195 existing_delta_docs
196 .iter()
197 .cloned()
198 .map(|doc| (doc.key.clone(), doc))
199 .collect();
200 let store_keys: Vec<String> = store_docs.iter().map(|doc| doc.key.clone()).collect();
201
202 for event in history_delta {
203 match &event.kind {
204 EventKind::KeyValueSet {
205 key,
206 value,
207 last_updated_at_ms,
208 } => {
209 delta_docs.insert(
210 key.clone(),
211 KeyValueDeltaDocument::new(
212 instance_id,
213 key,
214 Some(value.clone()),
215 execution_id,
216 *last_updated_at_ms,
217 ),
218 );
219 }
220 EventKind::KeyValueCleared { key } => {
221 delta_docs.insert(
222 key.clone(),
223 KeyValueDeltaDocument::new(instance_id, key, None, execution_id, now),
224 );
225 }
226 EventKind::KeyValuesCleared => {
227 for doc in delta_docs.values_mut() {
228 doc.value = None;
229 doc.execution_id = execution_id;
230 doc.last_updated_at_ms = now;
231 }
232 for key in &store_keys {
233 delta_docs.entry(key.clone()).or_insert_with(|| {
234 KeyValueDeltaDocument::new(instance_id, key, None, execution_id, now)
235 });
236 }
237 }
238 _ => {}
239 }
240 }
241
242 delta_docs
243 }
244
245 fn kv_delta_changed_keys(
246 history_delta: &[Event],
247 store_docs: &[KeyValueDocument],
248 existing_delta_docs: &[KeyValueDeltaDocument],
249 ) -> (std::collections::HashSet<String>, bool) {
250 let mut changed_keys = std::collections::HashSet::new();
251 let mut clear_all = false;
252
253 for event in history_delta {
254 match &event.kind {
255 EventKind::KeyValueSet { key, .. } | EventKind::KeyValueCleared { key } => {
256 changed_keys.insert(key.clone());
257 }
258 EventKind::KeyValuesCleared => {
259 clear_all = true;
260 }
261 _ => {}
262 }
263 }
264
265 if clear_all {
266 changed_keys.extend(store_docs.iter().map(|doc| doc.key.clone()));
267 changed_keys.extend(existing_delta_docs.iter().map(|doc| doc.key.clone()));
268 }
269
270 (changed_keys, clear_all)
271 }
272
273 async fn read_instance(
275 &self,
276 instance_id: &str,
277 ) -> Result<Option<InstanceDocument>, ProviderError> {
278 let doc_id = InstanceDocument::doc_id(instance_id);
279 let resp = self.client().read_document(&doc_id, instance_id).await?;
280
281 if errors::is_not_found(resp.status) {
282 return Ok(None);
283 }
284 if !resp.is_success() {
285 return Err(errors::map_cosmosdb_error(
286 "read_instance",
287 resp.status,
288 &resp.body,
289 ));
290 }
291
292 let mut inst: InstanceDocument = serde_json::from_str(&resp.body).map_err(|e| {
293 ProviderError::permanent("read_instance", format!("Deserialize error: {e}"))
294 })?;
295 inst.etag = resp.etag;
296 Ok(Some(inst))
297 }
298
299 async fn try_lock_instance(
303 &self,
304 instance_id: &str,
305 lock_timeout: Duration,
306 now: u64,
307 work_item_json: Option<&str>,
308 ) -> Result<Option<(InstanceDocument, String)>, ProviderError> {
309 let inst = match self.read_instance(instance_id).await? {
310 Some(i) => i,
311 None => {
312 let (orch_name, orch_version) = if let Some(json) = work_item_json {
314 match serde_json::from_str::<WorkItem>(json) {
315 Ok(WorkItem::StartOrchestration {
316 orchestration,
317 version,
318 ..
319 }) => (orchestration, version.unwrap_or_default()),
320 _ => (String::new(), String::new()),
321 }
322 } else {
323 (String::new(), String::new())
324 };
325
326 let lock_token = uuid::Uuid::new_v4().to_string();
328 let locked_until = now + lock_timeout.as_millis() as u64;
329
330 let mut new_inst =
331 InstanceDocument::new(instance_id, &orch_name, &orch_version, 1, None, now);
332 new_inst.lock_token = Some(lock_token.clone());
333 new_inst.locked_until = Some(locked_until);
334
335 let doc_json = serde_json::to_value(&new_inst).map_err(|e| {
336 ProviderError::permanent("try_lock_instance", format!("Serialize error: {e}"))
337 })?;
338
339 let resp = self
340 .client()
341 .create_document(instance_id, &doc_json)
342 .await?;
343 if resp.is_success() {
344 new_inst.etag = resp.etag;
345 return Ok(Some((new_inst, lock_token)));
346 } else if errors::is_conflict(resp.status) {
347 match self.read_instance(instance_id).await? {
349 Some(i) => i,
350 None => return Ok(None),
351 }
352 } else {
353 return Err(errors::map_cosmosdb_error(
354 "try_lock_instance",
355 resp.status,
356 &resp.body,
357 ));
358 }
359 }
360 };
361
362 if let Some(locked_until) = inst.locked_until {
364 if locked_until > now {
365 return Ok(None); }
367 }
368
369 let etag = inst.etag.clone().unwrap_or_default();
370 let lock_token = uuid::Uuid::new_v4().to_string();
371 let locked_until = now + lock_timeout.as_millis() as u64;
372
373 let mut updated = inst.clone();
374 updated.lock_token = Some(lock_token.clone());
375 updated.locked_until = Some(locked_until);
376 updated.updated_at = now;
377
378 let doc_json = serde_json::to_value(&updated).map_err(|e| {
379 ProviderError::permanent("try_lock_instance", format!("Serialize error: {e}"))
380 })?;
381
382 let resp = self
383 .client()
384 .replace_document(&updated.id, instance_id, &doc_json, Some(&etag))
385 .await?;
386
387 if resp.is_success() {
388 Ok(Some((updated, lock_token)))
389 } else if errors::is_precondition_failed(resp.status) || errors::is_conflict(resp.status) {
390 Ok(None) } else {
392 Err(errors::map_cosmosdb_error(
393 "try_lock_instance",
394 resp.status,
395 &resp.body,
396 ))
397 }
398 }
399
400 async fn unlock_instance(&self, instance_id: &str) -> Result<(), ProviderError> {
402 let Some(inst) = self.read_instance(instance_id).await? else {
403 return Ok(());
404 };
405
406 let etag = inst.etag.clone().unwrap_or_default();
407 let mut updated = inst;
408 updated.lock_token = None;
409 updated.locked_until = None;
410
411 let doc_json = serde_json::to_value(&updated).map_err(|e| {
412 ProviderError::permanent("unlock_instance", format!("Serialize error: {e}"))
413 })?;
414
415 let resp = self
416 .client()
417 .replace_document(&updated.id, instance_id, &doc_json, Some(&etag))
418 .await?;
419
420 if resp.is_success() || errors::is_precondition_failed(resp.status) {
421 Ok(())
422 } else {
423 Err(errors::map_cosmosdb_error(
424 "unlock_instance",
425 resp.status,
426 &resp.body,
427 ))
428 }
429 }
430}
431
432impl Drop for CosmosDBProviderInner {
433 fn drop(&mut self) {
434 self.cancel.cancel();
435 }
436}
437
438#[async_trait::async_trait]
443impl Provider for CosmosDBProvider {
444 fn name(&self) -> &str {
445 "duroxide-cdb"
446 }
447
448 fn version(&self) -> &str {
449 env!("CARGO_PKG_VERSION")
450 }
451
452 async fn fetch_orchestration_item(
455 &self,
456 lock_timeout: Duration,
457 _poll_timeout: Duration,
458 filter: Option<&DispatcherCapabilityFilter>,
459 ) -> Result<Option<(OrchestrationItem, String, u32)>, ProviderError> {
460 let caller_id = task_id_u64();
461 let my_slots = self.inner.orch_leases.acquire_slots(caller_id).await;
462 let now = now_ms();
463
464 let (min_packed, max_packed) = if let Some(f) = filter {
466 if f.supported_duroxide_versions.is_empty() {
467 return Ok(None);
468 }
469 let min = f
470 .supported_duroxide_versions
471 .iter()
472 .map(|r| pack_semver(&r.min))
473 .min()
474 .unwrap();
475 let max = f
476 .supported_duroxide_versions
477 .iter()
478 .map(|r| pack_semver(&r.max))
479 .max()
480 .unwrap();
481 (Some(min), Some(max))
482 } else {
483 (None, None)
484 };
485
486 let mut excluded = Vec::new();
487
488 for _attempt in 0..MAX_LOCK_RETRIES {
489 let candidate = query::find_candidate_orch_item(
491 self.client(),
492 now,
493 &my_slots,
494 None, None,
496 &excluded,
497 )
498 .await?;
499
500 let Some(candidate) = candidate else {
501 return Ok(None);
502 };
503
504 let instance_id = &candidate.instance_id;
505
506 let lock_result = self
508 .try_lock_instance(instance_id, lock_timeout, now, Some(&candidate.work_item))
509 .await?;
510
511 let Some((locked_instance, lock_token)) = lock_result else {
512 excluded.push(instance_id.to_string());
513 continue;
514 };
515
516 if let (Some(min_v), Some(max_v)) = (min_packed, max_packed) {
518 if let Some(pinned) = locked_instance.pinned_duroxide_version_packed {
519 if pinned < min_v || pinned > max_v {
520 self.unlock_instance(instance_id).await?;
522 excluded.push(instance_id.to_string());
523 continue;
524 }
525 }
526 }
528
529 let messages = query::collect_orch_messages(self.client(), instance_id, now).await?;
531
532 if messages.is_empty() {
533 self.unlock_instance(instance_id).await?;
535 excluded.push(instance_id.to_string());
536 continue;
537 }
538
539 let max_attempt = messages.iter().map(|m| m.attempt_count).max().unwrap_or(0);
541 for msg in &messages {
542 let mut updated_msg = msg.clone();
543 updated_msg.lock_token = Some(lock_token.clone());
544 updated_msg.locked_until = Some(now + lock_timeout.as_millis() as u64);
545 updated_msg.attempt_count += 1;
546
547 let doc_json = serde_json::to_value(&updated_msg).map_err(|e| {
548 ProviderError::permanent(
549 "fetch_orchestration_item",
550 format!("Serialize error: {e}"),
551 )
552 })?;
553
554 let _ = self
555 .client()
556 .replace_document(&msg.id, instance_id, &doc_json, msg.etag.as_deref())
557 .await?;
558 }
559
560 let attempt_count = (max_attempt + 1) as u32;
561
562 let work_items: Vec<WorkItem> = messages
564 .iter()
565 .map(|m| {
566 serde_json::from_str(&m.work_item).map_err(|e| {
567 ProviderError::permanent(
568 "fetch_orchestration_item",
569 format!("Failed to deserialize work item: {e}"),
570 )
571 })
572 })
573 .collect::<Result<Vec<_>, _>>()?;
574
575 let execution_id = locked_instance.current_execution_id;
577 let history_docs =
578 query::fetch_history(self.client(), instance_id, execution_id).await?;
579
580 let (history, history_error) = {
581 let mut events = Vec::new();
582 let mut error = None;
583 for doc in &history_docs {
584 match serde_json::from_str::<Event>(&doc.event_data) {
585 Ok(event) => events.push(event),
586 Err(e) => {
587 error = Some(format!(
588 "Failed to deserialize history event {}: {e}",
589 doc.event_id
590 ));
591 events.clear();
592 break;
593 }
594 }
595 }
596 (events, error)
597 };
598
599 let kv_snapshot: std::collections::HashMap<String, duroxide::providers::KvEntry> = self
602 .load_kv_store_documents(instance_id)
603 .await?
604 .into_iter()
605 .map(|doc| {
606 (
607 doc.key,
608 duroxide::providers::KvEntry {
609 value: doc.value,
610 last_updated_at_ms: doc.last_updated_at_ms,
611 },
612 )
613 })
614 .collect();
615
616 if locked_instance.orchestration_name.is_empty()
624 && history.is_empty()
625 && work_items
626 .iter()
627 .all(|m| matches!(m, WorkItem::QueueMessage { .. }))
628 {
629 let message_count = work_items.len();
630 tracing::warn!(
631 target = "duroxide::providers::cosmosdb",
632 instance = %instance_id,
633 message_count,
634 "Dropping orphan queue messages — events enqueued before orchestration started are not supported"
635 );
636 self.ack_orchestration_item(
637 &lock_token,
638 execution_id,
639 vec![],
640 vec![],
641 vec![],
642 ExecutionMetadata::default(),
643 vec![],
644 )
645 .await?;
646 return Ok(None);
647 }
648
649 let item = OrchestrationItem {
650 instance: instance_id.to_string(),
651 orchestration_name: locked_instance.orchestration_name.clone(),
652 execution_id,
653 version: locked_instance.orchestration_version.clone(),
654 history,
655 messages: work_items,
656 history_error,
657 kv_snapshot,
658 };
659
660 return Ok(Some((item, lock_token, attempt_count)));
661 }
662
663 Ok(None)
665 }
666
667 async fn ack_orchestration_item(
670 &self,
671 lock_token: &str,
672 execution_id: u64,
673 history_delta: Vec<Event>,
674 worker_items: Vec<WorkItem>,
675 orchestrator_items: Vec<WorkItem>,
676 metadata: ExecutionMetadata,
677 cancelled_activities: Vec<ScheduledActivityIdentifier>,
678 ) -> Result<(), ProviderError> {
679 let instance = query::find_instance_by_lock_token(self.client(), lock_token)
681 .await?
682 .ok_or_else(|| {
683 ProviderError::permanent(
684 "ack_orchestration_item",
685 format!("Invalid lock token or lock expired: {lock_token}"),
686 )
687 })?;
688
689 let instance_id = &instance.instance_id;
690 let now = now_ms();
691
692 if let Some(locked_until) = instance.locked_until {
694 if locked_until <= now {
695 return Err(ProviderError::permanent(
696 "ack_orchestration_item",
697 format!("Lock has expired for instance {instance_id}"),
698 ));
699 }
700 }
701
702 let mut same_partition_worker = Vec::new();
704 let mut same_partition_orch = Vec::new();
705 let mut cross_partition_intents = Vec::new();
706
707 let cancelled_set: std::collections::HashSet<(String, u64, u64)> = cancelled_activities
709 .iter()
710 .map(|c| (c.instance.clone(), c.execution_id, c.activity_id))
711 .collect();
712
713 for (seq, item) in worker_items.iter().enumerate() {
715 let is_cancelled = match item {
717 WorkItem::ActivityExecute {
718 instance,
719 execution_id,
720 id,
721 ..
722 } => cancelled_set.contains(&(instance.clone(), *execution_id, *id)),
723 _ => false,
724 };
725 if is_cancelled {
726 continue; }
728
729 let target_instance = work_item_instance(item);
730 let item_json = serde_json::to_string(item).map_err(|e| {
731 ProviderError::permanent("ack_orchestration_item", format!("Serialize error: {e}"))
732 })?;
733
734 if target_instance == instance_id {
735 let (exec_id, activity_id, session_id, tag) = match item {
737 WorkItem::ActivityExecute {
738 execution_id,
739 id,
740 session_id,
741 tag,
742 ..
743 } => (
744 Some(*execution_id),
745 Some(*id),
746 session_id.clone(),
747 tag.clone(),
748 ),
749 _ => (None, None, None, None),
750 };
751 let doc = QueueItemDocument::new_worker_queue(
752 instance_id,
753 item_json,
754 exec_id,
755 activity_id,
756 session_id,
757 tag,
758 now,
759 );
760 same_partition_worker.push(serde_json::to_value(&doc).unwrap());
761 } else {
762 let (exec_id, activity_id, session_id, tag) = match item {
764 WorkItem::ActivityExecute {
765 execution_id,
766 id,
767 session_id,
768 tag,
769 ..
770 } => (
771 Some(*execution_id),
772 Some(*id),
773 session_id.clone(),
774 tag.clone(),
775 ),
776 _ => (None, None, None, None),
777 };
778 let target_doc = QueueItemDocument::new_worker_queue(
779 target_instance,
780 item_json,
781 exec_id,
782 activity_id,
783 session_id,
784 tag,
785 now,
786 );
787 let target_json = serde_json::to_string(&target_doc).unwrap();
788 let idem_key = idempotency_key(instance_id, execution_id, seq as u64);
789 let intent = OutboxIntentDocument::new(
790 instance_id,
791 target_instance,
792 DOC_TYPE_WORKER_QUEUE,
793 target_json,
794 idem_key,
795 now,
796 );
797 cross_partition_intents.push(intent);
798 }
799 }
800
801 for (seq, item) in orchestrator_items.iter().enumerate() {
803 let target_instance = work_item_instance(item);
804 let item_json = serde_json::to_string(item).map_err(|e| {
805 ProviderError::permanent("ack_orchestration_item", format!("Serialize error: {e}"))
806 })?;
807
808 let delay = match item {
809 WorkItem::TimerFired { fire_at_ms, .. } => {
810 let fire_at = *fire_at_ms;
811 if fire_at > now {
812 fire_at
813 } else {
814 now
815 }
816 }
817 _ => now,
818 };
819
820 if target_instance == instance_id {
821 let doc = QueueItemDocument::new_orch_queue(instance_id, item_json, delay, now);
822 same_partition_orch.push(serde_json::to_value(&doc).unwrap());
823 } else {
824 let target_doc =
825 QueueItemDocument::new_orch_queue(target_instance, item_json, delay, now);
826 let target_json = serde_json::to_string(&target_doc).unwrap();
827 let idem_key =
828 idempotency_key(instance_id, execution_id, (worker_items.len() + seq) as u64);
829 let intent = OutboxIntentDocument::new(
830 instance_id,
831 target_instance,
832 DOC_TYPE_ORCH_QUEUE,
833 target_json,
834 idem_key,
835 now,
836 );
837 cross_partition_intents.push(intent);
838 }
839 }
840
841 let locked_messages =
843 query::find_items_by_lock_token(self.client(), lock_token, DOC_TYPE_ORCH_QUEUE).await?;
844 let messages_to_delete: Vec<String> =
845 locked_messages.iter().map(|m| m.id.clone()).collect();
846
847 let mut cancelled_doc_ids = Vec::new();
849 for cancelled in &cancelled_activities {
850 let sql = format!(
852 "SELECT c.id FROM c WHERE c.instanceId = @instanceId AND c.type = '{}' \
853 AND c.executionId = @execId AND c.activityId = @activityId",
854 DOC_TYPE_WORKER_QUEUE
855 );
856 let params = vec![
857 crate::client::QueryParameter::new(
858 "@instanceId",
859 serde_json::json!(&cancelled.instance),
860 ),
861 crate::client::QueryParameter::new(
862 "@execId",
863 serde_json::json!(cancelled.execution_id),
864 ),
865 crate::client::QueryParameter::new(
866 "@activityId",
867 serde_json::json!(cancelled.activity_id),
868 ),
869 ];
870 let results = self
871 .client()
872 .query(&sql, params, Some(&cancelled.instance))
873 .await?;
874 for doc in results {
875 if let Some(id) = doc.get("id").and_then(|v| v.as_str()) {
876 cancelled_doc_ids.push(id.to_string());
877 }
878 }
879 }
880
881 let history_entries: Vec<(u64, String)> = history_delta
883 .iter()
884 .map(|event| {
885 let event_id = event.event_id;
886 let event_json = serde_json::to_string(event).unwrap();
887 (event_id, event_json)
888 })
889 .collect();
890
891 let mut updated_instance = instance.clone();
893 updated_instance.current_execution_id = execution_id;
894 updated_instance.updated_at = now;
895 updated_instance.lock_token = None;
896 updated_instance.locked_until = None;
897
898 if let Some(status) = &metadata.status {
899 updated_instance.status = status.clone();
900 }
901 if let Some(output) = &metadata.output {
902 updated_instance.output = Some(output.clone());
903 }
904 if let Some(name) = &metadata.orchestration_name {
905 updated_instance.orchestration_name = name.clone();
906 }
907 if let Some(version) = &metadata.orchestration_version {
908 updated_instance.orchestration_version = version.clone();
909 }
910 if let Some(parent) = &metadata.parent_instance_id {
911 updated_instance.parent_instance_id = Some(parent.clone());
912 }
913 if let Some(pinned) = &metadata.pinned_duroxide_version {
914 updated_instance.pinned_duroxide_version_packed = Some(pack_semver(pinned));
915 }
916
917 let custom_status_from_delta = history_delta.iter().rev().find_map(|e| match &e.kind {
920 EventKind::CustomStatusUpdated { status } => Some(status.clone()),
921 _ => None,
922 });
923
924 match custom_status_from_delta {
925 Some(Some(custom_status)) => {
926 updated_instance.custom_status = Some(custom_status);
927 updated_instance.custom_status_version += 1;
928 }
929 Some(None) => {
930 updated_instance.custom_status = None;
931 updated_instance.custom_status_version += 1;
932 }
933 None => {
934 }
936 }
937
938 let instance_json = serde_json::to_value(&updated_instance).map_err(|e| {
939 ProviderError::permanent("ack_orchestration_item", format!("Serialize error: {e}"))
940 })?;
941
942 let outbox_json: Vec<serde_json::Value> = cross_partition_intents
944 .iter()
945 .map(|intent| serde_json::to_value(intent).unwrap())
946 .collect();
947
948 let existing_store_docs = self.load_kv_store_documents(instance_id).await?;
949 let existing_delta_docs = self.load_kv_delta_documents(instance_id).await?;
950 let delta_state = Self::apply_kv_history_delta(
951 instance_id,
952 execution_id,
953 now,
954 &history_delta,
955 &existing_store_docs,
956 &existing_delta_docs,
957 );
958 let (changed_delta_keys, clear_all) =
959 Self::kv_delta_changed_keys(&history_delta, &existing_store_docs, &existing_delta_docs);
960 let is_terminal = metadata
961 .status
962 .as_deref()
963 .is_some_and(|status| matches!(status, "Completed" | "ContinuedAsNew" | "Failed"));
964
965 let existing_store_keys: std::collections::HashSet<String> = existing_store_docs
967 .iter()
968 .map(|doc| doc.key.clone())
969 .collect();
970 let mut kv_ops: Vec<BatchOperation> = Vec::new();
971
972 if is_terminal {
973 let mut delta_docs: Vec<KeyValueDeltaDocument> = delta_state.into_values().collect();
974 delta_docs.sort_by(|a, b| a.key.cmp(&b.key));
975
976 for delta_doc in delta_docs {
977 match delta_doc.value.as_deref() {
978 Some(value) => {
979 let store_doc = KeyValueDocument::new(
980 instance_id,
981 &delta_doc.key,
982 value,
983 delta_doc.execution_id,
984 delta_doc.last_updated_at_ms,
985 );
986 let json = serde_json::to_value(&store_doc).unwrap();
987 kv_ops.push(BatchOperation::Upsert { body: json });
988 }
989 None => {
990 if existing_store_keys.contains(&delta_doc.key) {
991 kv_ops.push(BatchOperation::Delete {
992 id: KeyValueDocument::doc_id(instance_id, &delta_doc.key),
993 });
994 }
995 }
996 }
997 }
998
999 for delta_doc in &existing_delta_docs {
1000 kv_ops.push(BatchOperation::Delete {
1001 id: delta_doc.id.clone(),
1002 });
1003 }
1004 } else {
1005 let mut keys_to_upsert: Vec<String> = if clear_all {
1006 delta_state.keys().cloned().collect()
1007 } else {
1008 changed_delta_keys.into_iter().collect()
1009 };
1010 keys_to_upsert.sort();
1011 keys_to_upsert.dedup();
1012
1013 for key in keys_to_upsert {
1014 if let Some(delta_doc) = delta_state.get(&key) {
1015 let json = serde_json::to_value(delta_doc).unwrap();
1016 kv_ops.push(BatchOperation::Upsert { body: json });
1017 }
1018 }
1019 }
1020
1021 let ops = batch::build_ack_batch(
1027 instance_id,
1028 execution_id,
1029 lock_token,
1030 &messages_to_delete,
1031 &history_entries,
1032 same_partition_worker,
1033 same_partition_orch,
1034 outbox_json,
1035 kv_ops,
1036 &[], instance_json,
1038 );
1039
1040 batch::execute_batch(self.client(), instance_id, ops).await?;
1041
1042 for doc_id in &cancelled_doc_ids {
1046 let _ = self.client().delete_document(doc_id, instance_id).await;
1047 }
1048
1049 outbox::deliver_intents_best_effort(
1051 self.client(),
1052 &cross_partition_intents,
1053 self.inner.outbox_fault_injector.as_ref(),
1054 )
1055 .await;
1056
1057 Ok(())
1058 }
1059
1060 async fn abandon_orchestration_item(
1063 &self,
1064 lock_token: &str,
1065 delay: Option<Duration>,
1066 ignore_attempt: bool,
1067 ) -> Result<(), ProviderError> {
1068 let now = now_ms();
1069
1070 let instance = query::find_instance_by_lock_token(self.client(), lock_token).await?;
1072 let inst = instance.ok_or_else(|| {
1073 ProviderError::permanent("abandon_orchestration_item", "Invalid lock token")
1074 })?;
1075
1076 let mut updated = inst.clone();
1077 updated.lock_token = None;
1078 updated.locked_until = None;
1079 updated.updated_at = now;
1080 let doc_json = serde_json::to_value(&updated).unwrap();
1081 let _ = self
1082 .client()
1083 .replace_document(
1084 &updated.id,
1085 &inst.instance_id,
1086 &doc_json,
1087 inst.etag.as_deref(),
1088 )
1089 .await;
1090
1091 let messages =
1093 query::find_items_by_lock_token(self.client(), lock_token, DOC_TYPE_ORCH_QUEUE).await?;
1094 for msg in &messages {
1095 let mut updated = msg.clone();
1096 updated.lock_token = None;
1097 updated.locked_until = None;
1098 if let Some(d) = delay {
1099 updated.visible_at = now + d.as_millis() as u64;
1100 }
1101 if ignore_attempt && updated.attempt_count > 0 {
1102 updated.attempt_count -= 1;
1103 }
1104
1105 let doc_json = serde_json::to_value(&updated).unwrap();
1106 let _ = self
1107 .client()
1108 .replace_document(&msg.id, &msg.instance_id, &doc_json, msg.etag.as_deref())
1109 .await;
1110 }
1111
1112 Ok(())
1113 }
1114
1115 async fn read(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
1118 let inst = self.read_instance(instance).await?;
1119 let execution_id = inst.map(|i| i.current_execution_id).unwrap_or(1);
1120 self.read_with_execution(instance, execution_id).await
1121 }
1122
1123 async fn read_with_execution(
1124 &self,
1125 instance: &str,
1126 execution_id: u64,
1127 ) -> Result<Vec<Event>, ProviderError> {
1128 let docs = query::fetch_history(self.client(), instance, execution_id).await?;
1129 let mut events = Vec::new();
1130 for doc in docs {
1131 let event: Event = serde_json::from_str(&doc.event_data).map_err(|e| {
1132 ProviderError::permanent(
1133 "read_with_execution",
1134 format!("Failed to deserialize event: {e}"),
1135 )
1136 })?;
1137 events.push(event);
1138 }
1139 Ok(events)
1140 }
1141
1142 async fn append_with_execution(
1143 &self,
1144 instance: &str,
1145 execution_id: u64,
1146 new_events: Vec<Event>,
1147 ) -> Result<(), ProviderError> {
1148 let existing = query::fetch_history(self.client(), instance, execution_id).await?;
1150 let next_id = existing
1151 .iter()
1152 .map(|h| h.event_id)
1153 .max()
1154 .map(|m| m + 1)
1155 .unwrap_or(0);
1156
1157 for (i, event) in new_events.iter().enumerate() {
1158 let event_id = next_id + i as u64;
1159 let event_json = serde_json::to_string(event).map_err(|e| {
1160 ProviderError::permanent("append_with_execution", format!("Serialize error: {e}"))
1161 })?;
1162 let doc = HistoryDocument::new(instance, execution_id, event_id, event_json);
1163 let doc_json = serde_json::to_value(&doc).unwrap();
1164
1165 let resp = self.client().create_document(instance, &doc_json).await?;
1166 if !resp.is_success() && !errors::is_conflict(resp.status) {
1167 return Err(errors::map_cosmosdb_error(
1168 "append_with_execution",
1169 resp.status,
1170 &resp.body,
1171 ));
1172 }
1173 }
1174
1175 Ok(())
1176 }
1177
1178 async fn enqueue_for_worker(&self, item: WorkItem) -> Result<(), ProviderError> {
1181 let instance_id = work_item_instance(&item).to_string();
1182 let item_json = serde_json::to_string(&item).map_err(|e| {
1183 ProviderError::permanent("enqueue_for_worker", format!("Serialize error: {e}"))
1184 })?;
1185
1186 let (exec_id, activity_id, session_id, tag) = match &item {
1187 WorkItem::ActivityExecute {
1188 execution_id,
1189 id,
1190 session_id,
1191 tag,
1192 ..
1193 } => (
1194 Some(*execution_id),
1195 Some(*id),
1196 session_id.clone(),
1197 tag.clone(),
1198 ),
1199 _ => (None, None, None, None),
1200 };
1201
1202 let now = now_ms();
1203 let doc = QueueItemDocument::new_worker_queue(
1204 &instance_id,
1205 item_json,
1206 exec_id,
1207 activity_id,
1208 session_id,
1209 tag,
1210 now,
1211 );
1212 let doc_json = serde_json::to_value(&doc).unwrap();
1213
1214 let resp = self
1215 .client()
1216 .create_document(&instance_id, &doc_json)
1217 .await?;
1218 if !resp.is_success() {
1219 return Err(errors::map_cosmosdb_error(
1220 "enqueue_for_worker",
1221 resp.status,
1222 &resp.body,
1223 ));
1224 }
1225
1226 Ok(())
1227 }
1228
    /// Claims the next available worker-queue item for this caller, if any.
    ///
    /// Returns `Ok(None)` immediately when `tag_filter` is `TagFilter::None`, and also when no
    /// claimable candidate is found within 10 attempts.
    ///
    /// Each attempt proceeds as follows:
    /// 1. Ask `query::find_candidate_work_item` for a candidate. It is given this caller's
    ///    worker lease slots, the session owner id (if any), the ids already rejected in this
    ///    call, and the tag filter.
    /// 2. Apply session gating. A candidate bound to a session is skipped if the caller has no
    ///    session config, or if the session document is locked (not yet expired) by another
    ///    owner.
    /// 3. Lock the item with an etag-guarded replace. This sets a fresh UUID lock token, sets
    ///    `locked_until = now + lock_timeout`, and increments `attempt_count`. A
    ///    precondition-failed or conflict response means another fetcher won, so the
    ///    candidate is skipped.
    /// 4. For session-bound items, claim the session document (update it if expired or
    ///    already ours, otherwise create it). If that fails, release the item lock and skip
    ///    the candidate.
    ///
    /// On success, returns the deserialized work item, the lock token, and the new attempt
    /// count.
    ///
    /// `_poll_timeout` is not used by this implementation.
    ///
    /// # Errors
    /// - Propagates query and replace transport errors.
    /// - Returns a mapped Cosmos DB error for any other non-success lock replace.
    /// - Returns a permanent error if serialization or work-item deserialization fails.
    async fn fetch_work_item(
        &self,
        lock_timeout: Duration,
        _poll_timeout: Duration,
        session: Option<&SessionFetchConfig>,
        tag_filter: &TagFilter,
    ) -> Result<Option<(WorkItem, String, u32)>, ProviderError> {
        if matches!(tag_filter, TagFilter::None) {
            return Ok(None);
        }

        // NOTE(review): the lease-slot semantics live in `LeaseProvider`; here the slots are
        // only forwarded to the candidate query.
        let caller_id = task_id_u64();
        let my_slots = self.inner.worker_leases.acquire_slots(caller_id).await;
        // Captured once: every attempt below uses this same base time for visibility and for
        // the new lock expiry.
        let now = now_ms();

        // Ids of candidates rejected in this call, so the query does not return them again.
        let mut excluded = Vec::new();

        for _attempt in 0..10 {
            let candidate = query::find_candidate_work_item(
                self.client(),
                now,
                &my_slots,
                session.map(|s| s.owner_id.as_str()),
                &excluded,
                tag_filter,
            )
            .await?;

            let Some(candidate) = candidate else {
                return Ok(None);
            };

            // Session gating before attempting the lock.
            if let Some(ref sid) = candidate.session_id {
                if let Some(config) = session {
                    let session_doc_id = SessionDocument::doc_id(&candidate.instance_id, sid);
                    // A read failure or missing session doc does not block the candidate; the
                    // session is (re)claimed after the item lock succeeds.
                    if let Ok(resp) = self
                        .client()
                        .read_document(&session_doc_id, &candidate.instance_id)
                        .await
                    {
                        if resp.is_success() {
                            if let Ok(session_doc) =
                                serde_json::from_str::<SessionDocument>(&resp.body)
                            {
                                // Another owner holds an unexpired session lock: skip.
                                if session_doc.locked_until > now
                                    && session_doc.owner_id != config.owner_id
                                {
                                    excluded.push(candidate.id.clone());
                                    continue;
                                }
                            }
                        }
                    }
                } else {
                    // Session-bound item, but the caller is not fetching with a session.
                    excluded.push(candidate.id.clone());
                    continue;
                }
            }

            let etag = candidate.etag.clone().unwrap_or_default();
            let lock_token = uuid::Uuid::new_v4().to_string();

            let mut updated = candidate.clone();
            updated.lock_token = Some(lock_token.clone());
            updated.locked_until = Some(now + lock_timeout.as_millis() as u64);
            updated.attempt_count += 1;

            let doc_json = serde_json::to_value(&updated).map_err(|e| {
                ProviderError::permanent("fetch_work_item", format!("Serialize error: {e}"))
            })?;

            // Optimistic lock: the replace only succeeds if the document is unchanged since
            // the candidate query (etag match).
            let resp = self
                .client()
                .replace_document(
                    &candidate.id,
                    &candidate.instance_id,
                    &doc_json,
                    Some(&etag),
                )
                .await?;

            if resp.is_success() {
                if let (Some(ref sid), Some(config)) = (&candidate.session_id, session) {
                    // Session lock timing uses a fresh clock reading and the session config's
                    // own lock timeout.
                    let session_now = now_ms();
                    let session_locked_until = session_now + config.lock_timeout.as_millis() as u64;
                    let session_doc_id = SessionDocument::doc_id(&candidate.instance_id, sid);

                    let existing_session = self
                        .client()
                        .read_document(&session_doc_id, &candidate.instance_id)
                        .await;

                    let session_claimed = match existing_session {
                        Ok(resp) if resp.is_success() => {
                            match serde_json::from_str::<SessionDocument>(&resp.body) {
                                Ok(mut existing) => {
                                    // Take over an expired session, or refresh our own.
                                    if existing.locked_until <= session_now
                                        || existing.owner_id == config.owner_id
                                    {
                                        existing.owner_id = config.owner_id.clone();
                                        existing.locked_until = session_locked_until;
                                        existing.last_activity = session_now;
                                        let session_json = serde_json::to_value(&existing).unwrap();
                                        let update_resp = self
                                            .client()
                                            .replace_document(
                                                &session_doc_id,
                                                &candidate.instance_id,
                                                &session_json,
                                                resp.etag.as_deref(),
                                            )
                                            .await;
                                        update_resp.map(|r| r.is_success()).unwrap_or(false)
                                    } else {
                                        false
                                    }
                                }
                                Err(_) => false,
                            }
                        }
                        // Session doc missing (or the read failed): try to create it. A
                        // concurrent creator makes this create fail, which counts as "not
                        // claimed".
                        _ => {
                            let new_session = SessionDocument {
                                id: session_doc_id.clone(),
                                instance_id: candidate.instance_id.clone(),
                                doc_type: DOC_TYPE_SESSION.to_string(),
                                session_id: sid.clone(),
                                owner_id: config.owner_id.clone(),
                                locked_until: session_locked_until,
                                last_activity: session_now,
                                created_at: session_now,
                                etag: None,
                                rid: None,
                                self_link: None,
                                ts: None,
                                attachments: None,
                            };
                            let session_json = serde_json::to_value(&new_session).unwrap();
                            let create_resp = self
                                .client()
                                .create_document(&candidate.instance_id, &session_json)
                                .await;
                            match create_resp {
                                Ok(r) => r.is_success(),
                                Err(_) => false,
                            }
                        }
                    };

                    if !session_claimed {
                        // Undo the item lock so the item is not stuck until lock expiry.
                        // NOTE(review): this replace is unconditional (no etag) and its result
                        // is ignored; the new etag in `resp.etag` could be used to guard it.
                        let mut rollback = updated.clone();
                        rollback.lock_token = None;
                        rollback.locked_until = None;
                        rollback.attempt_count -= 1;
                        let rollback_json = serde_json::to_value(&rollback).unwrap();
                        let _ = self
                            .client()
                            .replace_document(
                                &candidate.id,
                                &candidate.instance_id,
                                &rollback_json,
                                None,
                            )
                            .await;
                        excluded.push(candidate.id.clone());
                        continue;
                    }
                }

                // NOTE(review): if this fails, the item stays locked until `locked_until`
                // passes, because the error is returned after the lock was taken.
                let work_item: WorkItem =
                    serde_json::from_str(&candidate.work_item).map_err(|e| {
                        ProviderError::permanent(
                            "fetch_work_item",
                            format!("Failed to deserialize work item: {e}"),
                        )
                    })?;

                return Ok(Some((work_item, lock_token, updated.attempt_count as u32)));
            } else if errors::is_precondition_failed(resp.status)
                || errors::is_conflict(resp.status)
            {
                // Lost the race for this candidate; try another one.
                excluded.push(candidate.id.clone());
                continue;
            } else {
                return Err(errors::map_cosmosdb_error(
                    "fetch_work_item",
                    resp.status,
                    &resp.body,
                ));
            }
        }

        Ok(None)
    }
1439
1440 async fn ack_work_item(
1441 &self,
1442 token: &str,
1443 completion: Option<WorkItem>,
1444 ) -> Result<(), ProviderError> {
1445 let now = now_ms();
1446
1447 let items =
1449 query::find_items_by_lock_token(self.client(), token, DOC_TYPE_WORKER_QUEUE).await?;
1450
1451 let item = items.first().ok_or_else(|| {
1452 ProviderError::permanent(
1453 "ack_work_item",
1454 "Activity was cancelled or lock expired (worker queue row not found or lock invalid)",
1455 )
1456 })?;
1457
1458 if let Some(locked_until) = item.locked_until {
1460 if locked_until <= now {
1461 return Err(ProviderError::permanent(
1462 "ack_work_item",
1463 "Activity was cancelled or lock expired (worker queue row not found or lock invalid)",
1464 ));
1465 }
1466 }
1467
1468 let instance_id = &item.instance_id;
1469 let session_id = item.session_id.clone();
1470
1471 if let Some(completion_item) = completion {
1472 let target_instance = work_item_instance(&completion_item).to_string();
1474 let item_json = serde_json::to_string(&completion_item).map_err(|e| {
1475 ProviderError::permanent("ack_work_item", format!("Serialize error: {e}"))
1476 })?;
1477
1478 if target_instance == *instance_id {
1479 let orch_doc =
1481 QueueItemDocument::new_orch_queue(&target_instance, item_json, now, now);
1482 let orch_json = serde_json::to_value(&orch_doc).unwrap();
1483
1484 let ops = vec![
1485 BatchOperation::Delete {
1486 id: item.id.clone(),
1487 },
1488 BatchOperation::Create { body: orch_json },
1489 ];
1490 batch::execute_batch(self.client(), instance_id, ops).await?;
1491 } else {
1492 let _ = self.client().delete_document(&item.id, instance_id).await?;
1494
1495 let orch_doc =
1496 QueueItemDocument::new_orch_queue(&target_instance, item_json, now, now);
1497 let orch_json = serde_json::to_value(&orch_doc).unwrap();
1498
1499 let resp = self
1500 .client()
1501 .create_document(&target_instance, &orch_json)
1502 .await?;
1503 if !resp.is_success() {
1504 return Err(errors::map_cosmosdb_error(
1505 "ack_work_item",
1506 resp.status,
1507 &resp.body,
1508 ));
1509 }
1510 }
1511 } else {
1512 let _ = self.client().delete_document(&item.id, instance_id).await?;
1514 }
1515
1516 if let Some(ref sid) = session_id {
1518 let session_doc_id = SessionDocument::doc_id(instance_id, sid);
1519 let piggyback_now = now_ms(); if let Ok(resp) = self
1521 .client()
1522 .read_document(&session_doc_id, instance_id)
1523 .await
1524 {
1525 if resp.is_success() {
1526 if let Ok(mut session_doc) = serde_json::from_str::<SessionDocument>(&resp.body)
1527 {
1528 if session_doc.locked_until > piggyback_now {
1529 session_doc.last_activity = piggyback_now;
1530 session_doc.etag = resp.etag;
1531 let doc_json = serde_json::to_value(&session_doc).unwrap();
1532 let _ = self
1533 .client()
1534 .replace_document(
1535 &session_doc_id,
1536 instance_id,
1537 &doc_json,
1538 session_doc.etag.as_deref(),
1539 )
1540 .await;
1541 }
1542 }
1543 }
1544 }
1545 }
1546
1547 Ok(())
1548 }
1549
1550 async fn renew_work_item_lock(
1551 &self,
1552 token: &str,
1553 extend_for: Duration,
1554 ) -> Result<(), ProviderError> {
1555 let now = now_ms();
1556 let items =
1557 query::find_items_by_lock_token(self.client(), token, DOC_TYPE_WORKER_QUEUE).await?;
1558
1559 let item = items.first().ok_or_else(|| {
1560 ProviderError::permanent(
1561 "renew_work_item_lock",
1562 format!("No worker item found with lock token {token}"),
1563 )
1564 })?;
1565
1566 if let Some(locked_until) = item.locked_until {
1568 if locked_until <= now {
1569 return Err(ProviderError::permanent(
1570 "renew_work_item_lock",
1571 "Lock has expired".to_string(),
1572 ));
1573 }
1574 }
1575
1576 let mut updated = item.clone();
1577 updated.locked_until = Some(now + extend_for.as_millis() as u64);
1578
1579 let doc_json = serde_json::to_value(&updated).unwrap();
1580 let resp = self
1581 .client()
1582 .replace_document(&item.id, &item.instance_id, &doc_json, item.etag.as_deref())
1583 .await?;
1584
1585 if resp.is_success() {
1586 if let Some(ref sid) = item.session_id {
1588 let piggyback_now = now_ms(); let session_doc_id = SessionDocument::doc_id(&item.instance_id, sid);
1590 if let Ok(sess_resp) = self
1591 .client()
1592 .read_document(&session_doc_id, &item.instance_id)
1593 .await
1594 {
1595 if sess_resp.is_success() {
1596 if let Ok(mut session_doc) =
1597 serde_json::from_str::<SessionDocument>(&sess_resp.body)
1598 {
1599 if session_doc.locked_until > piggyback_now {
1600 session_doc.last_activity = piggyback_now;
1601 let sess_json = serde_json::to_value(&session_doc).unwrap();
1602 let _ = self
1603 .client()
1604 .replace_document(
1605 &session_doc_id,
1606 &item.instance_id,
1607 &sess_json,
1608 sess_resp.etag.as_deref(),
1609 )
1610 .await;
1611 }
1612 }
1613 }
1614 }
1615 }
1616 Ok(())
1617 } else {
1618 Err(errors::map_cosmosdb_error(
1619 "renew_work_item_lock",
1620 resp.status,
1621 &resp.body,
1622 ))
1623 }
1624 }
1625
1626 async fn renew_session_lock(
1627 &self,
1628 owner_ids: &[&str],
1629 extend_for: Duration,
1630 idle_timeout: Duration,
1631 ) -> Result<usize, ProviderError> {
1632 if owner_ids.is_empty() {
1633 return Ok(0);
1634 }
1635
1636 let now = now_ms();
1637 let locked_until = now + extend_for.as_millis() as u64;
1638 let idle_cutoff = now.saturating_sub(idle_timeout.as_millis() as u64);
1639
1640 let sql = format!("SELECT * FROM c WHERE c.type = '{}'", DOC_TYPE_SESSION);
1642 let results = self.client().query(&sql, vec![], None).await?;
1643
1644 let mut count = 0usize;
1645 for doc in results {
1646 if let Ok(session) = serde_json::from_value::<SessionDocument>(doc) {
1647 if owner_ids.contains(&session.owner_id.as_str())
1652 && session.locked_until > now
1653 && session.last_activity > idle_cutoff
1654 {
1655 let mut updated = session.clone();
1656 updated.locked_until = locked_until;
1657 let doc_json = serde_json::to_value(&updated).unwrap();
1658 if let Ok(resp) = self
1659 .client()
1660 .replace_document(
1661 &session.id,
1662 &session.instance_id,
1663 &doc_json,
1664 session.etag.as_deref(),
1665 )
1666 .await
1667 {
1668 if resp.is_success() {
1669 count += 1;
1670 }
1671 }
1672 }
1673 }
1674 }
1675
1676 Ok(count)
1677 }
1678
1679 async fn cleanup_orphaned_sessions(
1680 &self,
1681 _idle_timeout: Duration,
1682 ) -> Result<usize, ProviderError> {
1683 let now = now_ms();
1684
1685 let sql = format!(
1687 "SELECT * FROM c WHERE c.type = '{}' AND c.lockedUntil < @now",
1688 DOC_TYPE_SESSION
1689 );
1690 let params = vec![crate::client::QueryParameter::new(
1691 "@now",
1692 serde_json::json!(now),
1693 )];
1694 let results = self.client().query(&sql, params, None).await?;
1695
1696 let mut count = 0usize;
1697 for doc in results {
1698 if let Ok(session) = serde_json::from_value::<SessionDocument>(doc) {
1699 let check_sql = format!(
1701 "SELECT VALUE COUNT(1) FROM c WHERE c.type = '{}' AND c.sessionId = @sid AND c.instanceId = @iid",
1702 DOC_TYPE_WORKER_QUEUE
1703 );
1704 let check_params = vec![
1705 crate::client::QueryParameter::new(
1706 "@sid",
1707 serde_json::json!(&session.session_id),
1708 ),
1709 crate::client::QueryParameter::new(
1710 "@iid",
1711 serde_json::json!(&session.instance_id),
1712 ),
1713 ];
1714 let worker_count = self
1715 .client()
1716 .query(&check_sql, check_params, Some(&session.instance_id))
1717 .await?;
1718 let has_items = worker_count
1719 .into_iter()
1720 .next()
1721 .and_then(|v| v.as_u64())
1722 .unwrap_or(0)
1723 > 0;
1724
1725 if !has_items {
1726 let _ = self
1728 .client()
1729 .delete_document(&session.id, &session.instance_id)
1730 .await;
1731 count += 1;
1732 }
1733 }
1734 }
1735
1736 Ok(count)
1737 }
1738
1739 async fn abandon_work_item(
1740 &self,
1741 token: &str,
1742 delay: Option<Duration>,
1743 ignore_attempt: bool,
1744 ) -> Result<(), ProviderError> {
1745 let now = now_ms();
1746 let items =
1747 query::find_items_by_lock_token(self.client(), token, DOC_TYPE_WORKER_QUEUE).await?;
1748
1749 for item in &items {
1750 let mut updated = item.clone();
1751 updated.lock_token = None;
1752 updated.locked_until = None;
1753 if let Some(d) = delay {
1754 updated.visible_at = now + d.as_millis() as u64;
1755 }
1756 if ignore_attempt && updated.attempt_count > 0 {
1757 updated.attempt_count -= 1;
1758 }
1759
1760 let doc_json = serde_json::to_value(&updated).unwrap();
1761 let _ = self
1762 .client()
1763 .replace_document(&item.id, &item.instance_id, &doc_json, item.etag.as_deref())
1764 .await;
1765 }
1766
1767 Ok(())
1768 }
1769
1770 async fn renew_orchestration_item_lock(
1771 &self,
1772 token: &str,
1773 extend_for: Duration,
1774 ) -> Result<(), ProviderError> {
1775 let now = now_ms();
1776
1777 let instance = query::find_instance_by_lock_token(self.client(), token).await?;
1779 let inst = instance.ok_or_else(|| {
1780 ProviderError::permanent(
1781 "renew_orchestration_item_lock",
1782 format!("No instance found with lock token {token}"),
1783 )
1784 })?;
1785
1786 if let Some(locked_until) = inst.locked_until {
1788 if locked_until <= now {
1789 return Err(ProviderError::permanent(
1790 "renew_orchestration_item_lock",
1791 "Lock has expired".to_string(),
1792 ));
1793 }
1794 }
1795
1796 let mut updated_inst = inst.clone();
1797 updated_inst.locked_until = Some(now + extend_for.as_millis() as u64);
1798 let doc_json = serde_json::to_value(&updated_inst).unwrap();
1799 let resp = self
1800 .client()
1801 .replace_document(&inst.id, &inst.instance_id, &doc_json, inst.etag.as_deref())
1802 .await?;
1803
1804 if !resp.is_success() {
1805 return Err(errors::map_cosmosdb_error(
1806 "renew_orchestration_item_lock",
1807 resp.status,
1808 &resp.body,
1809 ));
1810 }
1811
1812 let messages =
1814 query::find_items_by_lock_token(self.client(), token, DOC_TYPE_ORCH_QUEUE).await?;
1815 for msg in &messages {
1816 let mut updated = msg.clone();
1817 updated.locked_until = Some(now + extend_for.as_millis() as u64);
1818 let doc_json = serde_json::to_value(&updated).unwrap();
1819 let _ = self
1820 .client()
1821 .replace_document(&msg.id, &msg.instance_id, &doc_json, msg.etag.as_deref())
1822 .await;
1823 }
1824
1825 Ok(())
1826 }
1827
1828 async fn enqueue_for_orchestrator(
1831 &self,
1832 item: WorkItem,
1833 delay: Option<Duration>,
1834 ) -> Result<(), ProviderError> {
1835 let instance_id = work_item_instance(&item).to_string();
1836 let item_json = serde_json::to_string(&item).map_err(|e| {
1837 ProviderError::permanent("enqueue_for_orchestrator", format!("Serialize error: {e}"))
1838 })?;
1839
1840 let now = now_ms();
1841 let visible_at = delay.map(|d| now + d.as_millis() as u64).unwrap_or(now);
1842
1843 let doc = QueueItemDocument::new_orch_queue(&instance_id, item_json, visible_at, now);
1844 let doc_json = serde_json::to_value(&doc).unwrap();
1845
1846 let resp = self
1847 .client()
1848 .create_document(&instance_id, &doc_json)
1849 .await?;
1850 if !resp.is_success() {
1851 return Err(errors::map_cosmosdb_error(
1852 "enqueue_for_orchestrator",
1853 resp.status,
1854 &resp.body,
1855 ));
1856 }
1857
1858 Ok(())
1859 }
1860
1861 fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
1864 Some(self)
1865 }
1866
1867 async fn get_custom_status(
1870 &self,
1871 instance: &str,
1872 last_seen_version: u64,
1873 ) -> Result<Option<(Option<String>, u64)>, ProviderError> {
1874 let inst = self.read_instance(instance).await?;
1875 match inst {
1876 Some(i) => {
1877 if i.custom_status_version > last_seen_version {
1878 Ok(Some((i.custom_status, i.custom_status_version)))
1879 } else {
1880 Ok(None)
1881 }
1882 }
1883 None => Ok(None),
1884 }
1885 }
1886
1887 async fn get_kv_value(
1888 &self,
1889 instance_id: &str,
1890 key: &str,
1891 ) -> Result<Option<String>, ProviderError> {
1892 let delta_doc_id = KeyValueDeltaDocument::doc_id(instance_id, key);
1893 let delta_resp = self
1894 .client()
1895 .read_document(&delta_doc_id, instance_id)
1896 .await?;
1897
1898 if delta_resp.is_success() {
1899 let doc: KeyValueDeltaDocument =
1900 serde_json::from_str(&delta_resp.body).map_err(|e| {
1901 ProviderError::permanent("get_kv_value", format!("Deserialize error: {e}"))
1902 })?;
1903 return Ok(doc.value);
1904 }
1905 if !errors::is_not_found(delta_resp.status) {
1906 return Err(errors::map_cosmosdb_error(
1907 "get_kv_value",
1908 delta_resp.status,
1909 &delta_resp.body,
1910 ));
1911 }
1912
1913 let store_doc_id = KeyValueDocument::doc_id(instance_id, key);
1914 let store_resp = self
1915 .client()
1916 .read_document(&store_doc_id, instance_id)
1917 .await?;
1918
1919 if errors::is_not_found(store_resp.status) {
1920 return Ok(None);
1921 }
1922 if !store_resp.is_success() {
1923 return Err(errors::map_cosmosdb_error(
1924 "get_kv_value",
1925 store_resp.status,
1926 &store_resp.body,
1927 ));
1928 }
1929
1930 let doc: KeyValueDocument = serde_json::from_str(&store_resp.body).map_err(|e| {
1931 ProviderError::permanent("get_kv_value", format!("Deserialize error: {e}"))
1932 })?;
1933 Ok(Some(doc.value))
1934 }
1935
1936 async fn get_kv_all_values(
1937 &self,
1938 instance_id: &str,
1939 ) -> Result<std::collections::HashMap<String, String>, ProviderError> {
1940 let mut map: std::collections::HashMap<String, String> = self
1941 .load_kv_store_documents(instance_id)
1942 .await?
1943 .into_iter()
1944 .map(|doc| (doc.key, doc.value))
1945 .collect();
1946
1947 for delta_doc in self.load_kv_delta_documents(instance_id).await? {
1948 match delta_doc.value {
1949 Some(value) => {
1950 map.insert(delta_doc.key, value);
1951 }
1952 None => {
1953 map.remove(&delta_doc.key);
1954 }
1955 }
1956 }
1957
1958 Ok(map)
1959 }
1960
1961 async fn get_instance_stats(
1962 &self,
1963 instance_id: &str,
1964 ) -> Result<Option<duroxide::SystemStats>, ProviderError> {
1965 let inst = match self.read_instance(instance_id).await? {
1966 Some(inst) => inst,
1967 None => return Ok(None),
1968 };
1969
1970 let history_docs =
1971 query::query_by_type_in_partition(self.client(), instance_id, DOC_TYPE_HISTORY)
1972 .await?
1973 .into_iter()
1974 .map(|doc| {
1975 serde_json::from_value::<HistoryDocument>(doc).map_err(|e| {
1976 ProviderError::permanent(
1977 "get_instance_stats",
1978 format!("Deserialize history document error: {e}"),
1979 )
1980 })
1981 })
1982 .collect::<Result<Vec<_>, _>>()?;
1983
1984 let (history_event_count, history_size_bytes) = history_docs
1985 .iter()
1986 .filter(|doc| doc.execution_id == inst.current_execution_id)
1987 .fold((0_u64, 0_u64), |(count, size), doc| {
1988 (count + 1, size + doc.event_data.len() as u64)
1989 });
1990
1991 let kv_values = self.get_kv_all_values(instance_id).await?;
1992 let kv_user_key_count = kv_values.len() as u64;
1993 let kv_total_value_bytes = kv_values.values().map(|value| value.len() as u64).sum();
1994
1995 let queue_pending_count = match history_docs
1996 .iter()
1997 .find(|doc| doc.execution_id == inst.current_execution_id && doc.event_id == 1)
1998 {
1999 Some(doc) => {
2000 let event = serde_json::from_str::<Event>(&doc.event_data).map_err(|e| {
2001 ProviderError::permanent(
2002 "get_instance_stats",
2003 format!("Failed to deserialize OrchestrationStarted event: {e}"),
2004 )
2005 })?;
2006 match event.kind {
2007 EventKind::OrchestrationStarted {
2008 carry_forward_events: Some(events),
2009 ..
2010 } => events.len() as u64,
2011 _ => 0,
2012 }
2013 }
2014 None => 0,
2015 };
2016
2017 Ok(Some(duroxide::SystemStats {
2018 history_event_count,
2019 history_size_bytes,
2020 queue_pending_count,
2021 kv_user_key_count,
2022 kv_total_value_bytes,
2023 }))
2024 }
2025}
2026
2027#[async_trait::async_trait]
2032impl ProviderAdmin for CosmosDBProvider {
2033 async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
2034 let instances = query::query_instances(self.client(), None).await?;
2035 Ok(instances.iter().map(|i| i.instance_id.clone()).collect())
2036 }
2037
2038 async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ProviderError> {
2039 let instances = query::query_instances(self.client(), Some(status)).await?;
2040 Ok(instances.iter().map(|i| i.instance_id.clone()).collect())
2041 }
2042
2043 async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ProviderError> {
2044 let sql = format!(
2045 "SELECT DISTINCT VALUE c.executionId FROM c \
2046 WHERE c.instanceId = @instanceId AND c.type = '{}'",
2047 DOC_TYPE_HISTORY
2048 );
2049 let params = vec![crate::client::QueryParameter::new(
2050 "@instanceId",
2051 serde_json::json!(instance),
2052 )];
2053 let results = self.client().query(&sql, params, Some(instance)).await?;
2054 let mut exec_ids: Vec<u64> = results.into_iter().filter_map(|v| v.as_u64()).collect();
2055 exec_ids.sort();
2056
2057 if exec_ids.is_empty() {
2058 let inst = self.read_instance(instance).await?;
2060 if let Some(i) = inst {
2061 exec_ids.push(i.current_execution_id);
2062 }
2063 }
2064
2065 Ok(exec_ids)
2066 }
2067
2068 async fn read_history_with_execution_id(
2069 &self,
2070 instance: &str,
2071 execution_id: u64,
2072 ) -> Result<Vec<Event>, ProviderError> {
2073 self.read_with_execution(instance, execution_id).await
2074 }
2075
2076 async fn read_history(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
2077 self.read(instance).await
2078 }
2079
2080 async fn latest_execution_id(&self, instance: &str) -> Result<u64, ProviderError> {
2081 let inst = self.read_instance(instance).await?.ok_or_else(|| {
2082 ProviderError::permanent(
2083 "latest_execution_id",
2084 format!("Instance {instance} not found"),
2085 )
2086 })?;
2087 Ok(inst.current_execution_id)
2088 }
2089
2090 async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
2091 let inst = self.read_instance(instance).await?.ok_or_else(|| {
2092 ProviderError::permanent(
2093 "get_instance_info",
2094 format!("Instance {instance} not found"),
2095 )
2096 })?;
2097
2098 Ok(InstanceInfo {
2099 instance_id: inst.instance_id,
2100 orchestration_name: inst.orchestration_name,
2101 orchestration_version: inst.orchestration_version,
2102 current_execution_id: inst.current_execution_id,
2103 status: inst.status,
2104 output: inst.output,
2105 created_at: inst.created_at,
2106 updated_at: inst.updated_at,
2107 parent_instance_id: inst.parent_instance_id,
2108 })
2109 }
2110
2111 async fn get_execution_info(
2112 &self,
2113 instance: &str,
2114 execution_id: u64,
2115 ) -> Result<ExecutionInfo, ProviderError> {
2116 let events = self.read_with_execution(instance, execution_id).await?;
2117 let inst = self.read_instance(instance).await?;
2118
2119 let event_count = events.len();
2120 let started_at = inst.as_ref().map(|i| i.created_at).unwrap_or(0);
2121
2122 let status = if let Some(i) = &inst {
2123 if i.current_execution_id == execution_id {
2124 i.status.clone()
2125 } else {
2126 "ContinuedAsNew".to_string()
2127 }
2128 } else {
2129 "Unknown".to_string()
2130 };
2131
2132 let completed_at = if status == "Running" {
2133 None
2134 } else {
2135 inst.as_ref().map(|i| i.updated_at)
2136 };
2137
2138 Ok(ExecutionInfo {
2139 execution_id,
2140 status,
2141 output: inst.and_then(|i| i.output),
2142 started_at,
2143 completed_at,
2144 event_count,
2145 })
2146 }
2147
2148 async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError> {
2149 let instances = query::query_instances(self.client(), None).await?;
2150
2151 let total = instances.len() as u64;
2152 let running = instances.iter().filter(|i| i.status == "Running").count() as u64;
2153 let completed = instances.iter().filter(|i| i.status == "Completed").count() as u64;
2154 let failed = instances.iter().filter(|i| i.status == "Failed").count() as u64;
2155
2156 let total_events =
2158 query::count_by_type(self.client(), DOC_TYPE_HISTORY, None).await? as u64;
2159
2160 Ok(SystemMetrics {
2161 total_instances: total,
2162 total_executions: total, running_instances: running,
2164 completed_instances: completed,
2165 failed_instances: failed,
2166 total_events,
2167 })
2168 }
2169
2170 async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError> {
2171 let now = now_ms();
2172 let now_filter = format!("c.visibleAt <= {now} AND (NOT IS_DEFINED(c.lockedUntil) OR c.lockedUntil = null OR c.lockedUntil <= {now})");
2173
2174 let orch =
2175 query::count_by_type(self.client(), DOC_TYPE_ORCH_QUEUE, Some(&now_filter)).await?;
2176
2177 let worker =
2178 query::count_by_type(self.client(), DOC_TYPE_WORKER_QUEUE, Some(&now_filter)).await?;
2179
2180 let timer_filter = format!("c.visibleAt > {now}");
2182 let timer =
2183 query::count_by_type(self.client(), DOC_TYPE_ORCH_QUEUE, Some(&timer_filter)).await?;
2184
2185 Ok(QueueDepths {
2186 orchestrator_queue: orch,
2187 worker_queue: worker,
2188 timer_queue: timer,
2189 })
2190 }
2191
2192 async fn list_children(&self, instance_id: &str) -> Result<Vec<String>, ProviderError> {
2193 let sql = format!(
2194 "SELECT c.instanceId FROM c WHERE c.type = '{}' AND c.parentInstanceId = @parentId",
2195 DOC_TYPE_INSTANCE
2196 );
2197 let params = vec![crate::client::QueryParameter::new(
2198 "@parentId",
2199 serde_json::json!(instance_id),
2200 )];
2201 let results = self.client().query(&sql, params, None).await?;
2202 Ok(results
2203 .into_iter()
2204 .filter_map(|v| {
2205 v.get("instanceId")
2206 .and_then(|id| id.as_str())
2207 .map(|s| s.to_string())
2208 })
2209 .collect())
2210 }
2211
2212 async fn get_parent_id(&self, instance_id: &str) -> Result<Option<String>, ProviderError> {
2213 let inst = self.read_instance(instance_id).await?.ok_or_else(|| {
2214 ProviderError::permanent("get_parent_id", format!("Instance {instance_id} not found"))
2215 })?;
2216 Ok(inst.parent_instance_id)
2217 }
2218
2219 async fn delete_instances_atomic(
2220 &self,
2221 ids: &[String],
2222 force: bool,
2223 ) -> Result<DeleteInstanceResult, ProviderError> {
2224 if ids.is_empty() {
2225 return Ok(DeleteInstanceResult::default());
2226 }
2227
2228 if !force {
2231 for id in ids {
2232 if let Some(inst) = self.read_instance(id).await? {
2233 if inst.status == "Running" {
2234 return Err(ProviderError::permanent(
2235 "delete_instances_atomic",
2236 format!("Instance {id} is still running. Use force=true to delete anyway, or cancel first."),
2237 ));
2238 }
2239 }
2240 }
2241 }
2242
2243 let id_set: std::collections::HashSet<&String> = ids.iter().collect();
2246 for id in ids {
2247 let children = self.list_children(id).await.unwrap_or_default();
2248 for child in &children {
2249 if !id_set.contains(child) {
2250 return Err(ProviderError::permanent(
2251 "delete_instances_atomic",
2252 format!(
2253 "Cannot delete: instance {id} has child {child} that was created after tree traversal. \
2254 Re-fetch the tree and retry."
2255 ),
2256 ));
2257 }
2258 }
2259 }
2260
2261 let mut result = DeleteInstanceResult::default();
2263
2264 for id in ids {
2265 let history_filter = format!("c.instanceId = '{id}'");
2267 let events_count =
2268 query::count_by_type(self.client(), DOC_TYPE_HISTORY, Some(&history_filter))
2269 .await
2270 .unwrap_or(0) as u64;
2271
2272 let orch_q_count =
2273 query::count_by_type(self.client(), DOC_TYPE_ORCH_QUEUE, Some(&history_filter))
2274 .await
2275 .unwrap_or(0) as u64;
2276
2277 let worker_q_count =
2278 query::count_by_type(self.client(), DOC_TYPE_WORKER_QUEUE, Some(&history_filter))
2279 .await
2280 .unwrap_or(0) as u64;
2281
2282 let exec_count = if self.read_instance(id).await?.is_some() {
2284 1u64
2285 } else {
2286 0u64
2287 };
2288
2289 let docs = query::query_all_in_partition(self.client(), id).await?;
2291 for doc in &docs {
2292 if let Some(doc_id) = doc.get("id").and_then(|v| v.as_str()) {
2293 let _ = self.client().delete_document(doc_id, id).await;
2294 }
2295 }
2296
2297 result.instances_deleted += 1;
2298 result.executions_deleted += exec_count;
2299 result.events_deleted += events_count;
2300 result.queue_messages_deleted += orch_q_count + worker_q_count;
2301 }
2302
2303 Ok(result)
2304 }
2305
2306 async fn delete_instance_bulk(
2307 &self,
2308 filter: InstanceFilter,
2309 ) -> Result<DeleteInstanceResult, ProviderError> {
2310 let all_instances = query::query_instances(self.client(), None).await?;
2312
2313 let mut candidates: Vec<InstanceDocument> = all_instances
2314 .into_iter()
2315 .filter(|inst| {
2316 inst.parent_instance_id.is_none()
2318 && (inst.status == "Completed" || inst.status == "Failed" || inst.status == "ContinuedAsNew")
2320 })
2321 .collect();
2322
2323 if let Some(ref ids) = filter.instance_ids {
2325 if ids.is_empty() {
2326 return Ok(DeleteInstanceResult::default());
2327 }
2328 candidates.retain(|inst| ids.contains(&inst.instance_id));
2329 }
2330
2331 if let Some(before) = filter.completed_before {
2333 candidates.retain(|inst| inst.updated_at < before);
2334 }
2335
2336 let limit = filter.limit.unwrap_or(1000) as usize;
2338 candidates.truncate(limit);
2339
2340 if candidates.is_empty() {
2341 return Ok(DeleteInstanceResult::default());
2342 }
2343
2344 let mut result = DeleteInstanceResult::default();
2346 for inst in &candidates {
2347 let tree = self.get_instance_tree(&inst.instance_id).await?;
2348 let delete_result = self.delete_instances_atomic(&tree.all_ids, true).await?;
2349 result.instances_deleted += delete_result.instances_deleted;
2350 result.executions_deleted += delete_result.executions_deleted;
2351 result.events_deleted += delete_result.events_deleted;
2352 result.queue_messages_deleted += delete_result.queue_messages_deleted;
2353 }
2354
2355 Ok(result)
2356 }
2357
2358 async fn prune_executions(
2359 &self,
2360 instance_id: &str,
2361 options: PruneOptions,
2362 ) -> Result<PruneResult, ProviderError> {
2363 let inst = self.read_instance(instance_id).await?.ok_or_else(|| {
2364 ProviderError::permanent(
2365 "prune_executions",
2366 format!("Instance {instance_id} not found"),
2367 )
2368 })?;
2369
2370 let current_exec = inst.current_execution_id;
2371 let mut all_execs = self.list_executions(instance_id).await?;
2372 all_execs.sort();
2373
2374 let mut protected: std::collections::HashSet<u64> = std::collections::HashSet::new();
2380 protected.insert(current_exec);
2381
2382 if let Some(keep_last) = options.keep_last {
2383 let keep = keep_last as usize;
2384 let skip = all_execs.len().saturating_sub(keep);
2386 for &exec_id in &all_execs[skip..] {
2387 protected.insert(exec_id);
2388 }
2389 }
2390
2391 let to_prune: Vec<u64> = all_execs
2392 .into_iter()
2393 .filter(|e| !protected.contains(e))
2394 .collect();
2395
2396 let mut events_deleted = 0u64;
2397 let mut execs_deleted = 0u64;
2398
2399 for exec_id in &to_prune {
2400 let docs = query::fetch_history(self.client(), instance_id, *exec_id).await?;
2401 for doc in &docs {
2402 let _ = self.client().delete_document(&doc.id, instance_id).await;
2403 events_deleted += 1;
2404 }
2405
2406 execs_deleted += 1;
2409 }
2410
2411 Ok(PruneResult {
2412 instances_processed: 1,
2413 executions_deleted: execs_deleted,
2414 events_deleted,
2415 })
2416 }
2417
2418 async fn prune_executions_bulk(
2419 &self,
2420 filter: InstanceFilter,
2421 options: PruneOptions,
2422 ) -> Result<PruneResult, ProviderError> {
2423 let instances = if let Some(ids) = &filter.instance_ids {
2424 ids.clone()
2425 } else {
2426 self.list_instances().await?
2427 };
2428
2429 let mut total = PruneResult::default();
2430 for id in &instances {
2431 match self.prune_executions(id, options.clone()).await {
2432 Ok(r) => {
2433 total.instances_processed += r.instances_processed;
2434 total.executions_deleted += r.executions_deleted;
2435 total.events_deleted += r.events_deleted;
2436 }
2437 Err(_) => continue,
2438 }
2439 }
2440
2441 Ok(total)
2442 }
2443}