1use std::sync::Arc;
10use std::time::Duration;
11
12use async_trait::async_trait;
13use awaken_runtime_contract::contract::lifecycle::{RunStatus, TerminationReason};
14use awaken_runtime_contract::contract::storage::StorageError;
15use serde::{Deserialize, Serialize};
16
17use crate::contract::scope::{ScopeId, scoped_key, unscoped_key};
18
19#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
34pub enum RunDispatchStatus {
35 Queued,
36 Claimed,
37 Acked,
38 Cancelled,
39 Superseded,
40 DeadLetter,
41}
42
43impl RunDispatchStatus {
44 pub fn is_terminal(self) -> bool {
46 matches!(
47 self,
48 Self::Acked | Self::Cancelled | Self::Superseded | Self::DeadLetter
49 )
50 }
51}
52
53#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
61pub struct RunDispatchResult {
62 pub run_id: String,
64 pub dispatch_instance_id: String,
66 pub status: RunStatus,
68 #[serde(default, skip_serializing_if = "Option::is_none")]
70 pub termination: Option<TerminationReason>,
71 #[serde(default, skip_serializing_if = "Option::is_none")]
73 pub response: Option<String>,
74 #[serde(default, skip_serializing_if = "Option::is_none")]
76 pub error: Option<String>,
77}
78
79#[derive(Debug, Clone, Serialize, Deserialize)]
87pub struct RunDispatch {
88 dispatch_id: String,
91 thread_id: String,
93 run_id: String,
95
96 priority: u8,
99 dedupe_key: Option<String>,
101 dispatch_epoch: u64,
103
104 status: RunDispatchStatus,
107 available_at: u64,
109 attempt_count: u32,
111 max_attempts: u32,
113 last_error: Option<String>,
115
116 claim_token: Option<String>,
119 claimed_by: Option<String>,
121 lease_until: Option<u64>,
123
124 #[serde(default, skip_serializing_if = "Option::is_none")]
127 dispatch_instance_id: Option<String>,
128 #[serde(default, skip_serializing_if = "Option::is_none")]
130 run_status: Option<RunStatus>,
131 #[serde(default, skip_serializing_if = "Option::is_none")]
133 termination: Option<TerminationReason>,
134 #[serde(default, skip_serializing_if = "Option::is_none")]
136 run_response: Option<String>,
137 #[serde(default, skip_serializing_if = "Option::is_none")]
139 run_error: Option<String>,
140 #[serde(default, skip_serializing_if = "Option::is_none")]
142 completed_at: Option<u64>,
143
144 created_at: u64,
147 updated_at: u64,
149}
150
151#[derive(Debug, Clone)]
158pub struct RunDispatchParts {
159 pub dispatch_id: String,
160 pub thread_id: String,
161 pub run_id: String,
162 pub priority: u8,
163 pub dedupe_key: Option<String>,
164 pub dispatch_epoch: u64,
165 pub status: RunDispatchStatus,
166 pub available_at: u64,
167 pub attempt_count: u32,
168 pub max_attempts: u32,
169 pub last_error: Option<String>,
170 pub claim_token: Option<String>,
171 pub claimed_by: Option<String>,
172 pub lease_until: Option<u64>,
173 pub dispatch_instance_id: Option<String>,
174 pub run_status: Option<RunStatus>,
175 pub termination: Option<TerminationReason>,
176 pub run_response: Option<String>,
177 pub run_error: Option<String>,
178 pub completed_at: Option<u64>,
179 pub created_at: u64,
180 pub updated_at: u64,
181}
182
183impl RunDispatch {
184 #[must_use]
187 pub fn queued(
188 dispatch_id: impl Into<String>,
189 thread_id: impl Into<String>,
190 run_id: impl Into<String>,
191 created_at: u64,
192 ) -> Self {
193 Self {
194 dispatch_id: dispatch_id.into(),
195 thread_id: thread_id.into(),
196 run_id: run_id.into(),
197 priority: 128,
198 dedupe_key: None,
199 dispatch_epoch: 0,
200 status: RunDispatchStatus::Queued,
201 available_at: created_at,
202 attempt_count: 0,
203 max_attempts: 5,
204 last_error: None,
205 claim_token: None,
206 claimed_by: None,
207 lease_until: None,
208 dispatch_instance_id: None,
209 run_status: None,
210 termination: None,
211 run_response: None,
212 run_error: None,
213 completed_at: None,
214 created_at,
215 updated_at: created_at,
216 }
217 }
218
219 #[must_use]
220 pub fn with_priority(mut self, priority: u8) -> Self {
221 self.priority = priority;
222 self
223 }
224
225 #[must_use]
226 pub fn with_dedupe_key(mut self, dedupe_key: impl Into<Option<String>>) -> Self {
227 self.dedupe_key = dedupe_key.into();
228 self
229 }
230
231 #[must_use]
232 pub fn with_available_at(mut self, available_at: u64) -> Self {
233 self.available_at = available_at;
234 self
235 }
236
237 #[must_use]
238 pub fn with_created_at(mut self, created_at: u64) -> Self {
239 self.created_at = created_at;
240 self
241 }
242
243 #[must_use]
244 pub fn with_max_attempts(mut self, max_attempts: u32) -> Self {
245 self.max_attempts = max_attempts;
246 self
247 }
248
249 #[must_use]
250 pub fn with_attempt_count(mut self, attempt_count: u32) -> Self {
251 self.attempt_count = attempt_count;
252 self
253 }
254
255 #[must_use]
256 pub fn with_dispatch_epoch(mut self, dispatch_epoch: u64) -> Self {
257 self.dispatch_epoch = dispatch_epoch;
258 self
259 }
260
261 pub fn from_persisted_parts(parts: RunDispatchParts) -> Result<Self, StorageError> {
262 let dispatch = Self {
263 dispatch_id: parts.dispatch_id,
264 thread_id: parts.thread_id,
265 run_id: parts.run_id,
266 priority: parts.priority,
267 dedupe_key: parts.dedupe_key,
268 dispatch_epoch: parts.dispatch_epoch,
269 status: parts.status,
270 available_at: parts.available_at,
271 attempt_count: parts.attempt_count,
272 max_attempts: parts.max_attempts,
273 last_error: parts.last_error,
274 claim_token: parts.claim_token,
275 claimed_by: parts.claimed_by,
276 lease_until: parts.lease_until,
277 dispatch_instance_id: parts.dispatch_instance_id,
278 run_status: parts.run_status,
279 termination: parts.termination,
280 run_response: parts.run_response,
281 run_error: parts.run_error,
282 completed_at: parts.completed_at,
283 created_at: parts.created_at,
284 updated_at: parts.updated_at,
285 };
286 dispatch.validate_for_persist()?;
287 Ok(dispatch)
288 }
289
290 #[must_use]
291 pub fn to_persisted_parts(&self) -> RunDispatchParts {
292 RunDispatchParts {
293 dispatch_id: self.dispatch_id.clone(),
294 thread_id: self.thread_id.clone(),
295 run_id: self.run_id.clone(),
296 priority: self.priority,
297 dedupe_key: self.dedupe_key.clone(),
298 dispatch_epoch: self.dispatch_epoch,
299 status: self.status,
300 available_at: self.available_at,
301 attempt_count: self.attempt_count,
302 max_attempts: self.max_attempts,
303 last_error: self.last_error.clone(),
304 claim_token: self.claim_token.clone(),
305 claimed_by: self.claimed_by.clone(),
306 lease_until: self.lease_until,
307 dispatch_instance_id: self.dispatch_instance_id.clone(),
308 run_status: self.run_status,
309 termination: self.termination.clone(),
310 run_response: self.run_response.clone(),
311 run_error: self.run_error.clone(),
312 completed_at: self.completed_at,
313 created_at: self.created_at,
314 updated_at: self.updated_at,
315 }
316 }
317
318 #[must_use]
319 pub fn dispatch_id(&self) -> &String {
320 &self.dispatch_id
321 }
322
323 #[must_use]
324 pub fn thread_id(&self) -> &String {
325 &self.thread_id
326 }
327
328 #[must_use]
329 pub fn run_id(&self) -> &String {
330 &self.run_id
331 }
332
333 #[must_use]
334 pub fn priority(&self) -> u8 {
335 self.priority
336 }
337
338 #[must_use]
339 pub fn dedupe_key(&self) -> Option<&str> {
340 self.dedupe_key.as_deref()
341 }
342
343 #[must_use]
344 pub fn dispatch_epoch(&self) -> u64 {
345 self.dispatch_epoch
346 }
347
348 #[must_use]
349 pub fn status(&self) -> RunDispatchStatus {
350 self.status
351 }
352
353 #[must_use]
354 pub fn available_at(&self) -> u64 {
355 self.available_at
356 }
357
358 #[must_use]
359 pub fn attempt_count(&self) -> u32 {
360 self.attempt_count
361 }
362
363 #[must_use]
364 pub fn max_attempts(&self) -> u32 {
365 self.max_attempts
366 }
367
368 #[must_use]
369 pub fn last_error(&self) -> Option<&str> {
370 self.last_error.as_deref()
371 }
372
373 #[must_use]
374 pub fn claim_token(&self) -> Option<&str> {
375 self.claim_token.as_deref()
376 }
377
378 #[must_use]
379 pub fn claimed_by(&self) -> Option<&str> {
380 self.claimed_by.as_deref()
381 }
382
383 #[must_use]
384 pub fn lease_until(&self) -> Option<u64> {
385 self.lease_until
386 }
387
388 #[must_use]
389 pub fn dispatch_instance_id(&self) -> Option<&str> {
390 self.dispatch_instance_id.as_deref()
391 }
392
393 #[must_use]
394 pub fn run_status(&self) -> Option<RunStatus> {
395 self.run_status
396 }
397
398 #[must_use]
399 pub fn termination(&self) -> Option<&TerminationReason> {
400 self.termination.as_ref()
401 }
402
403 #[must_use]
404 pub fn run_response(&self) -> Option<&str> {
405 self.run_response.as_deref()
406 }
407
408 #[must_use]
409 pub fn run_error(&self) -> Option<&str> {
410 self.run_error.as_deref()
411 }
412
413 #[must_use]
414 pub fn completed_at(&self) -> Option<u64> {
415 self.completed_at
416 }
417
418 #[must_use]
419 pub fn created_at(&self) -> u64 {
420 self.created_at
421 }
422
423 #[must_use]
424 pub fn updated_at(&self) -> u64 {
425 self.updated_at
426 }
427
428 pub fn prepare_for_enqueue(&mut self, dispatch_epoch: u64) {
431 self.dispatch_epoch = dispatch_epoch;
432 self.status = RunDispatchStatus::Queued;
433 self.claim_token = None;
434 self.claimed_by = None;
435 self.lease_until = None;
436 self.dispatch_instance_id = None;
437 self.run_status = None;
438 self.termination = None;
439 self.run_response = None;
440 self.run_error = None;
441 self.completed_at = None;
442 }
443
444 pub fn claim(
446 &mut self,
447 consumer_id: impl Into<String>,
448 claim_token: impl Into<String>,
449 lease_until: u64,
450 now: u64,
451 ) -> Result<(), StorageError> {
452 self.require_status(RunDispatchStatus::Queued, "claim")?;
453 self.status = RunDispatchStatus::Claimed;
454 self.claim_token = Some(claim_token.into());
455 self.claimed_by = Some(consumer_id.into());
456 self.lease_until = Some(lease_until);
457 self.updated_at = now;
458 self.validate_for_persist()
459 }
460
461 pub fn extend_lease(&mut self, lease_until: u64, now: u64) -> Result<(), StorageError> {
462 self.require_status(RunDispatchStatus::Claimed, "lease extension")?;
463 self.lease_until = Some(lease_until);
464 self.updated_at = now;
465 self.validate_for_persist()
466 }
467
468 pub fn record_dispatch_start(
469 &mut self,
470 dispatch_instance_id: impl Into<String>,
471 now: u64,
472 ) -> Result<(), StorageError> {
473 self.require_status(RunDispatchStatus::Claimed, "recording runtime start")?;
474 self.dispatch_instance_id = Some(dispatch_instance_id.into());
475 self.run_status = Some(RunStatus::Running);
476 self.termination = None;
477 self.run_response = None;
478 self.run_error = None;
479 self.completed_at = None;
480 self.updated_at = now;
481 self.validate_for_persist()
482 }
483
484 pub fn record_run_result(
485 &mut self,
486 result: &RunDispatchResult,
487 now: u64,
488 ) -> Result<(), StorageError> {
489 self.require_status(RunDispatchStatus::Claimed, "recording runtime result")?;
490 if result.run_id != self.run_id {
491 return Err(StorageError::Validation(format!(
492 "dispatch '{}' result run_id '{}' does not match '{}'",
493 self.dispatch_id, result.run_id, self.run_id
494 )));
495 }
496 self.dispatch_instance_id = Some(result.dispatch_instance_id.clone());
497 self.run_status = Some(result.status);
498 self.termination = result.termination.clone();
499 self.run_response = result.response.clone();
500 self.run_error = result.error.clone();
501 self.completed_at = Some(now);
502 self.updated_at = now;
503 self.validate_for_persist()
504 }
505
506 pub fn mark_acked(&mut self, now: u64) -> Result<(), StorageError> {
507 self.require_status(RunDispatchStatus::Claimed, "ack")?;
508 self.status = RunDispatchStatus::Acked;
509 self.completed_at = Some(now);
510 self.updated_at = now;
511 self.clear_claim_fields();
512 self.validate_for_persist()
513 }
514
515 pub fn mark_cancelled(&mut self, now: u64) -> Result<(), StorageError> {
516 self.require_status(RunDispatchStatus::Queued, "cancel")?;
517 self.status = RunDispatchStatus::Cancelled;
518 self.completed_at = Some(now);
519 self.updated_at = now;
520 self.clear_claim_fields();
521 self.validate_for_persist()
522 }
523
524 pub fn mark_superseded(&mut self, now: u64, reason: Option<&str>) -> Result<(), StorageError> {
525 self.require_status(RunDispatchStatus::Queued, "supersede")?;
526 self.status = RunDispatchStatus::Superseded;
527 self.completed_at = Some(now);
528 self.updated_at = now;
529 if let Some(reason) = reason {
530 self.last_error = Some(reason.to_string());
531 }
532 self.clear_claim_fields();
533 self.clear_runtime_projection();
534 self.validate_for_persist()
535 }
536
537 pub fn mark_superseded_at_epoch(
538 &mut self,
539 now: u64,
540 epoch: u64,
541 reason: Option<&str>,
542 ) -> Result<(), StorageError> {
543 self.dispatch_epoch = epoch;
544 match self.status {
545 RunDispatchStatus::Queued => self.mark_superseded(now, reason),
546 RunDispatchStatus::Claimed => {
547 self.status = RunDispatchStatus::Superseded;
548 self.completed_at = Some(now);
549 self.updated_at = now;
550 if let Some(reason) = reason {
551 self.last_error = Some(reason.to_string());
552 }
553 self.clear_claim_fields();
554 self.clear_runtime_projection();
555 self.validate_for_persist()
556 }
557 _ => Err(StorageError::Validation(format!(
558 "dispatch '{}' must be Queued or Claimed before epoch supersede",
559 self.dispatch_id
560 ))),
561 }
562 }
563
564 pub fn mark_dead_letter(&mut self, now: u64, error: &str) -> Result<(), StorageError> {
565 self.require_status(RunDispatchStatus::Claimed, "dead letter")?;
566 self.status = RunDispatchStatus::DeadLetter;
567 self.last_error = Some(error.to_string());
568 self.completed_at = Some(now);
569 self.updated_at = now;
570 self.clear_claim_fields();
571 self.validate_for_persist()
572 }
573
574 pub fn mark_nack_result(
575 &mut self,
576 now: u64,
577 retry_at: u64,
578 error: &str,
579 ) -> Result<(), StorageError> {
580 self.require_status(RunDispatchStatus::Claimed, "nack")?;
581 self.attempt_count = self.attempt_count.saturating_add(1);
582 self.last_error = Some(error.to_string());
583 self.updated_at = now;
584 self.clear_claim_fields();
585 if self.attempt_count >= self.max_attempts {
586 self.status = RunDispatchStatus::DeadLetter;
587 self.completed_at = Some(now);
588 } else {
589 self.status = RunDispatchStatus::Queued;
590 self.available_at = retry_at;
591 self.completed_at = None;
592 self.clear_runtime_projection();
593 }
594 self.validate_for_persist()
595 }
596
597 pub fn mark_expired_lease(
598 &mut self,
599 now: u64,
600 max_attempts_error: &str,
601 ) -> Result<(), StorageError> {
602 self.require_status(RunDispatchStatus::Claimed, "lease expiration")?;
603 self.attempt_count = self.attempt_count.saturating_add(1);
604 self.available_at = now;
605 self.updated_at = now;
606 self.clear_claim_fields();
607 self.clear_runtime_projection();
613 if self.attempt_count >= self.max_attempts {
614 self.status = RunDispatchStatus::DeadLetter;
615 self.last_error = Some(max_attempts_error.to_string());
616 self.completed_at = Some(now);
617 } else {
618 self.status = RunDispatchStatus::Queued;
619 self.completed_at = None;
620 }
621 self.validate_for_persist()
622 }
623
624 pub fn remap_identity(
625 &mut self,
626 dispatch_id: impl Into<String>,
627 thread_id: impl Into<String>,
628 run_id: impl Into<String>,
629 dedupe_key: Option<String>,
630 ) {
631 self.dispatch_id = dispatch_id.into();
632 self.thread_id = thread_id.into();
633 self.run_id = run_id.into();
634 self.dedupe_key = dedupe_key;
635 }
636
637 fn clear_claim_fields(&mut self) {
638 self.claim_token = None;
639 self.claimed_by = None;
640 self.lease_until = None;
641 }
642
643 fn clear_runtime_projection(&mut self) {
644 self.dispatch_instance_id = None;
645 self.run_status = None;
646 self.termination = None;
647 self.run_response = None;
648 self.run_error = None;
649 }
650
651 fn require_status(
652 &self,
653 expected: RunDispatchStatus,
654 transition: &str,
655 ) -> Result<(), StorageError> {
656 if self.status != expected {
657 return Err(StorageError::Validation(format!(
658 "dispatch '{}' must be {:?} before {transition}",
659 self.dispatch_id, expected
660 )));
661 }
662 Ok(())
663 }
664
665 pub fn validate_for_enqueue(&self) -> Result<(), StorageError> {
667 self.validate_identity_and_retry()?;
668 if self.status != RunDispatchStatus::Queued {
669 return Err(StorageError::Validation(format!(
670 "enqueued dispatch '{}' must start as Queued",
671 self.dispatch_id
672 )));
673 }
674 self.validate_queued()
675 }
676
677 pub fn validate_for_persist(&self) -> Result<(), StorageError> {
679 self.validate_identity_and_retry()?;
680 match self.status {
681 RunDispatchStatus::Queued => self.validate_queued(),
682 RunDispatchStatus::Claimed => {
683 if self
684 .claim_token
685 .as_deref()
686 .is_none_or(|value| value.trim().is_empty())
687 || self
688 .claimed_by
689 .as_deref()
690 .is_none_or(|value| value.trim().is_empty())
691 || self.lease_until.is_none()
692 {
693 return Err(StorageError::Validation(format!(
694 "Claimed dispatch '{}' must carry claim_token, claimed_by, and lease_until",
695 self.dispatch_id
696 )));
697 }
698 Ok(())
699 }
700 RunDispatchStatus::Acked
701 | RunDispatchStatus::Cancelled
702 | RunDispatchStatus::Superseded
703 | RunDispatchStatus::DeadLetter => {
704 if self.claim_token.is_some()
705 || self.claimed_by.is_some()
706 || self.lease_until.is_some()
707 {
708 return Err(StorageError::Validation(format!(
709 "{:?} dispatch '{}' must not carry active lease fields",
710 self.status, self.dispatch_id
711 )));
712 }
713 if self.completed_at.is_none() {
714 return Err(StorageError::Validation(format!(
715 "{:?} dispatch '{}' must carry completed_at",
716 self.status, self.dispatch_id
717 )));
718 }
719 Ok(())
720 }
721 }
722 }
723
724 fn validate_identity_and_retry(&self) -> Result<(), StorageError> {
725 require_non_empty("dispatch_id", &self.dispatch_id)?;
726 require_non_empty("thread_id", &self.thread_id)?;
727 require_non_empty("run_id", &self.run_id)?;
728 if self.max_attempts == 0 {
729 return Err(StorageError::Validation(format!(
730 "dispatch '{}' max_attempts must be greater than zero",
731 self.dispatch_id
732 )));
733 }
734 if self.attempt_count > self.max_attempts {
735 return Err(StorageError::Validation(format!(
736 "dispatch '{}' attempt_count must not exceed max_attempts",
737 self.dispatch_id
738 )));
739 }
740 Ok(())
741 }
742
743 fn validate_queued(&self) -> Result<(), StorageError> {
744 if self.claim_token.is_some() || self.claimed_by.is_some() || self.lease_until.is_some() {
745 return Err(StorageError::Validation(format!(
746 "Queued dispatch '{}' must not carry claim fields",
747 self.dispatch_id
748 )));
749 }
750 if self.completed_at.is_some() {
751 return Err(StorageError::Validation(format!(
752 "Queued dispatch '{}' must not carry completed_at",
753 self.dispatch_id
754 )));
755 }
756 if self.dispatch_instance_id.is_some()
757 || self.run_status.is_some()
758 || self.termination.is_some()
759 || self.run_response.is_some()
760 || self.run_error.is_some()
761 {
762 return Err(StorageError::Validation(format!(
763 "Queued dispatch '{}' must not carry runtime result fields",
764 self.dispatch_id
765 )));
766 }
767 Ok(())
768 }
769}
770
771fn require_non_empty(field: &str, value: &str) -> Result<(), StorageError> {
772 if value.trim().is_empty() {
773 return Err(StorageError::Validation(format!(
774 "{field} must not be empty"
775 )));
776 }
777 Ok(())
778}
779
780#[derive(Debug, Clone, Serialize, Deserialize)]
784pub struct MailboxInterrupt {
785 pub new_dispatch_epoch: u64,
787 pub active_dispatch: Option<RunDispatch>,
790 pub superseded_count: usize,
792}
793
794#[derive(Debug, Clone, Serialize, Deserialize)]
801pub struct MailboxInterruptDetails {
802 pub new_dispatch_epoch: u64,
804 pub active_dispatch: Option<RunDispatch>,
807 pub superseded_count: usize,
809 #[serde(default)]
814 pub superseded_dispatches: Vec<RunDispatch>,
815}
816
817impl MailboxInterruptDetails {
818 #[must_use]
819 pub fn into_summary(self) -> MailboxInterrupt {
820 MailboxInterrupt {
821 new_dispatch_epoch: self.new_dispatch_epoch,
822 active_dispatch: self.active_dispatch,
823 superseded_count: self.superseded_count,
824 }
825 }
826
827 #[must_use]
828 pub fn summary(&self) -> MailboxInterrupt {
829 MailboxInterrupt {
830 new_dispatch_epoch: self.new_dispatch_epoch,
831 active_dispatch: self.active_dispatch.clone(),
832 superseded_count: self.superseded_count,
833 }
834 }
835}
836
837impl From<MailboxInterrupt> for MailboxInterruptDetails {
838 fn from(interrupt: MailboxInterrupt) -> Self {
839 Self {
840 new_dispatch_epoch: interrupt.new_dispatch_epoch,
841 active_dispatch: interrupt.active_dispatch,
842 superseded_count: interrupt.superseded_count,
843 superseded_dispatches: Vec::new(),
844 }
845 }
846}
847
848impl From<MailboxInterruptDetails> for MailboxInterrupt {
849 fn from(details: MailboxInterruptDetails) -> Self {
850 details.into_summary()
851 }
852}
853
854pub use awaken_runtime_contract::contract::live_control::{
855 LiveCommandReceipt, LiveControlError, LiveDeliveryOutcome, LiveRunCommand, LiveRunCommandEntry,
856 LiveRunCommandSource, LiveRunCommandStream, LiveRunTarget,
857};
858
859#[async_trait]
867pub trait DispatchSignalReceipt: Send + Sync {
868 fn redelivery_attempts(&self) -> Option<u64> {
869 None
870 }
871
872 async fn ack(self: Box<Self>) -> Result<(), StorageError>;
873 async fn nack(self: Box<Self>) -> Result<(), StorageError>;
874 async fn nack_with_delay(self: Box<Self>, delay: Duration) -> Result<(), StorageError> {
875 let _ = delay;
876 self.nack().await
877 }
878}
879
880pub struct DispatchSignalEntry {
882 pub thread_id: String,
883 pub dispatch_id: String,
884 pub receipt: Box<dyn DispatchSignalReceipt>,
885}
886
887impl std::fmt::Debug for DispatchSignalEntry {
888 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
889 f.debug_struct("DispatchSignalEntry")
890 .field("thread_id", &self.thread_id)
891 .field("dispatch_id", &self.dispatch_id)
892 .finish_non_exhaustive()
893 }
894}
895
896#[async_trait]
906pub trait MailboxStore: Send + Sync {
907 async fn enqueue(&self, dispatch: &RunDispatch) -> Result<(), StorageError>;
913
914 async fn claim(
919 &self,
920 thread_id: &str,
921 consumer_id: &str,
922 lease_ms: u64,
923 now: u64,
924 limit: usize,
925 ) -> Result<Vec<RunDispatch>, StorageError>;
926
927 async fn claim_dispatch(
930 &self,
931 dispatch_id: &str,
932 consumer_id: &str,
933 lease_ms: u64,
934 now: u64,
935 ) -> Result<Option<RunDispatch>, StorageError>;
936
937 async fn ack(&self, dispatch_id: &str, claim_token: &str, now: u64)
942 -> Result<(), StorageError>;
943
944 async fn record_dispatch_start(
949 &self,
950 dispatch_id: &str,
951 claim_token: &str,
952 dispatch_instance_id: &str,
953 now: u64,
954 ) -> Result<(), StorageError>;
955
956 async fn record_run_result(
961 &self,
962 dispatch_id: &str,
963 claim_token: &str,
964 result: &RunDispatchResult,
965 now: u64,
966 ) -> Result<(), StorageError>;
967
968 async fn nack(
972 &self,
973 dispatch_id: &str,
974 claim_token: &str,
975 retry_at: u64,
976 error: &str,
977 now: u64,
978 ) -> Result<(), StorageError>;
979
980 async fn dead_letter(
982 &self,
983 dispatch_id: &str,
984 claim_token: &str,
985 error: &str,
986 now: u64,
987 ) -> Result<(), StorageError>;
988
989 async fn cancel(
992 &self,
993 dispatch_id: &str,
994 now: u64,
995 ) -> Result<Option<RunDispatch>, StorageError>;
996
997 async fn extend_lease(
1000 &self,
1001 dispatch_id: &str,
1002 claim_token: &str,
1003 extension_ms: u64,
1004 now: u64,
1005 ) -> Result<bool, StorageError>;
1006
1007 async fn interrupt(&self, thread_id: &str, now: u64) -> Result<MailboxInterrupt, StorageError>;
1012
1013 async fn interrupt_detailed(
1019 &self,
1020 thread_id: &str,
1021 now: u64,
1022 ) -> Result<MailboxInterruptDetails, StorageError> {
1023 self.interrupt(thread_id, now).await.map(Into::into)
1024 }
1025
1026 async fn current_dispatch_epoch(&self, thread_id: &str) -> Result<u64, StorageError> {
1032 let _ = thread_id;
1033 Ok(0)
1034 }
1035
1036 async fn supersede_claimed(
1043 &self,
1044 dispatch_id: &str,
1045 claim_token: &str,
1046 now: u64,
1047 reason: &str,
1048 ) -> Result<Option<RunDispatch>, StorageError> {
1049 let _ = (dispatch_id, claim_token, now, reason);
1050 Err(StorageError::Io(
1051 "supersede claimed dispatch is not supported by this mailbox store".into(),
1052 ))
1053 }
1054
1055 async fn load_dispatch(&self, dispatch_id: &str) -> Result<Option<RunDispatch>, StorageError>;
1059
1060 async fn list_dispatches(
1062 &self,
1063 thread_id: &str,
1064 status_filter: Option<&[RunDispatchStatus]>,
1065 limit: usize,
1066 offset: usize,
1067 ) -> Result<Vec<RunDispatch>, StorageError>;
1068
1069 async fn count_dispatches_by_status(
1074 &self,
1075 status: RunDispatchStatus,
1076 ) -> Result<usize, StorageError> {
1077 let _ = status;
1078 Err(StorageError::Io(
1079 "count dispatches by status is not supported by this mailbox store".into(),
1080 ))
1081 }
1082
1083 async fn list_terminal_dispatches(
1089 &self,
1090 limit: usize,
1091 offset: usize,
1092 ) -> Result<Vec<RunDispatch>, StorageError> {
1093 let _ = (limit, offset);
1094 Err(StorageError::Io(
1095 "list terminal dispatches is not supported by this mailbox store".into(),
1096 ))
1097 }
1098
1099 async fn reclaim_expired_leases(
1105 &self,
1106 now: u64,
1107 limit: usize,
1108 ) -> Result<Vec<RunDispatch>, StorageError>;
1109
1110 async fn purge_terminal(&self, older_than: u64) -> Result<usize, StorageError>;
1113
1114 async fn queued_thread_ids(&self) -> Result<Vec<String>, StorageError>;
1117
1118 fn supports_dispatch_signals(&self) -> bool {
1122 false
1123 }
1124
1125 async fn pull_dispatch_signals(
1130 &self,
1131 max: usize,
1132 expires: Duration,
1133 ) -> Result<Vec<DispatchSignalEntry>, StorageError> {
1134 let _ = (max, expires);
1135 Ok(Vec::new())
1136 }
1137
1138 async fn deliver_live(
1151 &self,
1152 thread_id: &str,
1153 cmd: LiveRunCommand,
1154 ) -> Result<LiveDeliveryOutcome, StorageError> {
1155 let _ = (thread_id, cmd);
1156 Ok(LiveDeliveryOutcome::NoSubscriber)
1157 }
1158
1159 async fn deliver_live_to(
1165 &self,
1166 target: &LiveRunTarget,
1167 cmd: LiveRunCommand,
1168 ) -> Result<LiveDeliveryOutcome, StorageError> {
1169 self.deliver_live(&target.thread_id, cmd).await
1170 }
1171
1172 async fn open_live_channel(
1175 &self,
1176 thread_id: &str,
1177 ) -> Result<LiveRunCommandStream, StorageError> {
1178 let _ = thread_id;
1179 Ok(Box::pin(futures::stream::empty()))
1180 }
1181
1182 async fn open_live_channel_for(
1184 &self,
1185 target: &LiveRunTarget,
1186 ) -> Result<LiveRunCommandStream, StorageError> {
1187 self.open_live_channel(&target.thread_id).await
1188 }
1189}
1190
1191pub struct MailboxLiveControlSource(Arc<dyn MailboxStore>);
1200
1201impl MailboxLiveControlSource {
1202 pub fn new(store: Arc<dyn MailboxStore>) -> Self {
1203 Self(store)
1204 }
1205}
1206
1207#[async_trait]
1208impl LiveRunCommandSource for MailboxLiveControlSource {
1209 async fn open_live_channel_for(
1210 &self,
1211 target: &LiveRunTarget,
1212 ) -> Result<LiveRunCommandStream, LiveControlError> {
1213 MailboxStore::open_live_channel_for(self.0.as_ref(), target)
1214 .await
1215 .map_err(|error| LiveControlError::Subscribe(error.to_string()))
1216 }
1217}
1218
1219#[derive(Clone)]
1220pub struct ScopedMailboxStore {
1221 inner: Arc<dyn MailboxStore>,
1222 scope_id: ScopeId,
1223}
1224
1225impl ScopedMailboxStore {
1226 pub fn new(inner: Arc<dyn MailboxStore>, scope_id: ScopeId) -> Self {
1227 Self { inner, scope_id }
1228 }
1229
1230 pub fn scope_id(&self) -> &ScopeId {
1231 &self.scope_id
1232 }
1233
1234 pub fn inner(&self) -> &dyn MailboxStore {
1235 self.inner.as_ref()
1236 }
1237
1238 fn scoped(&self, id: &str) -> String {
1239 scoped_key(&self.scope_id, id)
1240 }
1241
1242 fn unscoped<'a>(&self, id: &'a str) -> Option<&'a str> {
1243 unscoped_key(&self.scope_id, id)
1244 }
1245
1246 fn encode_dispatch(&self, dispatch: &RunDispatch) -> RunDispatch {
1247 let mut dispatch = dispatch.clone();
1248 dispatch.dispatch_id = self.scoped(&dispatch.dispatch_id);
1249 dispatch.thread_id = self.scoped(&dispatch.thread_id);
1250 dispatch.run_id = self.scoped(&dispatch.run_id);
1251 dispatch.dedupe_key = dispatch.dedupe_key.as_deref().map(|key| self.scoped(key));
1252 dispatch
1253 }
1254
1255 fn decode_dispatch(&self, mut dispatch: RunDispatch) -> Option<RunDispatch> {
1256 dispatch.dispatch_id = self.unscoped(&dispatch.dispatch_id)?.to_string();
1257 dispatch.thread_id = self.unscoped(&dispatch.thread_id)?.to_string();
1258 dispatch.run_id = self.unscoped(&dispatch.run_id)?.to_string();
1259 dispatch.dedupe_key = dispatch
1260 .dedupe_key
1261 .as_deref()
1262 .map(|key| self.unscoped(key).map(str::to_string))
1263 .unwrap_or(None);
1264 Some(dispatch)
1265 }
1266
1267 fn encode_target(&self, target: &LiveRunTarget) -> LiveRunTarget {
1268 LiveRunTarget {
1269 thread_id: self.scoped(&target.thread_id),
1270 run_id: self.scoped(&target.run_id),
1271 dispatch_id: target.dispatch_id.as_deref().map(|id| self.scoped(id)),
1272 }
1273 }
1274
1275 fn encode_result(&self, result: &RunDispatchResult) -> RunDispatchResult {
1276 let mut result = result.clone();
1277 result.run_id = self.scoped(&result.run_id);
1278 result
1279 }
1280}
1281
1282#[async_trait]
1283impl MailboxStore for ScopedMailboxStore {
1284 async fn enqueue(&self, dispatch: &RunDispatch) -> Result<(), StorageError> {
1285 self.inner.enqueue(&self.encode_dispatch(dispatch)).await
1286 }
1287
1288 async fn claim(
1289 &self,
1290 thread_id: &str,
1291 consumer_id: &str,
1292 lease_ms: u64,
1293 now: u64,
1294 limit: usize,
1295 ) -> Result<Vec<RunDispatch>, StorageError> {
1296 Ok(self
1297 .inner
1298 .claim(&self.scoped(thread_id), consumer_id, lease_ms, now, limit)
1299 .await?
1300 .into_iter()
1301 .filter_map(|dispatch| self.decode_dispatch(dispatch))
1302 .collect())
1303 }
1304
1305 async fn claim_dispatch(
1306 &self,
1307 dispatch_id: &str,
1308 consumer_id: &str,
1309 lease_ms: u64,
1310 now: u64,
1311 ) -> Result<Option<RunDispatch>, StorageError> {
1312 Ok(self
1313 .inner
1314 .claim_dispatch(&self.scoped(dispatch_id), consumer_id, lease_ms, now)
1315 .await?
1316 .and_then(|dispatch| self.decode_dispatch(dispatch)))
1317 }
1318
1319 async fn ack(
1320 &self,
1321 dispatch_id: &str,
1322 claim_token: &str,
1323 now: u64,
1324 ) -> Result<(), StorageError> {
1325 self.inner
1326 .ack(&self.scoped(dispatch_id), claim_token, now)
1327 .await
1328 }
1329
1330 async fn record_dispatch_start(
1331 &self,
1332 dispatch_id: &str,
1333 claim_token: &str,
1334 dispatch_instance_id: &str,
1335 now: u64,
1336 ) -> Result<(), StorageError> {
1337 self.inner
1338 .record_dispatch_start(
1339 &self.scoped(dispatch_id),
1340 claim_token,
1341 dispatch_instance_id,
1342 now,
1343 )
1344 .await
1345 }
1346
1347 async fn record_run_result(
1348 &self,
1349 dispatch_id: &str,
1350 claim_token: &str,
1351 result: &RunDispatchResult,
1352 now: u64,
1353 ) -> Result<(), StorageError> {
1354 self.inner
1355 .record_run_result(
1356 &self.scoped(dispatch_id),
1357 claim_token,
1358 &self.encode_result(result),
1359 now,
1360 )
1361 .await
1362 }
1363
1364 async fn nack(
1365 &self,
1366 dispatch_id: &str,
1367 claim_token: &str,
1368 retry_at: u64,
1369 error: &str,
1370 now: u64,
1371 ) -> Result<(), StorageError> {
1372 self.inner
1373 .nack(&self.scoped(dispatch_id), claim_token, retry_at, error, now)
1374 .await
1375 }
1376
1377 async fn dead_letter(
1378 &self,
1379 dispatch_id: &str,
1380 claim_token: &str,
1381 error: &str,
1382 now: u64,
1383 ) -> Result<(), StorageError> {
1384 self.inner
1385 .dead_letter(&self.scoped(dispatch_id), claim_token, error, now)
1386 .await
1387 }
1388
1389 async fn cancel(
1390 &self,
1391 dispatch_id: &str,
1392 now: u64,
1393 ) -> Result<Option<RunDispatch>, StorageError> {
1394 Ok(self
1395 .inner
1396 .cancel(&self.scoped(dispatch_id), now)
1397 .await?
1398 .and_then(|dispatch| self.decode_dispatch(dispatch)))
1399 }
1400
1401 async fn extend_lease(
1402 &self,
1403 dispatch_id: &str,
1404 claim_token: &str,
1405 extension_ms: u64,
1406 now: u64,
1407 ) -> Result<bool, StorageError> {
1408 self.inner
1409 .extend_lease(&self.scoped(dispatch_id), claim_token, extension_ms, now)
1410 .await
1411 }
1412
1413 async fn interrupt(&self, thread_id: &str, now: u64) -> Result<MailboxInterrupt, StorageError> {
1414 let interrupt = self.inner.interrupt(&self.scoped(thread_id), now).await?;
1415 Ok(MailboxInterrupt {
1416 new_dispatch_epoch: interrupt.new_dispatch_epoch,
1417 active_dispatch: interrupt
1418 .active_dispatch
1419 .and_then(|dispatch| self.decode_dispatch(dispatch)),
1420 superseded_count: interrupt.superseded_count,
1421 })
1422 }
1423
1424 async fn interrupt_detailed(
1425 &self,
1426 thread_id: &str,
1427 now: u64,
1428 ) -> Result<MailboxInterruptDetails, StorageError> {
1429 let details = self
1430 .inner
1431 .interrupt_detailed(&self.scoped(thread_id), now)
1432 .await?;
1433 let superseded_dispatches: Vec<_> = details
1434 .superseded_dispatches
1435 .into_iter()
1436 .filter_map(|dispatch| self.decode_dispatch(dispatch))
1437 .collect();
1438 Ok(MailboxInterruptDetails {
1439 new_dispatch_epoch: details.new_dispatch_epoch,
1440 active_dispatch: details
1441 .active_dispatch
1442 .and_then(|dispatch| self.decode_dispatch(dispatch)),
1443 superseded_count: superseded_dispatches.len(),
1444 superseded_dispatches,
1445 })
1446 }
1447
1448 async fn current_dispatch_epoch(&self, thread_id: &str) -> Result<u64, StorageError> {
1449 self.inner
1450 .current_dispatch_epoch(&self.scoped(thread_id))
1451 .await
1452 }
1453
1454 async fn supersede_claimed(
1455 &self,
1456 dispatch_id: &str,
1457 claim_token: &str,
1458 now: u64,
1459 reason: &str,
1460 ) -> Result<Option<RunDispatch>, StorageError> {
1461 Ok(self
1462 .inner
1463 .supersede_claimed(&self.scoped(dispatch_id), claim_token, now, reason)
1464 .await?
1465 .and_then(|dispatch| self.decode_dispatch(dispatch)))
1466 }
1467
1468 async fn load_dispatch(&self, dispatch_id: &str) -> Result<Option<RunDispatch>, StorageError> {
1469 Ok(self
1470 .inner
1471 .load_dispatch(&self.scoped(dispatch_id))
1472 .await?
1473 .and_then(|dispatch| self.decode_dispatch(dispatch)))
1474 }
1475
1476 async fn list_dispatches(
1477 &self,
1478 thread_id: &str,
1479 status_filter: Option<&[RunDispatchStatus]>,
1480 limit: usize,
1481 offset: usize,
1482 ) -> Result<Vec<RunDispatch>, StorageError> {
1483 Ok(self
1484 .inner
1485 .list_dispatches(&self.scoped(thread_id), status_filter, limit, offset)
1486 .await?
1487 .into_iter()
1488 .filter_map(|dispatch| self.decode_dispatch(dispatch))
1489 .collect())
1490 }
1491
1492 async fn count_dispatches_by_status(
1493 &self,
1494 status: RunDispatchStatus,
1495 ) -> Result<usize, StorageError> {
1496 match status {
1497 RunDispatchStatus::Queued => {
1498 let mut total = 0;
1499 for thread_id in self.queued_thread_ids().await? {
1500 total += self
1501 .list_dispatches(
1502 &thread_id,
1503 Some(&[RunDispatchStatus::Queued]),
1504 usize::MAX,
1505 0,
1506 )
1507 .await?
1508 .len();
1509 }
1510 Ok(total)
1511 }
1512 status if status.is_terminal() => Ok(self
1513 .list_terminal_dispatches(usize::MAX, 0)
1514 .await?
1515 .into_iter()
1516 .filter(|dispatch| dispatch.status == status)
1517 .count()),
1518 _ => Err(StorageError::Io(
1519 "scoped claimed dispatch count is not supported".into(),
1520 )),
1521 }
1522 }
1523
1524 async fn list_terminal_dispatches(
1525 &self,
1526 limit: usize,
1527 offset: usize,
1528 ) -> Result<Vec<RunDispatch>, StorageError> {
1529 let all: Vec<_> = self
1530 .inner
1531 .list_terminal_dispatches(usize::MAX, 0)
1532 .await?
1533 .into_iter()
1534 .filter_map(|dispatch| self.decode_dispatch(dispatch))
1535 .collect();
1536 Ok(all.into_iter().skip(offset).take(limit).collect())
1537 }
1538
1539 async fn reclaim_expired_leases(
1540 &self,
1541 now: u64,
1542 limit: usize,
1543 ) -> Result<Vec<RunDispatch>, StorageError> {
1544 Ok(self
1545 .inner
1546 .reclaim_expired_leases(now, limit)
1547 .await?
1548 .into_iter()
1549 .filter_map(|dispatch| self.decode_dispatch(dispatch))
1550 .collect())
1551 }
1552
1553 async fn purge_terminal(&self, _older_than: u64) -> Result<usize, StorageError> {
1554 Err(StorageError::Io(
1555 "scoped terminal dispatch purge is not supported".into(),
1556 ))
1557 }
1558
1559 async fn queued_thread_ids(&self) -> Result<Vec<String>, StorageError> {
1560 Ok(self
1561 .inner
1562 .queued_thread_ids()
1563 .await?
1564 .into_iter()
1565 .filter_map(|thread_id| self.unscoped(&thread_id).map(str::to_string))
1566 .collect())
1567 }
1568
1569 fn supports_dispatch_signals(&self) -> bool {
1570 self.inner.supports_dispatch_signals()
1571 }
1572
1573 async fn pull_dispatch_signals(
1574 &self,
1575 max: usize,
1576 expires: Duration,
1577 ) -> Result<Vec<DispatchSignalEntry>, StorageError> {
1578 Ok(self
1579 .inner
1580 .pull_dispatch_signals(max, expires)
1581 .await?
1582 .into_iter()
1583 .filter_map(|entry| {
1584 Some(DispatchSignalEntry {
1585 thread_id: self.unscoped(&entry.thread_id)?.to_string(),
1586 dispatch_id: self.unscoped(&entry.dispatch_id)?.to_string(),
1587 receipt: entry.receipt,
1588 })
1589 })
1590 .collect())
1591 }
1592
1593 async fn deliver_live(
1594 &self,
1595 thread_id: &str,
1596 cmd: LiveRunCommand,
1597 ) -> Result<LiveDeliveryOutcome, StorageError> {
1598 self.inner.deliver_live(&self.scoped(thread_id), cmd).await
1599 }
1600
1601 async fn deliver_live_to(
1602 &self,
1603 target: &LiveRunTarget,
1604 cmd: LiveRunCommand,
1605 ) -> Result<LiveDeliveryOutcome, StorageError> {
1606 self.inner
1607 .deliver_live_to(&self.encode_target(target), cmd)
1608 .await
1609 }
1610
1611 async fn open_live_channel(
1612 &self,
1613 thread_id: &str,
1614 ) -> Result<LiveRunCommandStream, StorageError> {
1615 self.inner.open_live_channel(&self.scoped(thread_id)).await
1616 }
1617
1618 async fn open_live_channel_for(
1619 &self,
1620 target: &LiveRunTarget,
1621 ) -> Result<LiveRunCommandStream, StorageError> {
1622 self.inner
1623 .open_live_channel_for(&self.encode_target(target))
1624 .await
1625 }
1626}
1627
1628#[cfg(test)]
1629#[path = "mailbox_tests.rs"]
1630mod tests;