Skip to main content

awaken_server_contract/contract/
mailbox.rs

1//! Run dispatch data vocabulary and the mailbox store contract.
2//!
3//! Both the `RunDispatch*` data types and the `MailboxStore` persistence trait
4//! (plus the `DispatchSignal*` durable-signal pair and the
5//! `MailboxStore`->`LiveRunCommandSource` bridge) are server/store concerns and
6//! live here. The runtime engine references none of them; it steers a live run
7//! only through the narrower `live_control` port in runtime-contract.
8
9use std::sync::Arc;
10use std::time::Duration;
11
12use async_trait::async_trait;
13use awaken_runtime_contract::contract::lifecycle::{RunStatus, TerminationReason};
14use awaken_runtime_contract::contract::storage::StorageError;
15use serde::{Deserialize, Serialize};
16
17use crate::contract::scope::{ScopeId, scoped_key, unscoped_key};
18
19// ── RunDispatchStatus ───────────────────────────────────────────────
20
21/// Six-state lifecycle for a dispatch attempt.
22///
23/// ```text
24/// Queued ──claim──> Claimed ──ack──> Acked (terminal)
25///   |                  |
26///   |               nack(retry) ──> Queued (attempt_count++, available_at = retry_at)
27///   |                  |
28///   |               nack(permanent) ──> DeadLetter (terminal)
29///   |
30///   |── cancel ──> Cancelled (terminal)
31///   └── interrupt(dispatch epoch bump) ──> Superseded (terminal)
32/// ```
33#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
34pub enum RunDispatchStatus {
35    Queued,
36    Claimed,
37    Acked,
38    Cancelled,
39    Superseded,
40    DeadLetter,
41}
42
43impl RunDispatchStatus {
44    /// Returns `true` for terminal states that cannot transition further.
45    pub fn is_terminal(self) -> bool {
46        matches!(
47            self,
48            Self::Acked | Self::Cancelled | Self::Superseded | Self::DeadLetter
49        )
50    }
51}
52
53// ── RunDispatchResult ────────────────────────────────────────────────
54
55/// Durable runtime-result projection for the dispatch that consumed a run.
56///
57/// `RunRecord` remains the source of truth for business outcome. This compact
58/// projection exists on the queue record so operators can inspect what happened
59/// to a claimed dispatch without treating `Acked` as agent success.
60#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
61pub struct RunDispatchResult {
62    /// Runtime run ID used by the execution engine.
63    pub run_id: String,
64    /// Dispatch attempt ID that links a queue claim to a runtime invocation.
65    pub dispatch_instance_id: String,
66    /// Durable runtime status reached by this run.
67    pub status: RunStatus,
68    /// Structured terminal reason, if the runtime reached a terminal state.
69    #[serde(default, skip_serializing_if = "Option::is_none")]
70    pub termination: Option<TerminationReason>,
71    /// Final response text, when available.
72    #[serde(default, skip_serializing_if = "Option::is_none")]
73    pub response: Option<String>,
74    /// Runtime error text, when available.
75    #[serde(default, skip_serializing_if = "Option::is_none")]
76    pub error: Option<String>,
77}
78
79// ── RunDispatch ─────────────────────────────────────────────────────
80
81/// A run dispatch persisted in the mailbox queue.
82///
83/// This record owns delivery/lease/retry state only. Business request,
84/// message, and outcome semantics live on `RunRecord` and the thread message
85/// log.
86#[derive(Debug, Clone, Serialize, Deserialize)]
87pub struct RunDispatch {
88    // ── identity ──
89    /// UUID v7, globally unique.
90    dispatch_id: String,
91    /// Thread ID, routing anchor.
92    thread_id: String,
93    /// Canonical runtime run ID this dispatch activates.
94    run_id: String,
95
96    // ── queue semantics ──
97    /// 0 = highest, 255 = lowest, default 128.
98    priority: u8,
99    /// Idempotent delivery key.
100    dedupe_key: Option<String>,
101    /// Thread dispatch epoch captured when this dispatch was created.
102    dispatch_epoch: u64,
103
104    // ── lifecycle ──
105    /// Current status.
106    status: RunDispatchStatus,
107    /// Unix millis; future value = delayed delivery.
108    available_at: u64,
109    /// Number of claim attempts so far.
110    attempt_count: u32,
111    /// Maximum attempts before dead-lettering (default 5).
112    max_attempts: u32,
113    /// Last error message.
114    last_error: Option<String>,
115
116    // ── lease ──
117    /// UUID set on claim.
118    claim_token: Option<String>,
119    /// Consumer identifier (process) that claimed this dispatch.
120    claimed_by: Option<String>,
121    /// Unix millis, extended by heartbeat.
122    lease_until: Option<u64>,
123
124    // ── runtime trace ──
125    /// Dispatch attempt ID associated with the current/latest claim.
126    #[serde(default, skip_serializing_if = "Option::is_none")]
127    dispatch_instance_id: Option<String>,
128    /// Runtime status associated with this dispatch's current/latest run.
129    #[serde(default, skip_serializing_if = "Option::is_none")]
130    run_status: Option<RunStatus>,
131    /// Structured terminal reason for the current/latest run.
132    #[serde(default, skip_serializing_if = "Option::is_none")]
133    termination: Option<TerminationReason>,
134    /// Final response text for the current/latest run.
135    #[serde(default, skip_serializing_if = "Option::is_none")]
136    run_response: Option<String>,
137    /// Runtime error text for the current/latest run.
138    #[serde(default, skip_serializing_if = "Option::is_none")]
139    run_error: Option<String>,
140    /// Unix millis when the runtime result was recorded.
141    #[serde(default, skip_serializing_if = "Option::is_none")]
142    completed_at: Option<u64>,
143
144    // ── timestamps ──
145    /// Unix millis when the dispatch was created.
146    created_at: u64,
147    /// Unix millis of the last update.
148    updated_at: u64,
149}
150
151/// Explicit persisted representation used by store codecs.
152///
153/// This bag keeps deserialization and backend row decoding possible without
154/// reopening `RunDispatch` to arbitrary field mutation. Convert it with
155/// [`RunDispatch::from_persisted_parts`], which validates lifecycle
156/// invariants before returning a dispatch.
157#[derive(Debug, Clone)]
158pub struct RunDispatchParts {
159    pub dispatch_id: String,
160    pub thread_id: String,
161    pub run_id: String,
162    pub priority: u8,
163    pub dedupe_key: Option<String>,
164    pub dispatch_epoch: u64,
165    pub status: RunDispatchStatus,
166    pub available_at: u64,
167    pub attempt_count: u32,
168    pub max_attempts: u32,
169    pub last_error: Option<String>,
170    pub claim_token: Option<String>,
171    pub claimed_by: Option<String>,
172    pub lease_until: Option<u64>,
173    pub dispatch_instance_id: Option<String>,
174    pub run_status: Option<RunStatus>,
175    pub termination: Option<TerminationReason>,
176    pub run_response: Option<String>,
177    pub run_error: Option<String>,
178    pub completed_at: Option<u64>,
179    pub created_at: u64,
180    pub updated_at: u64,
181}
182
183impl RunDispatch {
184    /// Build a queued dispatch. The result is validateable for enqueue and is
185    /// the only public constructor for new mailbox records.
186    #[must_use]
187    pub fn queued(
188        dispatch_id: impl Into<String>,
189        thread_id: impl Into<String>,
190        run_id: impl Into<String>,
191        created_at: u64,
192    ) -> Self {
193        Self {
194            dispatch_id: dispatch_id.into(),
195            thread_id: thread_id.into(),
196            run_id: run_id.into(),
197            priority: 128,
198            dedupe_key: None,
199            dispatch_epoch: 0,
200            status: RunDispatchStatus::Queued,
201            available_at: created_at,
202            attempt_count: 0,
203            max_attempts: 5,
204            last_error: None,
205            claim_token: None,
206            claimed_by: None,
207            lease_until: None,
208            dispatch_instance_id: None,
209            run_status: None,
210            termination: None,
211            run_response: None,
212            run_error: None,
213            completed_at: None,
214            created_at,
215            updated_at: created_at,
216        }
217    }
218
219    #[must_use]
220    pub fn with_priority(mut self, priority: u8) -> Self {
221        self.priority = priority;
222        self
223    }
224
225    #[must_use]
226    pub fn with_dedupe_key(mut self, dedupe_key: impl Into<Option<String>>) -> Self {
227        self.dedupe_key = dedupe_key.into();
228        self
229    }
230
231    #[must_use]
232    pub fn with_available_at(mut self, available_at: u64) -> Self {
233        self.available_at = available_at;
234        self
235    }
236
237    #[must_use]
238    pub fn with_created_at(mut self, created_at: u64) -> Self {
239        self.created_at = created_at;
240        self
241    }
242
243    #[must_use]
244    pub fn with_max_attempts(mut self, max_attempts: u32) -> Self {
245        self.max_attempts = max_attempts;
246        self
247    }
248
249    #[must_use]
250    pub fn with_attempt_count(mut self, attempt_count: u32) -> Self {
251        self.attempt_count = attempt_count;
252        self
253    }
254
255    #[must_use]
256    pub fn with_dispatch_epoch(mut self, dispatch_epoch: u64) -> Self {
257        self.dispatch_epoch = dispatch_epoch;
258        self
259    }
260
261    pub fn from_persisted_parts(parts: RunDispatchParts) -> Result<Self, StorageError> {
262        let dispatch = Self {
263            dispatch_id: parts.dispatch_id,
264            thread_id: parts.thread_id,
265            run_id: parts.run_id,
266            priority: parts.priority,
267            dedupe_key: parts.dedupe_key,
268            dispatch_epoch: parts.dispatch_epoch,
269            status: parts.status,
270            available_at: parts.available_at,
271            attempt_count: parts.attempt_count,
272            max_attempts: parts.max_attempts,
273            last_error: parts.last_error,
274            claim_token: parts.claim_token,
275            claimed_by: parts.claimed_by,
276            lease_until: parts.lease_until,
277            dispatch_instance_id: parts.dispatch_instance_id,
278            run_status: parts.run_status,
279            termination: parts.termination,
280            run_response: parts.run_response,
281            run_error: parts.run_error,
282            completed_at: parts.completed_at,
283            created_at: parts.created_at,
284            updated_at: parts.updated_at,
285        };
286        dispatch.validate_for_persist()?;
287        Ok(dispatch)
288    }
289
290    #[must_use]
291    pub fn to_persisted_parts(&self) -> RunDispatchParts {
292        RunDispatchParts {
293            dispatch_id: self.dispatch_id.clone(),
294            thread_id: self.thread_id.clone(),
295            run_id: self.run_id.clone(),
296            priority: self.priority,
297            dedupe_key: self.dedupe_key.clone(),
298            dispatch_epoch: self.dispatch_epoch,
299            status: self.status,
300            available_at: self.available_at,
301            attempt_count: self.attempt_count,
302            max_attempts: self.max_attempts,
303            last_error: self.last_error.clone(),
304            claim_token: self.claim_token.clone(),
305            claimed_by: self.claimed_by.clone(),
306            lease_until: self.lease_until,
307            dispatch_instance_id: self.dispatch_instance_id.clone(),
308            run_status: self.run_status,
309            termination: self.termination.clone(),
310            run_response: self.run_response.clone(),
311            run_error: self.run_error.clone(),
312            completed_at: self.completed_at,
313            created_at: self.created_at,
314            updated_at: self.updated_at,
315        }
316    }
317
318    #[must_use]
319    pub fn dispatch_id(&self) -> &String {
320        &self.dispatch_id
321    }
322
323    #[must_use]
324    pub fn thread_id(&self) -> &String {
325        &self.thread_id
326    }
327
328    #[must_use]
329    pub fn run_id(&self) -> &String {
330        &self.run_id
331    }
332
333    #[must_use]
334    pub fn priority(&self) -> u8 {
335        self.priority
336    }
337
338    #[must_use]
339    pub fn dedupe_key(&self) -> Option<&str> {
340        self.dedupe_key.as_deref()
341    }
342
343    #[must_use]
344    pub fn dispatch_epoch(&self) -> u64 {
345        self.dispatch_epoch
346    }
347
348    #[must_use]
349    pub fn status(&self) -> RunDispatchStatus {
350        self.status
351    }
352
353    #[must_use]
354    pub fn available_at(&self) -> u64 {
355        self.available_at
356    }
357
358    #[must_use]
359    pub fn attempt_count(&self) -> u32 {
360        self.attempt_count
361    }
362
363    #[must_use]
364    pub fn max_attempts(&self) -> u32 {
365        self.max_attempts
366    }
367
368    #[must_use]
369    pub fn last_error(&self) -> Option<&str> {
370        self.last_error.as_deref()
371    }
372
373    #[must_use]
374    pub fn claim_token(&self) -> Option<&str> {
375        self.claim_token.as_deref()
376    }
377
378    #[must_use]
379    pub fn claimed_by(&self) -> Option<&str> {
380        self.claimed_by.as_deref()
381    }
382
383    #[must_use]
384    pub fn lease_until(&self) -> Option<u64> {
385        self.lease_until
386    }
387
388    #[must_use]
389    pub fn dispatch_instance_id(&self) -> Option<&str> {
390        self.dispatch_instance_id.as_deref()
391    }
392
393    #[must_use]
394    pub fn run_status(&self) -> Option<RunStatus> {
395        self.run_status
396    }
397
398    #[must_use]
399    pub fn termination(&self) -> Option<&TerminationReason> {
400        self.termination.as_ref()
401    }
402
403    #[must_use]
404    pub fn run_response(&self) -> Option<&str> {
405        self.run_response.as_deref()
406    }
407
408    #[must_use]
409    pub fn run_error(&self) -> Option<&str> {
410        self.run_error.as_deref()
411    }
412
413    #[must_use]
414    pub fn completed_at(&self) -> Option<u64> {
415        self.completed_at
416    }
417
418    #[must_use]
419    pub fn created_at(&self) -> u64 {
420        self.created_at
421    }
422
423    #[must_use]
424    pub fn updated_at(&self) -> u64 {
425        self.updated_at
426    }
427
428    /// Store enqueue normalization: bind the dispatch to the current thread
429    /// epoch and clear all runtime/terminal state.
430    pub fn prepare_for_enqueue(&mut self, dispatch_epoch: u64) {
431        self.dispatch_epoch = dispatch_epoch;
432        self.status = RunDispatchStatus::Queued;
433        self.claim_token = None;
434        self.claimed_by = None;
435        self.lease_until = None;
436        self.dispatch_instance_id = None;
437        self.run_status = None;
438        self.termination = None;
439        self.run_response = None;
440        self.run_error = None;
441        self.completed_at = None;
442    }
443
444    /// Transition a queued dispatch to claimed.
445    pub fn claim(
446        &mut self,
447        consumer_id: impl Into<String>,
448        claim_token: impl Into<String>,
449        lease_until: u64,
450        now: u64,
451    ) -> Result<(), StorageError> {
452        self.require_status(RunDispatchStatus::Queued, "claim")?;
453        self.status = RunDispatchStatus::Claimed;
454        self.claim_token = Some(claim_token.into());
455        self.claimed_by = Some(consumer_id.into());
456        self.lease_until = Some(lease_until);
457        self.updated_at = now;
458        self.validate_for_persist()
459    }
460
461    pub fn extend_lease(&mut self, lease_until: u64, now: u64) -> Result<(), StorageError> {
462        self.require_status(RunDispatchStatus::Claimed, "lease extension")?;
463        self.lease_until = Some(lease_until);
464        self.updated_at = now;
465        self.validate_for_persist()
466    }
467
468    pub fn record_dispatch_start(
469        &mut self,
470        dispatch_instance_id: impl Into<String>,
471        now: u64,
472    ) -> Result<(), StorageError> {
473        self.require_status(RunDispatchStatus::Claimed, "recording runtime start")?;
474        self.dispatch_instance_id = Some(dispatch_instance_id.into());
475        self.run_status = Some(RunStatus::Running);
476        self.termination = None;
477        self.run_response = None;
478        self.run_error = None;
479        self.completed_at = None;
480        self.updated_at = now;
481        self.validate_for_persist()
482    }
483
484    pub fn record_run_result(
485        &mut self,
486        result: &RunDispatchResult,
487        now: u64,
488    ) -> Result<(), StorageError> {
489        self.require_status(RunDispatchStatus::Claimed, "recording runtime result")?;
490        if result.run_id != self.run_id {
491            return Err(StorageError::Validation(format!(
492                "dispatch '{}' result run_id '{}' does not match '{}'",
493                self.dispatch_id, result.run_id, self.run_id
494            )));
495        }
496        self.dispatch_instance_id = Some(result.dispatch_instance_id.clone());
497        self.run_status = Some(result.status);
498        self.termination = result.termination.clone();
499        self.run_response = result.response.clone();
500        self.run_error = result.error.clone();
501        self.completed_at = Some(now);
502        self.updated_at = now;
503        self.validate_for_persist()
504    }
505
506    pub fn mark_acked(&mut self, now: u64) -> Result<(), StorageError> {
507        self.require_status(RunDispatchStatus::Claimed, "ack")?;
508        self.status = RunDispatchStatus::Acked;
509        self.completed_at = Some(now);
510        self.updated_at = now;
511        self.clear_claim_fields();
512        self.validate_for_persist()
513    }
514
515    pub fn mark_cancelled(&mut self, now: u64) -> Result<(), StorageError> {
516        self.require_status(RunDispatchStatus::Queued, "cancel")?;
517        self.status = RunDispatchStatus::Cancelled;
518        self.completed_at = Some(now);
519        self.updated_at = now;
520        self.clear_claim_fields();
521        self.validate_for_persist()
522    }
523
524    pub fn mark_superseded(&mut self, now: u64, reason: Option<&str>) -> Result<(), StorageError> {
525        self.require_status(RunDispatchStatus::Queued, "supersede")?;
526        self.status = RunDispatchStatus::Superseded;
527        self.completed_at = Some(now);
528        self.updated_at = now;
529        if let Some(reason) = reason {
530            self.last_error = Some(reason.to_string());
531        }
532        self.clear_claim_fields();
533        self.clear_runtime_projection();
534        self.validate_for_persist()
535    }
536
537    pub fn mark_superseded_at_epoch(
538        &mut self,
539        now: u64,
540        epoch: u64,
541        reason: Option<&str>,
542    ) -> Result<(), StorageError> {
543        self.dispatch_epoch = epoch;
544        match self.status {
545            RunDispatchStatus::Queued => self.mark_superseded(now, reason),
546            RunDispatchStatus::Claimed => {
547                self.status = RunDispatchStatus::Superseded;
548                self.completed_at = Some(now);
549                self.updated_at = now;
550                if let Some(reason) = reason {
551                    self.last_error = Some(reason.to_string());
552                }
553                self.clear_claim_fields();
554                self.clear_runtime_projection();
555                self.validate_for_persist()
556            }
557            _ => Err(StorageError::Validation(format!(
558                "dispatch '{}' must be Queued or Claimed before epoch supersede",
559                self.dispatch_id
560            ))),
561        }
562    }
563
564    pub fn mark_dead_letter(&mut self, now: u64, error: &str) -> Result<(), StorageError> {
565        self.require_status(RunDispatchStatus::Claimed, "dead letter")?;
566        self.status = RunDispatchStatus::DeadLetter;
567        self.last_error = Some(error.to_string());
568        self.completed_at = Some(now);
569        self.updated_at = now;
570        self.clear_claim_fields();
571        self.validate_for_persist()
572    }
573
574    pub fn mark_nack_result(
575        &mut self,
576        now: u64,
577        retry_at: u64,
578        error: &str,
579    ) -> Result<(), StorageError> {
580        self.require_status(RunDispatchStatus::Claimed, "nack")?;
581        self.attempt_count = self.attempt_count.saturating_add(1);
582        self.last_error = Some(error.to_string());
583        self.updated_at = now;
584        self.clear_claim_fields();
585        if self.attempt_count >= self.max_attempts {
586            self.status = RunDispatchStatus::DeadLetter;
587            self.completed_at = Some(now);
588        } else {
589            self.status = RunDispatchStatus::Queued;
590            self.available_at = retry_at;
591            self.completed_at = None;
592            self.clear_runtime_projection();
593        }
594        self.validate_for_persist()
595    }
596
597    pub fn mark_expired_lease(
598        &mut self,
599        now: u64,
600        max_attempts_error: &str,
601    ) -> Result<(), StorageError> {
602        self.require_status(RunDispatchStatus::Claimed, "lease expiration")?;
603        self.attempt_count = self.attempt_count.saturating_add(1);
604        self.available_at = now;
605        self.updated_at = now;
606        self.clear_claim_fields();
607        // A lease expiry abandons the in-flight attempt without a terminal run
608        // result, so the runtime projection (e.g. run_status=Running,
609        // dispatch_instance_id) is stale on every outcome — retry and the
610        // max-attempt dead-letter alike. Clear it unconditionally so a terminal
611        // DeadLetter dispatch can never project the abandoned attempt as Running.
612        self.clear_runtime_projection();
613        if self.attempt_count >= self.max_attempts {
614            self.status = RunDispatchStatus::DeadLetter;
615            self.last_error = Some(max_attempts_error.to_string());
616            self.completed_at = Some(now);
617        } else {
618            self.status = RunDispatchStatus::Queued;
619            self.completed_at = None;
620        }
621        self.validate_for_persist()
622    }
623
624    pub fn remap_identity(
625        &mut self,
626        dispatch_id: impl Into<String>,
627        thread_id: impl Into<String>,
628        run_id: impl Into<String>,
629        dedupe_key: Option<String>,
630    ) {
631        self.dispatch_id = dispatch_id.into();
632        self.thread_id = thread_id.into();
633        self.run_id = run_id.into();
634        self.dedupe_key = dedupe_key;
635    }
636
637    fn clear_claim_fields(&mut self) {
638        self.claim_token = None;
639        self.claimed_by = None;
640        self.lease_until = None;
641    }
642
643    fn clear_runtime_projection(&mut self) {
644        self.dispatch_instance_id = None;
645        self.run_status = None;
646        self.termination = None;
647        self.run_response = None;
648        self.run_error = None;
649    }
650
651    fn require_status(
652        &self,
653        expected: RunDispatchStatus,
654        transition: &str,
655    ) -> Result<(), StorageError> {
656        if self.status != expected {
657            return Err(StorageError::Validation(format!(
658                "dispatch '{}' must be {:?} before {transition}",
659                self.dispatch_id, expected
660            )));
661        }
662        Ok(())
663    }
664
665    /// Validate an externally supplied dispatch before queue admission.
666    pub fn validate_for_enqueue(&self) -> Result<(), StorageError> {
667        self.validate_identity_and_retry()?;
668        if self.status != RunDispatchStatus::Queued {
669            return Err(StorageError::Validation(format!(
670                "enqueued dispatch '{}' must start as Queued",
671                self.dispatch_id
672            )));
673        }
674        self.validate_queued()
675    }
676
677    /// Validate persisted dispatch lifecycle invariants.
678    pub fn validate_for_persist(&self) -> Result<(), StorageError> {
679        self.validate_identity_and_retry()?;
680        match self.status {
681            RunDispatchStatus::Queued => self.validate_queued(),
682            RunDispatchStatus::Claimed => {
683                if self
684                    .claim_token
685                    .as_deref()
686                    .is_none_or(|value| value.trim().is_empty())
687                    || self
688                        .claimed_by
689                        .as_deref()
690                        .is_none_or(|value| value.trim().is_empty())
691                    || self.lease_until.is_none()
692                {
693                    return Err(StorageError::Validation(format!(
694                        "Claimed dispatch '{}' must carry claim_token, claimed_by, and lease_until",
695                        self.dispatch_id
696                    )));
697                }
698                Ok(())
699            }
700            RunDispatchStatus::Acked
701            | RunDispatchStatus::Cancelled
702            | RunDispatchStatus::Superseded
703            | RunDispatchStatus::DeadLetter => {
704                if self.claim_token.is_some()
705                    || self.claimed_by.is_some()
706                    || self.lease_until.is_some()
707                {
708                    return Err(StorageError::Validation(format!(
709                        "{:?} dispatch '{}' must not carry active lease fields",
710                        self.status, self.dispatch_id
711                    )));
712                }
713                if self.completed_at.is_none() {
714                    return Err(StorageError::Validation(format!(
715                        "{:?} dispatch '{}' must carry completed_at",
716                        self.status, self.dispatch_id
717                    )));
718                }
719                Ok(())
720            }
721        }
722    }
723
724    fn validate_identity_and_retry(&self) -> Result<(), StorageError> {
725        require_non_empty("dispatch_id", &self.dispatch_id)?;
726        require_non_empty("thread_id", &self.thread_id)?;
727        require_non_empty("run_id", &self.run_id)?;
728        if self.max_attempts == 0 {
729            return Err(StorageError::Validation(format!(
730                "dispatch '{}' max_attempts must be greater than zero",
731                self.dispatch_id
732            )));
733        }
734        if self.attempt_count > self.max_attempts {
735            return Err(StorageError::Validation(format!(
736                "dispatch '{}' attempt_count must not exceed max_attempts",
737                self.dispatch_id
738            )));
739        }
740        Ok(())
741    }
742
743    fn validate_queued(&self) -> Result<(), StorageError> {
744        if self.claim_token.is_some() || self.claimed_by.is_some() || self.lease_until.is_some() {
745            return Err(StorageError::Validation(format!(
746                "Queued dispatch '{}' must not carry claim fields",
747                self.dispatch_id
748            )));
749        }
750        if self.completed_at.is_some() {
751            return Err(StorageError::Validation(format!(
752                "Queued dispatch '{}' must not carry completed_at",
753                self.dispatch_id
754            )));
755        }
756        if self.dispatch_instance_id.is_some()
757            || self.run_status.is_some()
758            || self.termination.is_some()
759            || self.run_response.is_some()
760            || self.run_error.is_some()
761        {
762            return Err(StorageError::Validation(format!(
763                "Queued dispatch '{}' must not carry runtime result fields",
764                self.dispatch_id
765            )));
766        }
767        Ok(())
768    }
769}
770
771fn require_non_empty(field: &str, value: &str) -> Result<(), StorageError> {
772    if value.trim().is_empty() {
773        return Err(StorageError::Validation(format!(
774            "{field} must not be empty"
775        )));
776    }
777    Ok(())
778}
779
780// ── MailboxInterrupt ────────────────────────────────────────────────
781
782/// Result of a mailbox interrupt operation.
783#[derive(Debug, Clone, Serialize, Deserialize)]
784pub struct MailboxInterrupt {
785    /// New thread dispatch epoch after bump.
786    pub new_dispatch_epoch: u64,
787    /// The dispatch that was Claimed (running) at interrupt time, if any.
788    /// Caller should cancel the corresponding runtime run.
789    pub active_dispatch: Option<RunDispatch>,
790    /// Number of Queued dispatches superseded.
791    pub superseded_count: usize,
792}
793
794/// Detailed result of a mailbox interrupt operation.
795///
796/// `MailboxInterrupt` intentionally keeps the 0.2 public struct shape so
797/// downstream struct literals remain source-compatible. New code that needs the
798/// exact superseded dispatch records should use this type via
799/// [`MailboxStore::interrupt_detailed`].
800#[derive(Debug, Clone, Serialize, Deserialize)]
801pub struct MailboxInterruptDetails {
802    /// New thread dispatch epoch after bump.
803    pub new_dispatch_epoch: u64,
804    /// The dispatch that was Claimed (running) at interrupt time, if any.
805    /// Caller should cancel the corresponding runtime run.
806    pub active_dispatch: Option<RunDispatch>,
807    /// Number of Queued dispatches superseded.
808    pub superseded_count: usize,
809    /// Queued dispatches that were atomically superseded by this interrupt.
810    ///
811    /// This is the authoritative set callers should use to reconcile terminal
812    /// dispatch state back to the durable run lifecycle.
813    #[serde(default)]
814    pub superseded_dispatches: Vec<RunDispatch>,
815}
816
817impl MailboxInterruptDetails {
818    #[must_use]
819    pub fn into_summary(self) -> MailboxInterrupt {
820        MailboxInterrupt {
821            new_dispatch_epoch: self.new_dispatch_epoch,
822            active_dispatch: self.active_dispatch,
823            superseded_count: self.superseded_count,
824        }
825    }
826
827    #[must_use]
828    pub fn summary(&self) -> MailboxInterrupt {
829        MailboxInterrupt {
830            new_dispatch_epoch: self.new_dispatch_epoch,
831            active_dispatch: self.active_dispatch.clone(),
832            superseded_count: self.superseded_count,
833        }
834    }
835}
836
837impl From<MailboxInterrupt> for MailboxInterruptDetails {
838    fn from(interrupt: MailboxInterrupt) -> Self {
839        Self {
840            new_dispatch_epoch: interrupt.new_dispatch_epoch,
841            active_dispatch: interrupt.active_dispatch,
842            superseded_count: interrupt.superseded_count,
843            superseded_dispatches: Vec::new(),
844        }
845    }
846}
847
848impl From<MailboxInterruptDetails> for MailboxInterrupt {
849    fn from(details: MailboxInterruptDetails) -> Self {
850        details.into_summary()
851    }
852}
853
854pub use awaken_runtime_contract::contract::live_control::{
855    LiveCommandReceipt, LiveControlError, LiveDeliveryOutcome, LiveRunCommand, LiveRunCommandEntry,
856    LiveRunCommandSource, LiveRunCommandStream, LiveRunTarget,
857};
858
859// ── DispatchSignal ─────────────────────────────────────────────────────────
860
861/// Receipt for a durable dispatch delivery signal.
862///
863/// Implementations should ack only after the scheduler has attempted to claim
864/// the indicated thread. Nack requests redelivery when the scheduler cannot
865/// safely make a claim decision.
866#[async_trait]
867pub trait DispatchSignalReceipt: Send + Sync {
868    fn redelivery_attempts(&self) -> Option<u64> {
869        None
870    }
871
872    async fn ack(self: Box<Self>) -> Result<(), StorageError>;
873    async fn nack(self: Box<Self>) -> Result<(), StorageError>;
874    async fn nack_with_delay(self: Box<Self>, delay: Duration) -> Result<(), StorageError> {
875        let _ = delay;
876        self.nack().await
877    }
878}
879
880/// One durable dispatch delivery signal pulled from a backend work queue.
881pub struct DispatchSignalEntry {
882    pub thread_id: String,
883    pub dispatch_id: String,
884    pub receipt: Box<dyn DispatchSignalReceipt>,
885}
886
887impl std::fmt::Debug for DispatchSignalEntry {
888    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
889        f.debug_struct("DispatchSignalEntry")
890            .field("thread_id", &self.thread_id)
891            .field("dispatch_id", &self.dispatch_id)
892            .finish_non_exhaustive()
893    }
894}
895
896// ── MailboxStore trait ──────────────────────────────────────────────
897
898/// Persistent mailbox queue with lease-based distributed claim.
899///
900/// Implementations must guarantee:
901/// - enqueue is durable before returning
902/// - claim is atomic (exactly one consumer wins)
903/// - interrupt atomically bumps dispatch epoch + supersedes stale dispatches
904/// - ack/nack/dead_letter validate claim_token (reject stale claims)
905#[async_trait]
906pub trait MailboxStore: Send + Sync {
907    // ── write path ──
908
909    /// Persist a dispatch. Sets dispatch epoch from current thread state
910    /// (auto-creates state if first dispatch for this thread_id).
911    /// Rejects if dedupe_key matches an existing non-terminal dispatch.
912    async fn enqueue(&self, dispatch: &RunDispatch) -> Result<(), StorageError>;
913
914    /// Atomically claim up to `limit` Queued dispatches for a thread
915    /// where `available_at <= now`. Sets status=Claimed, claim_token,
916    /// claimed_by, lease_until = now + lease_ms.
917    /// Returns claimed dispatches ordered by (priority ASC, created_at ASC).
918    async fn claim(
919        &self,
920        thread_id: &str,
921        consumer_id: &str,
922        lease_ms: u64,
923        now: u64,
924        limit: usize,
925    ) -> Result<Vec<RunDispatch>, StorageError>;
926
927    /// Claim a specific dispatch by dispatch_id. Same semantics as `claim()`
928    /// but targets a single known dispatch (used for inline streaming).
929    async fn claim_dispatch(
930        &self,
931        dispatch_id: &str,
932        consumer_id: &str,
933        lease_ms: u64,
934        now: u64,
935    ) -> Result<Option<RunDispatch>, StorageError>;
936
937    /// Mark mailbox delivery as consumed and no longer retryable.
938    ///
939    /// This validates `claim_token` and only records dispatch consumption. Use
940    /// `record_run_result` for the agent run outcome.
941    async fn ack(&self, dispatch_id: &str, claim_token: &str, now: u64)
942    -> Result<(), StorageError>;
943
944    /// Record the runtime dispatch identity for a claimed dispatch.
945    ///
946    /// Implementations should validate the claim token and set
947    /// `run_status=Running`, while clearing any prior terminal result fields.
948    async fn record_dispatch_start(
949        &self,
950        dispatch_id: &str,
951        claim_token: &str,
952        dispatch_instance_id: &str,
953        now: u64,
954    ) -> Result<(), StorageError>;
955
956    /// Record the runtime result for a claimed dispatch.
957    ///
958    /// This is intentionally separate from `ack`: `Acked` means the mailbox
959    /// delivery was consumed, while these fields describe the agent run outcome.
960    async fn record_run_result(
961        &self,
962        dispatch_id: &str,
963        claim_token: &str,
964        result: &RunDispatchResult,
965        now: u64,
966    ) -> Result<(), StorageError>;
967
968    /// Return dispatch to queue for retry. Sets available_at = retry_at,
969    /// increments attempt_count, records error.
970    /// If attempt_count >= max_attempts, transitions to DeadLetter instead.
971    async fn nack(
972        &self,
973        dispatch_id: &str,
974        claim_token: &str,
975        retry_at: u64,
976        error: &str,
977        now: u64,
978    ) -> Result<(), StorageError>;
979
980    /// Permanently fail a dispatch. Terminal state.
981    async fn dead_letter(
982        &self,
983        dispatch_id: &str,
984        claim_token: &str,
985        error: &str,
986        now: u64,
987    ) -> Result<(), StorageError>;
988
989    /// Cancel a specific dispatch. Works on Queued dispatches only.
990    /// For Claimed dispatches, caller must also cancel the runtime run.
991    async fn cancel(
992        &self,
993        dispatch_id: &str,
994        now: u64,
995    ) -> Result<Option<RunDispatch>, StorageError>;
996
997    /// Extend an active lease. Returns false if dispatch not Claimed
998    /// or claim_token mismatch (lease already expired and reclaimed).
999    async fn extend_lease(
1000        &self,
1001        dispatch_id: &str,
1002        claim_token: &str,
1003        extension_ms: u64,
1004        now: u64,
1005    ) -> Result<bool, StorageError>;
1006
1007    // ── interrupt ──
1008
1009    /// Atomically: bump dispatch epoch, supersede stale Queued dispatches,
1010    /// return the Claimed dispatch (if any) so caller can cancel its runtime run.
1011    async fn interrupt(&self, thread_id: &str, now: u64) -> Result<MailboxInterrupt, StorageError>;
1012
1013    /// Detailed interrupt result including the exact queued dispatches that
1014    /// were superseded.
1015    ///
1016    /// The default delegates to the 0.2-compatible summary method. Stores that
1017    /// can return authoritative superseded records should override this method.
1018    async fn interrupt_detailed(
1019        &self,
1020        thread_id: &str,
1021        now: u64,
1022    ) -> Result<MailboxInterruptDetails, StorageError> {
1023        self.interrupt(thread_id, now).await.map(Into::into)
1024    }
1025
1026    /// Return the authoritative dispatch epoch for a thread.
1027    ///
1028    /// Implementations that do not persist epochs may keep the default `0`;
1029    /// production mailbox stores must override this so dispatch workers can
1030    /// reject claimed work that became stale after an interrupt.
1031    async fn current_dispatch_epoch(&self, thread_id: &str) -> Result<u64, StorageError> {
1032        let _ = thread_id;
1033        Ok(0)
1034    }
1035
1036    /// Terminalize a claimed dispatch as superseded.
1037    ///
1038    /// Used when an interrupt wins the race after a dispatch was claimed but
1039    /// before (or while) the runtime starts. Implementations must validate the
1040    /// claim token and clear lease/claim ownership. Returning `Ok(None)` means
1041    /// the dispatch is gone or no longer claimed.
1042    async fn supersede_claimed(
1043        &self,
1044        dispatch_id: &str,
1045        claim_token: &str,
1046        now: u64,
1047        reason: &str,
1048    ) -> Result<Option<RunDispatch>, StorageError> {
1049        let _ = (dispatch_id, claim_token, now, reason);
1050        Err(StorageError::Io(
1051            "supersede claimed dispatch is not supported by this mailbox store".into(),
1052        ))
1053    }
1054
1055    // ── read path ──
1056
1057    /// Load a single dispatch by ID.
1058    async fn load_dispatch(&self, dispatch_id: &str) -> Result<Option<RunDispatch>, StorageError>;
1059
1060    /// List dispatches for a thread, filtered by status.
1061    async fn list_dispatches(
1062        &self,
1063        thread_id: &str,
1064        status_filter: Option<&[RunDispatchStatus]>,
1065        limit: usize,
1066        offset: usize,
1067    ) -> Result<Vec<RunDispatch>, StorageError>;
1068
1069    /// Count dispatches by status for low-cardinality operational gauges.
1070    ///
1071    /// Implementations that cannot provide an efficient count may return a
1072    /// storage error; callers must treat this as a metrics-only failure.
1073    async fn count_dispatches_by_status(
1074        &self,
1075        status: RunDispatchStatus,
1076    ) -> Result<usize, StorageError> {
1077        let _ = status;
1078        Err(StorageError::Io(
1079            "count dispatches by status is not supported by this mailbox store".into(),
1080        ))
1081    }
1082
1083    /// List terminal dispatches across all threads.
1084    ///
1085    /// Used by recovery/maintenance reconciliation to repair run lifecycle
1086    /// records after a process crashes between a mailbox terminal transition
1087    /// and the corresponding run-store checkpoint.
1088    async fn list_terminal_dispatches(
1089        &self,
1090        limit: usize,
1091        offset: usize,
1092    ) -> Result<Vec<RunDispatch>, StorageError> {
1093        let _ = (limit, offset);
1094        Err(StorageError::Io(
1095            "list terminal dispatches is not supported by this mailbox store".into(),
1096        ))
1097    }
1098
1099    // ── maintenance ──
1100
1101    /// Reclaim dispatches whose lease_until < now (orphaned by crashed consumers).
1102    /// Resets to Queued with incremented attempt_count.
1103    /// Returns reclaimed dispatches for immediate execution.
1104    async fn reclaim_expired_leases(
1105        &self,
1106        now: u64,
1107        limit: usize,
1108    ) -> Result<Vec<RunDispatch>, StorageError>;
1109
1110    /// Purge terminal dispatches (Acked, Cancelled, Superseded, DeadLetter)
1111    /// older than `older_than` timestamp. Returns count purged.
1112    async fn purge_terminal(&self, older_than: u64) -> Result<usize, StorageError>;
1113
1114    /// List distinct thread_ids that have at least one Queued dispatch.
1115    /// Used by recover() at startup.
1116    async fn queued_thread_ids(&self) -> Result<Vec<String>, StorageError>;
1117
1118    // ── dispatch signals (durable wakeups) ──
1119
1120    /// Whether this store exposes durable dispatch delivery signals.
1121    fn supports_dispatch_signals(&self) -> bool {
1122        false
1123    }
1124
1125    /// Pull durable dispatch delivery signals, if supported by the backend.
1126    ///
1127    /// The default is empty so non-work-queue stores continue relying on local
1128    /// submit, startup recovery, and sweep.
1129    async fn pull_dispatch_signals(
1130        &self,
1131        max: usize,
1132        expires: Duration,
1133    ) -> Result<Vec<DispatchSignalEntry>, StorageError> {
1134        let _ = (max, expires);
1135        Ok(Vec::new())
1136    }
1137
1138    // ── live-channel (ephemeral steering) ──
1139    //
1140    // Separate from durable dispatch: these deliver best-effort control
1141    // commands to whichever node currently owns the run. Default impls are
1142    // no-ops so stores that don't support live delivery (test fakes) opt out.
1143
1144    /// Deliver a `LiveRunCommand` to the run currently active for `thread_id`.
1145    /// Implementations report `Delivered` when at least one subscriber has
1146    /// observed the command, or `NoSubscriber` when delivery would be a
1147    /// silent drop (the caller then owns durable-fallback policy). The
1148    /// default implementation is `NoSubscriber` so stores that opt out of
1149    /// live delivery force every caller to fall back automatically.
1150    async fn deliver_live(
1151        &self,
1152        thread_id: &str,
1153        cmd: LiveRunCommand,
1154    ) -> Result<LiveDeliveryOutcome, StorageError> {
1155        let _ = (thread_id, cmd);
1156        Ok(LiveDeliveryOutcome::NoSubscriber)
1157    }
1158
1159    /// Deliver a live command to an exact run target.
1160    ///
1161    /// Backends with targeted live subjects should override this. The default
1162    /// preserves compatibility for stores that only support thread-level live
1163    /// routing.
1164    async fn deliver_live_to(
1165        &self,
1166        target: &LiveRunTarget,
1167        cmd: LiveRunCommand,
1168    ) -> Result<LiveDeliveryOutcome, StorageError> {
1169        self.deliver_live(&target.thread_id, cmd).await
1170    }
1171
1172    /// Subscribe to the live-command stream for `thread_id`. Called by the
1173    /// runtime on the owning node when a run is registered.
1174    async fn open_live_channel(
1175        &self,
1176        thread_id: &str,
1177    ) -> Result<LiveRunCommandStream, StorageError> {
1178        let _ = thread_id;
1179        Ok(Box::pin(futures::stream::empty()))
1180    }
1181
1182    /// Subscribe to the live-command stream for an exact run target.
1183    async fn open_live_channel_for(
1184        &self,
1185        target: &LiveRunTarget,
1186    ) -> Result<LiveRunCommandStream, StorageError> {
1187        self.open_live_channel(&target.thread_id).await
1188    }
1189}
1190
1191/// Adapter exposing any [`MailboxStore`] as a runtime [`LiveRunCommandSource`].
1192///
1193/// The runtime consumes live commands through `LiveRunCommandSource` (defined
1194/// in runtime-contract); the mailbox store is the durable source of those
1195/// commands. With `MailboxStore` now living in server-contract, a blanket
1196/// `impl<T: MailboxStore> LiveRunCommandSource for T` would violate the orphan
1197/// rule (foreign trait over a generic type), so this concrete wrapper provides
1198/// the bridge instead.
1199pub struct MailboxLiveControlSource(Arc<dyn MailboxStore>);
1200
1201impl MailboxLiveControlSource {
1202    pub fn new(store: Arc<dyn MailboxStore>) -> Self {
1203        Self(store)
1204    }
1205}
1206
1207#[async_trait]
1208impl LiveRunCommandSource for MailboxLiveControlSource {
1209    async fn open_live_channel_for(
1210        &self,
1211        target: &LiveRunTarget,
1212    ) -> Result<LiveRunCommandStream, LiveControlError> {
1213        MailboxStore::open_live_channel_for(self.0.as_ref(), target)
1214            .await
1215            .map_err(|error| LiveControlError::Subscribe(error.to_string()))
1216    }
1217}
1218
1219#[derive(Clone)]
1220pub struct ScopedMailboxStore {
1221    inner: Arc<dyn MailboxStore>,
1222    scope_id: ScopeId,
1223}
1224
1225impl ScopedMailboxStore {
1226    pub fn new(inner: Arc<dyn MailboxStore>, scope_id: ScopeId) -> Self {
1227        Self { inner, scope_id }
1228    }
1229
1230    pub fn scope_id(&self) -> &ScopeId {
1231        &self.scope_id
1232    }
1233
1234    pub fn inner(&self) -> &dyn MailboxStore {
1235        self.inner.as_ref()
1236    }
1237
1238    fn scoped(&self, id: &str) -> String {
1239        scoped_key(&self.scope_id, id)
1240    }
1241
1242    fn unscoped<'a>(&self, id: &'a str) -> Option<&'a str> {
1243        unscoped_key(&self.scope_id, id)
1244    }
1245
1246    fn encode_dispatch(&self, dispatch: &RunDispatch) -> RunDispatch {
1247        let mut dispatch = dispatch.clone();
1248        dispatch.dispatch_id = self.scoped(&dispatch.dispatch_id);
1249        dispatch.thread_id = self.scoped(&dispatch.thread_id);
1250        dispatch.run_id = self.scoped(&dispatch.run_id);
1251        dispatch.dedupe_key = dispatch.dedupe_key.as_deref().map(|key| self.scoped(key));
1252        dispatch
1253    }
1254
1255    fn decode_dispatch(&self, mut dispatch: RunDispatch) -> Option<RunDispatch> {
1256        dispatch.dispatch_id = self.unscoped(&dispatch.dispatch_id)?.to_string();
1257        dispatch.thread_id = self.unscoped(&dispatch.thread_id)?.to_string();
1258        dispatch.run_id = self.unscoped(&dispatch.run_id)?.to_string();
1259        dispatch.dedupe_key = dispatch
1260            .dedupe_key
1261            .as_deref()
1262            .map(|key| self.unscoped(key).map(str::to_string))
1263            .unwrap_or(None);
1264        Some(dispatch)
1265    }
1266
1267    fn encode_target(&self, target: &LiveRunTarget) -> LiveRunTarget {
1268        LiveRunTarget {
1269            thread_id: self.scoped(&target.thread_id),
1270            run_id: self.scoped(&target.run_id),
1271            dispatch_id: target.dispatch_id.as_deref().map(|id| self.scoped(id)),
1272        }
1273    }
1274
1275    fn encode_result(&self, result: &RunDispatchResult) -> RunDispatchResult {
1276        let mut result = result.clone();
1277        result.run_id = self.scoped(&result.run_id);
1278        result
1279    }
1280}
1281
1282#[async_trait]
1283impl MailboxStore for ScopedMailboxStore {
1284    async fn enqueue(&self, dispatch: &RunDispatch) -> Result<(), StorageError> {
1285        self.inner.enqueue(&self.encode_dispatch(dispatch)).await
1286    }
1287
1288    async fn claim(
1289        &self,
1290        thread_id: &str,
1291        consumer_id: &str,
1292        lease_ms: u64,
1293        now: u64,
1294        limit: usize,
1295    ) -> Result<Vec<RunDispatch>, StorageError> {
1296        Ok(self
1297            .inner
1298            .claim(&self.scoped(thread_id), consumer_id, lease_ms, now, limit)
1299            .await?
1300            .into_iter()
1301            .filter_map(|dispatch| self.decode_dispatch(dispatch))
1302            .collect())
1303    }
1304
1305    async fn claim_dispatch(
1306        &self,
1307        dispatch_id: &str,
1308        consumer_id: &str,
1309        lease_ms: u64,
1310        now: u64,
1311    ) -> Result<Option<RunDispatch>, StorageError> {
1312        Ok(self
1313            .inner
1314            .claim_dispatch(&self.scoped(dispatch_id), consumer_id, lease_ms, now)
1315            .await?
1316            .and_then(|dispatch| self.decode_dispatch(dispatch)))
1317    }
1318
1319    async fn ack(
1320        &self,
1321        dispatch_id: &str,
1322        claim_token: &str,
1323        now: u64,
1324    ) -> Result<(), StorageError> {
1325        self.inner
1326            .ack(&self.scoped(dispatch_id), claim_token, now)
1327            .await
1328    }
1329
1330    async fn record_dispatch_start(
1331        &self,
1332        dispatch_id: &str,
1333        claim_token: &str,
1334        dispatch_instance_id: &str,
1335        now: u64,
1336    ) -> Result<(), StorageError> {
1337        self.inner
1338            .record_dispatch_start(
1339                &self.scoped(dispatch_id),
1340                claim_token,
1341                dispatch_instance_id,
1342                now,
1343            )
1344            .await
1345    }
1346
1347    async fn record_run_result(
1348        &self,
1349        dispatch_id: &str,
1350        claim_token: &str,
1351        result: &RunDispatchResult,
1352        now: u64,
1353    ) -> Result<(), StorageError> {
1354        self.inner
1355            .record_run_result(
1356                &self.scoped(dispatch_id),
1357                claim_token,
1358                &self.encode_result(result),
1359                now,
1360            )
1361            .await
1362    }
1363
1364    async fn nack(
1365        &self,
1366        dispatch_id: &str,
1367        claim_token: &str,
1368        retry_at: u64,
1369        error: &str,
1370        now: u64,
1371    ) -> Result<(), StorageError> {
1372        self.inner
1373            .nack(&self.scoped(dispatch_id), claim_token, retry_at, error, now)
1374            .await
1375    }
1376
1377    async fn dead_letter(
1378        &self,
1379        dispatch_id: &str,
1380        claim_token: &str,
1381        error: &str,
1382        now: u64,
1383    ) -> Result<(), StorageError> {
1384        self.inner
1385            .dead_letter(&self.scoped(dispatch_id), claim_token, error, now)
1386            .await
1387    }
1388
1389    async fn cancel(
1390        &self,
1391        dispatch_id: &str,
1392        now: u64,
1393    ) -> Result<Option<RunDispatch>, StorageError> {
1394        Ok(self
1395            .inner
1396            .cancel(&self.scoped(dispatch_id), now)
1397            .await?
1398            .and_then(|dispatch| self.decode_dispatch(dispatch)))
1399    }
1400
1401    async fn extend_lease(
1402        &self,
1403        dispatch_id: &str,
1404        claim_token: &str,
1405        extension_ms: u64,
1406        now: u64,
1407    ) -> Result<bool, StorageError> {
1408        self.inner
1409            .extend_lease(&self.scoped(dispatch_id), claim_token, extension_ms, now)
1410            .await
1411    }
1412
1413    async fn interrupt(&self, thread_id: &str, now: u64) -> Result<MailboxInterrupt, StorageError> {
1414        let interrupt = self.inner.interrupt(&self.scoped(thread_id), now).await?;
1415        Ok(MailboxInterrupt {
1416            new_dispatch_epoch: interrupt.new_dispatch_epoch,
1417            active_dispatch: interrupt
1418                .active_dispatch
1419                .and_then(|dispatch| self.decode_dispatch(dispatch)),
1420            superseded_count: interrupt.superseded_count,
1421        })
1422    }
1423
1424    async fn interrupt_detailed(
1425        &self,
1426        thread_id: &str,
1427        now: u64,
1428    ) -> Result<MailboxInterruptDetails, StorageError> {
1429        let details = self
1430            .inner
1431            .interrupt_detailed(&self.scoped(thread_id), now)
1432            .await?;
1433        let superseded_dispatches: Vec<_> = details
1434            .superseded_dispatches
1435            .into_iter()
1436            .filter_map(|dispatch| self.decode_dispatch(dispatch))
1437            .collect();
1438        Ok(MailboxInterruptDetails {
1439            new_dispatch_epoch: details.new_dispatch_epoch,
1440            active_dispatch: details
1441                .active_dispatch
1442                .and_then(|dispatch| self.decode_dispatch(dispatch)),
1443            superseded_count: superseded_dispatches.len(),
1444            superseded_dispatches,
1445        })
1446    }
1447
1448    async fn current_dispatch_epoch(&self, thread_id: &str) -> Result<u64, StorageError> {
1449        self.inner
1450            .current_dispatch_epoch(&self.scoped(thread_id))
1451            .await
1452    }
1453
1454    async fn supersede_claimed(
1455        &self,
1456        dispatch_id: &str,
1457        claim_token: &str,
1458        now: u64,
1459        reason: &str,
1460    ) -> Result<Option<RunDispatch>, StorageError> {
1461        Ok(self
1462            .inner
1463            .supersede_claimed(&self.scoped(dispatch_id), claim_token, now, reason)
1464            .await?
1465            .and_then(|dispatch| self.decode_dispatch(dispatch)))
1466    }
1467
1468    async fn load_dispatch(&self, dispatch_id: &str) -> Result<Option<RunDispatch>, StorageError> {
1469        Ok(self
1470            .inner
1471            .load_dispatch(&self.scoped(dispatch_id))
1472            .await?
1473            .and_then(|dispatch| self.decode_dispatch(dispatch)))
1474    }
1475
1476    async fn list_dispatches(
1477        &self,
1478        thread_id: &str,
1479        status_filter: Option<&[RunDispatchStatus]>,
1480        limit: usize,
1481        offset: usize,
1482    ) -> Result<Vec<RunDispatch>, StorageError> {
1483        Ok(self
1484            .inner
1485            .list_dispatches(&self.scoped(thread_id), status_filter, limit, offset)
1486            .await?
1487            .into_iter()
1488            .filter_map(|dispatch| self.decode_dispatch(dispatch))
1489            .collect())
1490    }
1491
1492    async fn count_dispatches_by_status(
1493        &self,
1494        status: RunDispatchStatus,
1495    ) -> Result<usize, StorageError> {
1496        match status {
1497            RunDispatchStatus::Queued => {
1498                let mut total = 0;
1499                for thread_id in self.queued_thread_ids().await? {
1500                    total += self
1501                        .list_dispatches(
1502                            &thread_id,
1503                            Some(&[RunDispatchStatus::Queued]),
1504                            usize::MAX,
1505                            0,
1506                        )
1507                        .await?
1508                        .len();
1509                }
1510                Ok(total)
1511            }
1512            status if status.is_terminal() => Ok(self
1513                .list_terminal_dispatches(usize::MAX, 0)
1514                .await?
1515                .into_iter()
1516                .filter(|dispatch| dispatch.status == status)
1517                .count()),
1518            _ => Err(StorageError::Io(
1519                "scoped claimed dispatch count is not supported".into(),
1520            )),
1521        }
1522    }
1523
1524    async fn list_terminal_dispatches(
1525        &self,
1526        limit: usize,
1527        offset: usize,
1528    ) -> Result<Vec<RunDispatch>, StorageError> {
1529        let all: Vec<_> = self
1530            .inner
1531            .list_terminal_dispatches(usize::MAX, 0)
1532            .await?
1533            .into_iter()
1534            .filter_map(|dispatch| self.decode_dispatch(dispatch))
1535            .collect();
1536        Ok(all.into_iter().skip(offset).take(limit).collect())
1537    }
1538
1539    async fn reclaim_expired_leases(
1540        &self,
1541        now: u64,
1542        limit: usize,
1543    ) -> Result<Vec<RunDispatch>, StorageError> {
1544        Ok(self
1545            .inner
1546            .reclaim_expired_leases(now, limit)
1547            .await?
1548            .into_iter()
1549            .filter_map(|dispatch| self.decode_dispatch(dispatch))
1550            .collect())
1551    }
1552
1553    async fn purge_terminal(&self, _older_than: u64) -> Result<usize, StorageError> {
1554        Err(StorageError::Io(
1555            "scoped terminal dispatch purge is not supported".into(),
1556        ))
1557    }
1558
1559    async fn queued_thread_ids(&self) -> Result<Vec<String>, StorageError> {
1560        Ok(self
1561            .inner
1562            .queued_thread_ids()
1563            .await?
1564            .into_iter()
1565            .filter_map(|thread_id| self.unscoped(&thread_id).map(str::to_string))
1566            .collect())
1567    }
1568
1569    fn supports_dispatch_signals(&self) -> bool {
1570        self.inner.supports_dispatch_signals()
1571    }
1572
1573    async fn pull_dispatch_signals(
1574        &self,
1575        max: usize,
1576        expires: Duration,
1577    ) -> Result<Vec<DispatchSignalEntry>, StorageError> {
1578        Ok(self
1579            .inner
1580            .pull_dispatch_signals(max, expires)
1581            .await?
1582            .into_iter()
1583            .filter_map(|entry| {
1584                Some(DispatchSignalEntry {
1585                    thread_id: self.unscoped(&entry.thread_id)?.to_string(),
1586                    dispatch_id: self.unscoped(&entry.dispatch_id)?.to_string(),
1587                    receipt: entry.receipt,
1588                })
1589            })
1590            .collect())
1591    }
1592
1593    async fn deliver_live(
1594        &self,
1595        thread_id: &str,
1596        cmd: LiveRunCommand,
1597    ) -> Result<LiveDeliveryOutcome, StorageError> {
1598        self.inner.deliver_live(&self.scoped(thread_id), cmd).await
1599    }
1600
1601    async fn deliver_live_to(
1602        &self,
1603        target: &LiveRunTarget,
1604        cmd: LiveRunCommand,
1605    ) -> Result<LiveDeliveryOutcome, StorageError> {
1606        self.inner
1607            .deliver_live_to(&self.encode_target(target), cmd)
1608            .await
1609    }
1610
1611    async fn open_live_channel(
1612        &self,
1613        thread_id: &str,
1614    ) -> Result<LiveRunCommandStream, StorageError> {
1615        self.inner.open_live_channel(&self.scoped(thread_id)).await
1616    }
1617
1618    async fn open_live_channel_for(
1619        &self,
1620        target: &LiveRunTarget,
1621    ) -> Result<LiveRunCommandStream, StorageError> {
1622        self.inner
1623            .open_live_channel_for(&self.encode_target(target))
1624            .await
1625    }
1626}
1627
1628#[cfg(test)]
1629#[path = "mailbox_tests.rs"]
1630mod tests;