use crate::{
    effects::task::{CancellationToken, NeverCancel, TaskSpawner},
    effects::{CapabilityKey, CapabilityTokenRequest},
    AuraError, ProtocolErrorCode, TimeoutBudget,
};
use futures::future::{BoxFuture, LocalBoxFuture};
use serde::{Deserialize, Serialize};
use std::future::Future;
use std::marker::PhantomData;
use std::sync::Arc;
11
12#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
14#[serde(rename_all = "snake_case")]
15pub enum OwnershipCategory {
16 Pure,
17 MoveOwned,
18 ActorOwned,
19 Observed,
20}
21
22#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
25#[serde(rename_all = "snake_case")]
26pub enum BoundaryDeclarationCategory {
27 MoveOwned,
28 ActorOwned,
29 CapabilityGated,
30}
31
32#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
34#[serde(rename_all = "snake_case")]
35pub enum SemanticOwnerHandoffPolicy {
36 FrontendSettlesLocally,
38 HandoffBeforeFirstAwait,
41}
42
43#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
45#[serde(rename_all = "snake_case")]
46pub enum SemanticOwnerAwaitPolicy {
47 BoundedOnly,
49}
50
51#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
53#[serde(rename_all = "snake_case")]
54pub enum SemanticOwnerBestEffortPolicy {
55 TerminalBeforeBestEffort,
58}
59
60#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
62pub struct SemanticOwnerProtocol {
63 handoff_policy: SemanticOwnerHandoffPolicy,
64 await_policy: SemanticOwnerAwaitPolicy,
65 best_effort_policy: SemanticOwnerBestEffortPolicy,
66}
67
68impl SemanticOwnerProtocol {
69 pub const CANONICAL: Self = Self {
71 handoff_policy: SemanticOwnerHandoffPolicy::HandoffBeforeFirstAwait,
72 await_policy: SemanticOwnerAwaitPolicy::BoundedOnly,
73 best_effort_policy: SemanticOwnerBestEffortPolicy::TerminalBeforeBestEffort,
74 };
75
76 pub const fn handoff_policy(self) -> SemanticOwnerHandoffPolicy {
77 self.handoff_policy
78 }
79
80 pub const fn await_policy(self) -> SemanticOwnerAwaitPolicy {
81 self.await_policy
82 }
83
84 pub const fn best_effort_policy(self) -> SemanticOwnerBestEffortPolicy {
85 self.best_effort_policy
86 }
87}
88
89#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
91pub struct SemanticOwnerPostcondition {
92 name: &'static str,
93}
94
95impl SemanticOwnerPostcondition {
96 pub const fn new(name: &'static str) -> Self {
97 Self { name }
98 }
99
100 pub const fn name(self) -> &'static str {
101 self.name
102 }
103}
104
105pub trait SemanticSuccessProof {
111 fn declared_postcondition(&self) -> SemanticOwnerPostcondition;
112}
113
114#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
116pub struct SemanticOwnerDependency {
117 name: &'static str,
118}
119
120impl SemanticOwnerDependency {
121 pub const fn new(name: &'static str) -> Self {
122 Self { name }
123 }
124
125 pub const fn name(self) -> &'static str {
126 self.name
127 }
128}
129
130#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
132pub struct SemanticOwnerAuthoritativeInput {
133 name: &'static str,
134}
135
136impl SemanticOwnerAuthoritativeInput {
137 pub const fn new(name: &'static str) -> Self {
138 Self { name }
139 }
140
141 pub const fn name(self) -> &'static str {
142 self.name
143 }
144}
145
146#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
148pub struct SemanticOwnerChildOperation {
149 name: &'static str,
150}
151
152impl SemanticOwnerChildOperation {
153 pub const fn new(name: &'static str) -> Self {
154 Self { name }
155 }
156
157 pub const fn name(self) -> &'static str {
158 self.name
159 }
160}
161
162#[derive(Clone)]
165pub struct ChildOperationSpawner {
166 inner: OwnedTaskSpawner,
167}
168
169impl std::fmt::Debug for ChildOperationSpawner {
170 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
171 f.debug_struct("ChildOperationSpawner")
172 .finish_non_exhaustive()
173 }
174}
175
176impl ChildOperationSpawner {
177 pub fn new(inner: OwnedTaskSpawner) -> Self {
178 Self { inner }
179 }
180
181 pub fn shutdown_token(&self) -> &OwnedShutdownToken {
182 self.inner.shutdown_token()
183 }
184
185 pub fn spawn_child_operation(
186 &self,
187 _child: SemanticOwnerChildOperation,
188 fut: BoxFuture<'static, ()>,
189 ) {
190 self.inner.spawn(fut);
191 }
192
193 pub fn spawn_local_child_operation(
194 &self,
195 _child: SemanticOwnerChildOperation,
196 fut: LocalBoxFuture<'static, ()>,
197 ) {
198 self.inner.spawn_local(fut);
199 }
200}
201
202#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
205pub struct BestEffortBoundaryProtocol {
206 terminal_relation: SemanticOwnerBestEffortPolicy,
207}
208
209impl BestEffortBoundaryProtocol {
210 pub const POST_TERMINAL_ONLY: Self = Self {
212 terminal_relation: SemanticOwnerBestEffortPolicy::TerminalBeforeBestEffort,
213 };
214
215 pub const fn terminal_relation(self) -> SemanticOwnerBestEffortPolicy {
216 self.terminal_relation
217 }
218}
219
220#[derive(Debug, Clone, PartialEq, Eq)]
226pub struct PostTerminalBestEffort<E> {
227 protocol: BestEffortBoundaryProtocol,
228 first_error: Option<E>,
229}
230
231impl<E> PostTerminalBestEffort<E> {
232 #[must_use]
233 pub const fn post_terminal_only() -> Self {
234 Self {
235 protocol: BestEffortBoundaryProtocol::POST_TERMINAL_ONLY,
236 first_error: None,
237 }
238 }
239
240 #[must_use]
241 pub const fn protocol(&self) -> BestEffortBoundaryProtocol {
242 self.protocol
243 }
244
245 pub fn record<T>(&mut self, result: Result<T, E>) -> Option<T> {
246 match result {
247 Ok(value) => Some(value),
248 Err(error) => {
249 if self.first_error.is_none() {
250 self.first_error = Some(error);
251 }
252 None
253 }
254 }
255 }
256
257 pub async fn capture<T, Fut>(&mut self, future: Fut) -> Option<T>
258 where
259 Fut: Future<Output = Result<T, E>>,
260 {
261 self.record(future.await)
262 }
263
264 #[must_use]
265 pub fn first_error(&self) -> Option<&E> {
266 self.first_error.as_ref()
267 }
268
269 pub fn finish(self) -> Result<(), E> {
270 match self.first_error {
271 Some(error) => Err(error),
272 None => Ok(()),
273 }
274 }
275}
276
277#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
279#[serde(rename_all = "snake_case")]
280pub enum OwnershipErrorDomain {
281 Ownership,
282 Capability,
283 Lifecycle,
284 Timeout,
285}
286
287#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
289pub enum OwnershipError {
290 #[error("missing capability: {capability}")]
291 MissingCapability { capability: String },
292 #[error("stale owner token: {detail}")]
293 StaleOwner { detail: String },
294 #[error("invalid ownership transfer: {detail}")]
295 InvalidTransfer { detail: String },
296 #[error("owner dropped before terminal publication: {detail}")]
297 OwnerDropped { detail: String },
298 #[error("terminal lifecycle regression: {detail}")]
299 TerminalRegression { detail: String },
300 #[error("operation timed out: {detail}")]
301 Timeout { detail: String },
302}
303
304impl OwnershipError {
305 pub fn missing_capability(capability: impl Into<String>) -> Self {
306 Self::MissingCapability {
307 capability: capability.into(),
308 }
309 }
310
311 pub fn stale_owner(detail: impl Into<String>) -> Self {
312 Self::StaleOwner {
313 detail: detail.into(),
314 }
315 }
316
317 pub fn invalid_transfer(detail: impl Into<String>) -> Self {
318 Self::InvalidTransfer {
319 detail: detail.into(),
320 }
321 }
322
323 pub fn owner_dropped(detail: impl Into<String>) -> Self {
324 Self::OwnerDropped {
325 detail: detail.into(),
326 }
327 }
328
329 pub fn terminal_regression(detail: impl Into<String>) -> Self {
330 Self::TerminalRegression {
331 detail: detail.into(),
332 }
333 }
334
335 pub fn timeout(detail: impl Into<String>) -> Self {
336 Self::Timeout {
337 detail: detail.into(),
338 }
339 }
340
341 pub fn domain(&self) -> OwnershipErrorDomain {
342 match self {
343 Self::MissingCapability { .. } => OwnershipErrorDomain::Capability,
344 Self::StaleOwner { .. } | Self::InvalidTransfer { .. } => {
345 OwnershipErrorDomain::Ownership
346 }
347 Self::OwnerDropped { .. } | Self::TerminalRegression { .. } => {
348 OwnershipErrorDomain::Lifecycle
349 }
350 Self::Timeout { .. } => OwnershipErrorDomain::Timeout,
351 }
352 }
353}
354
355impl ProtocolErrorCode for OwnershipError {
356 fn code(&self) -> &'static str {
357 match self {
358 Self::MissingCapability { .. } => "missing_capability",
359 Self::StaleOwner { .. } => "stale_owner",
360 Self::InvalidTransfer { .. } => "invalid_transfer",
361 Self::OwnerDropped { .. } => "owner_dropped",
362 Self::TerminalRegression { .. } => "terminal_regression",
363 Self::Timeout { .. } => "timeout",
364 }
365 }
366}
367
368impl From<OwnershipError> for AuraError {
369 fn from(value: OwnershipError) -> Self {
370 match value {
371 OwnershipError::MissingCapability { capability } => {
372 AuraError::permission_denied(format!("missing_capability: {capability}"))
373 }
374 OwnershipError::StaleOwner { detail } => {
375 AuraError::terminal(format!("stale_owner: {detail}"))
376 }
377 OwnershipError::InvalidTransfer { detail } => {
378 AuraError::terminal(format!("invalid_transfer: {detail}"))
379 }
380 OwnershipError::OwnerDropped { detail } => {
381 AuraError::terminal(format!("owner_dropped: {detail}"))
382 }
383 OwnershipError::TerminalRegression { detail } => {
384 AuraError::internal(format!("terminal_regression: {detail}"))
385 }
386 OwnershipError::Timeout { detail } => AuraError::terminal(format!("timeout: {detail}")),
387 }
388 }
389}
390
391pub type OwnershipResult<T> = std::result::Result<T, OwnershipError>;
393
394pub trait OwnershipCapability {
395 fn capability_key(&self) -> &CapabilityKey;
396 fn into_capability_key(self) -> CapabilityKey
397 where
398 Self: Sized;
399
400 fn biscuit_permission(&self) -> &str {
401 self.capability_key().as_str()
402 }
403}
404
405pub fn ownership_capability_token_request_for<C>(
408 subject: impl Into<String>,
409 capabilities: impl IntoIterator<Item = C>,
410) -> CapabilityTokenRequest
411where
412 C: OwnershipCapability,
413{
414 let subject = subject.into();
415 let permissions = capabilities
416 .into_iter()
417 .map(OwnershipCapability::into_capability_key)
418 .map(|capability| capability.as_str().to_string())
419 .collect::<Vec<_>>();
420 CapabilityTokenRequest::standard(&subject, &permissions)
421}
422
423#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
425pub struct OwnerEpoch(u64);
426
427impl OwnerEpoch {
428 pub const fn new(value: u64) -> Self {
429 Self(value)
430 }
431
432 pub const fn value(self) -> u64 {
433 self.0
434 }
435}
436
437#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
439pub struct PublicationSequence(u64);
440
441impl PublicationSequence {
442 pub const fn new(value: u64) -> Self {
443 Self(value)
444 }
445
446 pub const fn value(self) -> u64 {
447 self.0
448 }
449
450 pub const fn next(self) -> Self {
451 Self(self.0.saturating_add(1))
452 }
453}
454
455#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
457pub struct TraceContext {
458 trace_id: Option<String>,
459 span_id: Option<String>,
460}
461
462impl TraceContext {
463 pub fn new(trace_id: Option<String>, span_id: Option<String>) -> Self {
464 Self { trace_id, span_id }
465 }
466
467 pub const fn detached() -> Self {
468 Self {
469 trace_id: None,
470 span_id: None,
471 }
472 }
473
474 pub fn trace_id(&self) -> Option<&str> {
475 self.trace_id.as_deref()
476 }
477
478 pub fn span_id(&self) -> Option<&str> {
479 self.span_id.as_deref()
480 }
481}
482
483#[derive(Debug, Clone, PartialEq, Eq)]
485pub enum OperationTimeoutBudget {
486 Configured(TimeoutBudget),
487 DeferredLocalPolicy,
488}
489
490impl OperationTimeoutBudget {
491 pub fn configured(timeout_budget: TimeoutBudget) -> Self {
492 Self::Configured(timeout_budget)
493 }
494
495 pub const fn deferred_local_policy() -> Self {
496 Self::DeferredLocalPolicy
497 }
498
499 pub fn configured_budget(&self) -> Option<&TimeoutBudget> {
500 match self {
501 Self::Configured(timeout_budget) => Some(timeout_budget),
502 Self::DeferredLocalPolicy => None,
503 }
504 }
505}
506
507#[derive(Clone)]
509pub struct OwnedShutdownToken {
510 inner: Option<Arc<dyn CancellationToken>>,
511}
512
513impl std::fmt::Debug for OwnedShutdownToken {
514 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
515 f.debug_struct("OwnedShutdownToken")
516 .field("attached", &self.inner.is_some())
517 .finish()
518 }
519}
520
521impl OwnedShutdownToken {
522 pub fn attached(token: Arc<dyn CancellationToken>) -> Self {
523 Self { inner: Some(token) }
524 }
525
526 pub const fn detached() -> Self {
527 Self { inner: None }
528 }
529
530 pub async fn cancelled(&self) {
531 match &self.inner {
532 Some(token) => token.cancelled().await,
533 None => NeverCancel.cancelled().await,
534 }
535 }
536
537 pub fn is_cancelled(&self) -> bool {
538 self.inner
539 .as_ref()
540 .is_some_and(|token| token.is_cancelled())
541 }
542
543 pub fn raw(&self) -> Option<&Arc<dyn CancellationToken>> {
544 self.inner.as_ref()
545 }
546}
547
548#[derive(Clone)]
551pub struct OwnedTaskSpawner {
552 inner: Arc<dyn TaskSpawner>,
553 shutdown: OwnedShutdownToken,
554}
555
556impl std::fmt::Debug for OwnedTaskSpawner {
557 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
558 f.debug_struct("OwnedTaskSpawner").finish_non_exhaustive()
559 }
560}
561
562impl OwnedTaskSpawner {
563 pub fn new(inner: Arc<dyn TaskSpawner>, shutdown: OwnedShutdownToken) -> Self {
564 Self { inner, shutdown }
565 }
566
567 pub fn shutdown_token(&self) -> &OwnedShutdownToken {
568 &self.shutdown
569 }
570
571 pub fn spawn(&self, fut: BoxFuture<'static, ()>) {
572 self.inner.spawn(fut);
573 }
574
575 pub fn spawn_cancellable(&self, fut: BoxFuture<'static, ()>) {
576 let token = self
577 .shutdown
578 .raw()
579 .cloned()
580 .unwrap_or_else(|| Arc::new(NeverCancel));
581 self.inner.spawn_cancellable(fut, token);
582 }
583
584 pub fn spawn_local(&self, fut: LocalBoxFuture<'static, ()>) {
585 self.inner.spawn_local(fut);
586 }
587
588 pub fn spawn_local_cancellable(&self, fut: LocalBoxFuture<'static, ()>) {
589 let token = self
590 .shutdown
591 .raw()
592 .cloned()
593 .unwrap_or_else(|| Arc::new(NeverCancel));
594 self.inner.spawn_local_cancellable(fut, token);
595 }
596}
597
598#[derive(Debug, Clone)]
600pub struct OwnedTaskHandle<HandleId> {
601 handle_id: HandleId,
602 shutdown: OwnedShutdownToken,
603}
604
605impl<HandleId> OwnedTaskHandle<HandleId> {
606 pub fn new(handle_id: HandleId, shutdown: OwnedShutdownToken) -> Self {
607 Self {
608 handle_id,
609 shutdown,
610 }
611 }
612
613 pub fn handle_id(&self) -> &HandleId {
614 &self.handle_id
615 }
616
617 pub fn shutdown_token(&self) -> &OwnedShutdownToken {
618 &self.shutdown
619 }
620}
621
/// Typed description of a bounded actor ingress: an owner name and a
/// capacity, tagged with the `Domain` and `Message` types.
///
/// `Domain` and `Message` are compile-time markers only; no value of either
/// type is stored. `PhantomData<fn() -> T>` is used so the markers do not
/// affect auto traits or drop checking.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BoundedActorIngress<Domain, Message> {
    owner_name: &'static str,
    capacity: u32,
    _domain: PhantomData<fn() -> Domain>,
    _message: PhantomData<fn() -> Message>,
}

impl<Domain, Message> BoundedActorIngress<Domain, Message> {
    /// Creates an ingress description for `owner_name` with `capacity`.
    ///
    /// `const` so declarations can be built in constant contexts, like the
    /// other name wrappers in this module.
    pub const fn new(owner_name: &'static str, capacity: u32) -> Self {
        Self {
            owner_name,
            capacity,
            _domain: PhantomData,
            _message: PhantomData,
        }
    }

    /// Returns the owner name supplied at construction.
    pub const fn owner_name(&self) -> &'static str {
        self.owner_name
    }

    /// Returns the capacity supplied at construction.
    pub const fn capacity(&self) -> u32 {
        self.capacity
    }
}
649
650#[derive(Debug, Clone, PartialEq, Eq)]
652pub struct ActorDeclaration<Domain, Message> {
653 category: BoundaryDeclarationCategory,
654 owner_name: &'static str,
655 domain_name: &'static str,
656 ingress_gate: &'static str,
657 ingress: BoundedActorIngress<Domain, Message>,
658}
659
660impl<Domain, Message> ActorDeclaration<Domain, Message> {
661 pub fn new(
662 owner_name: &'static str,
663 domain_name: &'static str,
664 ingress_gate: &'static str,
665 capacity: u32,
666 ) -> Self {
667 Self {
668 category: BoundaryDeclarationCategory::ActorOwned,
669 owner_name,
670 domain_name,
671 ingress_gate,
672 ingress: BoundedActorIngress::new(owner_name, capacity),
673 }
674 }
675
676 pub fn category(&self) -> BoundaryDeclarationCategory {
677 self.category
678 }
679
680 pub fn owner_name(&self) -> &'static str {
681 self.owner_name
682 }
683
684 pub fn domain_name(&self) -> &'static str {
685 self.domain_name
686 }
687
688 pub fn ingress_gate(&self) -> &'static str {
689 self.ingress_gate
690 }
691
692 pub fn ingress(&self) -> &BoundedActorIngress<Domain, Message> {
693 &self.ingress
694 }
695
696 pub fn into_ingress(self) -> BoundedActorIngress<Domain, Message> {
697 self.ingress
698 }
699
700 pub fn register_supervision<HandleId>(
701 self,
702 handle_id: HandleId,
703 shutdown: OwnedShutdownToken,
704 ) -> SupervisionRegistration<Domain, Message, HandleId> {
705 SupervisionRegistration {
706 declaration: self,
707 handle: OwnedTaskHandle::new(handle_id, shutdown),
708 }
709 }
710}
711
712#[derive(Debug, Clone, PartialEq, Eq)]
715pub struct ActorRootDeclaration<Domain> {
716 category: BoundaryDeclarationCategory,
717 owner_name: &'static str,
718 domain_name: &'static str,
719 supervision_gate: &'static str,
720 _domain: PhantomData<fn() -> Domain>,
721}
722
723impl<Domain> ActorRootDeclaration<Domain> {
724 pub fn new(
725 owner_name: &'static str,
726 domain_name: &'static str,
727 supervision_gate: &'static str,
728 ) -> Self {
729 Self {
730 category: BoundaryDeclarationCategory::ActorOwned,
731 owner_name,
732 domain_name,
733 supervision_gate,
734 _domain: PhantomData,
735 }
736 }
737
738 pub fn category(&self) -> BoundaryDeclarationCategory {
739 self.category
740 }
741
742 pub fn owner_name(&self) -> &'static str {
743 self.owner_name
744 }
745
746 pub fn domain_name(&self) -> &'static str {
747 self.domain_name
748 }
749
750 pub fn supervision_gate(&self) -> &'static str {
751 self.supervision_gate
752 }
753
754 pub fn register_supervision<HandleId>(
755 self,
756 handle_id: HandleId,
757 shutdown: OwnedShutdownToken,
758 ) -> ActorRootSupervisionRegistration<Domain, HandleId> {
759 ActorRootSupervisionRegistration {
760 declaration: self,
761 handle: OwnedTaskHandle::new(handle_id, shutdown),
762 }
763 }
764}
765
766#[derive(Debug, Clone)]
768pub struct ActorRootSupervisionRegistration<Domain, HandleId> {
769 declaration: ActorRootDeclaration<Domain>,
770 handle: OwnedTaskHandle<HandleId>,
771}
772
773impl<Domain, HandleId> ActorRootSupervisionRegistration<Domain, HandleId> {
774 pub fn declaration(&self) -> &ActorRootDeclaration<Domain> {
775 &self.declaration
776 }
777
778 pub fn handle(&self) -> &OwnedTaskHandle<HandleId> {
779 &self.handle
780 }
781
782 pub fn into_parts(self) -> (ActorRootDeclaration<Domain>, OwnedTaskHandle<HandleId>) {
783 (self.declaration, self.handle)
784 }
785}
786
787#[derive(Debug, Clone)]
789pub struct SupervisionRegistration<Domain, Message, HandleId> {
790 declaration: ActorDeclaration<Domain, Message>,
791 handle: OwnedTaskHandle<HandleId>,
792}
793
794impl<Domain, Message, HandleId> SupervisionRegistration<Domain, Message, HandleId> {
795 pub fn declaration(&self) -> &ActorDeclaration<Domain, Message> {
796 &self.declaration
797 }
798
799 pub fn handle(&self) -> &OwnedTaskHandle<HandleId> {
800 &self.handle
801 }
802
803 pub fn into_parts(self) -> (ActorDeclaration<Domain, Message>, OwnedTaskHandle<HandleId>) {
804 (self.declaration, self.handle)
805 }
806}
807
808#[derive(Debug, Clone, PartialEq, Eq)]
809struct PublicationMetadata<OperationId, InstanceId, Trace = TraceContext> {
810 operation_id: OperationId,
811 instance_id: InstanceId,
812 owner_epoch: OwnerEpoch,
813 publication_sequence: PublicationSequence,
814 trace_context: Trace,
815}
816
817impl<OperationId, InstanceId, Trace> PublicationMetadata<OperationId, InstanceId, Trace> {
818 fn new(
819 operation_id: OperationId,
820 instance_id: InstanceId,
821 owner_epoch: OwnerEpoch,
822 publication_sequence: PublicationSequence,
823 trace_context: Trace,
824 ) -> Self {
825 Self {
826 operation_id,
827 instance_id,
828 owner_epoch,
829 publication_sequence,
830 trace_context,
831 }
832 }
833}
834
835#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
837#[serde(tag = "state", rename_all = "snake_case")]
838pub enum OperationProgress<Phase> {
839 Submitted,
840 Progress { phase: Phase },
841}
842
843impl<Phase> OperationProgress<Phase> {
844 pub const fn submitted() -> Self {
845 Self::Submitted
846 }
847
848 pub fn progress(phase: Phase) -> Self {
849 Self::Progress { phase }
850 }
851}
852
853#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
855#[serde(tag = "state", rename_all = "snake_case")]
856pub enum TerminalOutcome<Output, Error> {
857 Succeeded { output: Output },
858 Failed { error: Error },
859 Cancelled,
860}
861
862#[derive(Debug, PartialEq, Eq)]
864pub struct AuthorizedProgressPublication<OperationId, InstanceId, Trace, Phase> {
865 capability: LifecyclePublicationCapability,
866 metadata: PublicationMetadata<OperationId, InstanceId, Trace>,
867 progress: OperationProgress<Phase>,
868}
869
870impl<OperationId, InstanceId, Trace, Phase>
871 AuthorizedProgressPublication<OperationId, InstanceId, Trace, Phase>
872{
873 fn authorize(
874 capability: &LifecyclePublicationCapability,
875 metadata: PublicationMetadata<OperationId, InstanceId, Trace>,
876 progress: OperationProgress<Phase>,
877 ) -> Self {
878 Self {
879 capability: capability.clone(),
880 metadata,
881 progress,
882 }
883 }
884
885 pub fn capability(&self) -> &CapabilityKey {
886 self.capability.as_key()
887 }
888
889 pub fn operation_id(&self) -> &OperationId {
890 &self.metadata.operation_id
891 }
892
893 pub fn instance_id(&self) -> &InstanceId {
894 &self.metadata.instance_id
895 }
896
897 pub fn owner_epoch(&self) -> OwnerEpoch {
898 self.metadata.owner_epoch
899 }
900
901 pub fn publication_sequence(&self) -> PublicationSequence {
902 self.metadata.publication_sequence
903 }
904
905 pub fn trace_context(&self) -> &Trace {
906 &self.metadata.trace_context
907 }
908
909 pub fn progress(&self) -> &OperationProgress<Phase> {
910 &self.progress
911 }
912
913 pub fn into_parts(
914 self,
915 ) -> (
916 LifecyclePublicationCapability,
917 OperationId,
918 InstanceId,
919 OwnerEpoch,
920 PublicationSequence,
921 Trace,
922 OperationProgress<Phase>,
923 ) {
924 (
925 self.capability,
926 self.metadata.operation_id,
927 self.metadata.instance_id,
928 self.metadata.owner_epoch,
929 self.metadata.publication_sequence,
930 self.metadata.trace_context,
931 self.progress,
932 )
933 }
934}
935
936#[derive(Debug, PartialEq, Eq)]
938pub struct TerminalPublisher<OperationId, InstanceId, Trace, Output, Error> {
939 capability: LifecyclePublicationCapability,
940 metadata: PublicationMetadata<OperationId, InstanceId, Trace>,
941 _output: PhantomData<fn() -> Output>,
942 _error: PhantomData<fn() -> Error>,
943}
944
945impl<OperationId, InstanceId, Trace, Output, Error>
946 TerminalPublisher<OperationId, InstanceId, Trace, Output, Error>
947{
948 fn new(
949 capability: &LifecyclePublicationCapability,
950 metadata: PublicationMetadata<OperationId, InstanceId, Trace>,
951 ) -> Self {
952 Self {
953 capability: capability.clone(),
954 metadata,
955 _output: PhantomData,
956 _error: PhantomData,
957 }
958 }
959
960 pub fn succeed(
961 self,
962 output: Output,
963 ) -> AuthorizedTerminalPublication<OperationId, InstanceId, Trace, Output, Error> {
964 AuthorizedTerminalPublication {
965 capability: self.capability,
966 metadata: self.metadata,
967 outcome: TerminalOutcome::Succeeded { output },
968 }
969 }
970
971 pub fn fail(
972 self,
973 error: Error,
974 ) -> AuthorizedTerminalPublication<OperationId, InstanceId, Trace, Output, Error> {
975 AuthorizedTerminalPublication {
976 capability: self.capability,
977 metadata: self.metadata,
978 outcome: TerminalOutcome::Failed { error },
979 }
980 }
981
982 pub fn cancel(
983 self,
984 ) -> AuthorizedTerminalPublication<OperationId, InstanceId, Trace, Output, Error> {
985 AuthorizedTerminalPublication {
986 capability: self.capability,
987 metadata: self.metadata,
988 outcome: TerminalOutcome::Cancelled,
989 }
990 }
991}
992
993#[derive(Debug, PartialEq, Eq)]
995pub struct AuthorizedTerminalPublication<OperationId, InstanceId, Trace, Output, Error> {
996 capability: LifecyclePublicationCapability,
997 metadata: PublicationMetadata<OperationId, InstanceId, Trace>,
998 outcome: TerminalOutcome<Output, Error>,
999}
1000
1001impl<OperationId, InstanceId, Trace, Output, Error>
1002 AuthorizedTerminalPublication<OperationId, InstanceId, Trace, Output, Error>
1003{
1004 pub fn capability(&self) -> &CapabilityKey {
1005 self.capability.as_key()
1006 }
1007
1008 pub fn operation_id(&self) -> &OperationId {
1009 &self.metadata.operation_id
1010 }
1011
1012 pub fn instance_id(&self) -> &InstanceId {
1013 &self.metadata.instance_id
1014 }
1015
1016 pub fn owner_epoch(&self) -> OwnerEpoch {
1017 self.metadata.owner_epoch
1018 }
1019
1020 pub fn publication_sequence(&self) -> PublicationSequence {
1021 self.metadata.publication_sequence
1022 }
1023
1024 pub fn trace_context(&self) -> &Trace {
1025 &self.metadata.trace_context
1026 }
1027
1028 pub fn outcome(&self) -> &TerminalOutcome<Output, Error> {
1029 &self.outcome
1030 }
1031
1032 pub fn into_parts(
1033 self,
1034 ) -> (
1035 LifecyclePublicationCapability,
1036 OperationId,
1037 InstanceId,
1038 OwnerEpoch,
1039 PublicationSequence,
1040 Trace,
1041 TerminalOutcome<Output, Error>,
1042 ) {
1043 (
1044 self.capability,
1045 self.metadata.operation_id,
1046 self.metadata.instance_id,
1047 self.metadata.owner_epoch,
1048 self.metadata.publication_sequence,
1049 self.metadata.trace_context,
1050 self.outcome,
1051 )
1052 }
1053}
1054
1055#[derive(Debug)]
1057pub struct OperationContext<OperationId, InstanceId, Trace = TraceContext> {
1058 operation_id: OperationId,
1059 instance_id: InstanceId,
1060 owner_epoch: OwnerEpoch,
1061 publication_sequence: PublicationSequence,
1062 timeout_budget: OperationTimeoutBudget,
1063 shutdown_token: OwnedShutdownToken,
1064 trace_context: Trace,
1065}
1066
1067impl<OperationId, InstanceId, Trace> OperationContext<OperationId, InstanceId, Trace> {
1068 fn metadata(&self) -> PublicationMetadata<OperationId, InstanceId, Trace>
1069 where
1070 OperationId: Clone,
1071 InstanceId: Clone,
1072 Trace: Clone,
1073 {
1074 PublicationMetadata::new(
1075 self.operation_id.clone(),
1076 self.instance_id.clone(),
1077 self.owner_epoch,
1078 self.publication_sequence,
1079 self.trace_context.clone(),
1080 )
1081 }
1082
1083 pub fn operation_id(&self) -> &OperationId {
1084 &self.operation_id
1085 }
1086
1087 pub fn instance_id(&self) -> &InstanceId {
1088 &self.instance_id
1089 }
1090
1091 pub fn owner_epoch(&self) -> OwnerEpoch {
1092 self.owner_epoch
1093 }
1094
1095 pub fn publication_sequence(&self) -> PublicationSequence {
1096 self.publication_sequence
1097 }
1098
1099 pub fn timeout_budget(&self) -> &OperationTimeoutBudget {
1100 &self.timeout_budget
1101 }
1102
1103 pub fn shutdown_token(&self) -> &OwnedShutdownToken {
1104 &self.shutdown_token
1105 }
1106
1107 pub fn trace_context(&self) -> &Trace {
1108 &self.trace_context
1109 }
1110
1111 pub fn publish_update<Phase>(
1112 &mut self,
1113 capability: &LifecyclePublicationCapability,
1114 progress: OperationProgress<Phase>,
1115 ) -> AuthorizedProgressPublication<OperationId, InstanceId, Trace, Phase>
1116 where
1117 OperationId: Clone,
1118 InstanceId: Clone,
1119 Trace: Clone,
1120 {
1121 let publication =
1122 AuthorizedProgressPublication::authorize(capability, self.metadata(), progress);
1123 self.publication_sequence = self.publication_sequence.next();
1124 publication
1125 }
1126
1127 pub fn publish_submitted(
1128 &mut self,
1129 capability: &LifecyclePublicationCapability,
1130 ) -> AuthorizedProgressPublication<OperationId, InstanceId, Trace, ()>
1131 where
1132 OperationId: Clone,
1133 InstanceId: Clone,
1134 Trace: Clone,
1135 {
1136 self.publish_update(capability, OperationProgress::submitted())
1137 }
1138
1139 pub fn publish_progress<Phase>(
1140 &mut self,
1141 capability: &LifecyclePublicationCapability,
1142 phase: Phase,
1143 ) -> AuthorizedProgressPublication<OperationId, InstanceId, Trace, Phase>
1144 where
1145 OperationId: Clone,
1146 InstanceId: Clone,
1147 Trace: Clone,
1148 {
1149 self.publish_update(capability, OperationProgress::progress(phase))
1150 }
1151
1152 pub fn begin_terminal<Output, Error>(
1153 self,
1154 capability: &LifecyclePublicationCapability,
1155 ) -> TerminalPublisher<OperationId, InstanceId, Trace, Output, Error> {
1156 TerminalPublisher::new(
1157 capability,
1158 PublicationMetadata::new(
1159 self.operation_id,
1160 self.instance_id,
1161 self.owner_epoch,
1162 self.publication_sequence,
1163 self.trace_context,
1164 ),
1165 )
1166 }
1167}
1168
/// Private module holding the sealing supertrait.
///
/// `Sealed` is `pub` inside a private module, so it can appear as a
/// supertrait bound on public traits while remaining unnameable (and thus
/// unimplementable) outside this module's parent.
mod sealed {
    /// Marker supertrait restricting who may implement the sealed traits.
    pub trait Sealed {}
}
1172
1173pub trait OwnerAwait: sealed::Sealed {
1175 fn owned_shutdown_token(&self) -> &OwnedShutdownToken;
1176}
1177
1178pub trait OwnerPublication: sealed::Sealed {
1180 type OperationId;
1181 type InstanceId;
1182 type Trace;
1183
1184 fn operation_id(&self) -> &Self::OperationId;
1185 fn instance_id(&self) -> &Self::InstanceId;
1186 fn owner_epoch(&self) -> OwnerEpoch;
1187 fn publication_sequence(&self) -> PublicationSequence;
1188 fn trace_context(&self) -> &Self::Trace;
1189}
1190
1191impl<OperationId, InstanceId, Trace> sealed::Sealed
1192 for OperationContext<OperationId, InstanceId, Trace>
1193{
1194}
1195impl<OperationId, InstanceId, Trace> OwnerAwait
1196 for OperationContext<OperationId, InstanceId, Trace>
1197{
1198 fn owned_shutdown_token(&self) -> &OwnedShutdownToken {
1199 self.shutdown_token()
1200 }
1201}
1202
1203impl<OperationId, InstanceId, Trace, Phase> sealed::Sealed
1204 for AuthorizedProgressPublication<OperationId, InstanceId, Trace, Phase>
1205{
1206}
1207
1208impl<OperationId, InstanceId, Trace, Phase> OwnerPublication
1209 for AuthorizedProgressPublication<OperationId, InstanceId, Trace, Phase>
1210{
1211 type OperationId = OperationId;
1212 type InstanceId = InstanceId;
1213 type Trace = Trace;
1214
1215 fn operation_id(&self) -> &Self::OperationId {
1216 self.operation_id()
1217 }
1218
1219 fn instance_id(&self) -> &Self::InstanceId {
1220 self.instance_id()
1221 }
1222
1223 fn owner_epoch(&self) -> OwnerEpoch {
1224 self.owner_epoch()
1225 }
1226
1227 fn publication_sequence(&self) -> PublicationSequence {
1228 self.publication_sequence()
1229 }
1230
1231 fn trace_context(&self) -> &Self::Trace {
1232 self.trace_context()
1233 }
1234}
1235
1236pub mod actor_owned {
1238 pub use super::{
1239 ActorDeclaration, BoundedActorIngress, OwnedShutdownToken, OwnedTaskHandle,
1240 OwnedTaskSpawner, SupervisionRegistration,
1241 };
1242}
1243
1244pub mod move_owned {
1246 pub use super::{
1247 issue_operation_handle, issue_owner_token, OpaqueOperationHandle, OperationContext,
1248 OperationProgress, OperationTimeoutBudget, OwnerAwait, OwnerEpoch, OwnerPublication,
1249 OwnerToken, OwnershipTransfer, PublicationSequence, TerminalOutcome, TerminalPublisher,
1250 Terminality, TraceContext,
1251 };
1252}
1253
1254pub mod capability_gated {
1256 pub use super::{
1257 issue_operation_context, ownership_capability_token_request_for,
1258 ActorIngressMutationCapability, AuthorizedActorIngressMutation,
1259 AuthorizedProgressPublication, AuthorizedReadinessPublication,
1260 AuthorizedTerminalPublication, LifecyclePublicationCapability, OperationContextCapability,
1261 OwnershipCapability, OwnershipTransferCapability, ReadinessPublicationCapability,
1262 };
1263}
1264
1265impl<OperationId, InstanceId, Trace, Output, Error> sealed::Sealed
1266 for AuthorizedTerminalPublication<OperationId, InstanceId, Trace, Output, Error>
1267{
1268}
1269
1270impl<OperationId, InstanceId, Trace, Output, Error> OwnerPublication
1271 for AuthorizedTerminalPublication<OperationId, InstanceId, Trace, Output, Error>
1272{
1273 type OperationId = OperationId;
1274 type InstanceId = InstanceId;
1275 type Trace = Trace;
1276
1277 fn operation_id(&self) -> &Self::OperationId {
1278 self.operation_id()
1279 }
1280
1281 fn instance_id(&self) -> &Self::InstanceId {
1282 self.instance_id()
1283 }
1284
1285 fn owner_epoch(&self) -> OwnerEpoch {
1286 self.owner_epoch()
1287 }
1288
1289 fn publication_sequence(&self) -> PublicationSequence {
1290 self.publication_sequence()
1291 }
1292
1293 fn trace_context(&self) -> &Self::Trace {
1294 self.trace_context()
1295 }
1296}
1297
1298#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1304pub struct AuthorizedReadinessPublication<Payload> {
1305 capability: ReadinessPublicationCapability,
1306 payload: Payload,
1307}
1308
1309impl<Payload> AuthorizedReadinessPublication<Payload> {
1310 pub fn authorize(capability: &ReadinessPublicationCapability, payload: Payload) -> Self {
1311 Self {
1312 capability: capability.clone(),
1313 payload,
1314 }
1315 }
1316
1317 pub fn capability(&self) -> &CapabilityKey {
1318 self.capability.as_key()
1319 }
1320
1321 pub fn payload(&self) -> &Payload {
1322 &self.payload
1323 }
1324
1325 pub fn into_parts(self) -> (ReadinessPublicationCapability, Payload) {
1326 (self.capability, self.payload)
1327 }
1328}
1329
1330#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1336pub struct AuthorizedActorIngressMutation<Mutation> {
1337 capability: ActorIngressMutationCapability,
1338 mutation: Mutation,
1339}
1340
1341impl<Mutation> AuthorizedActorIngressMutation<Mutation> {
1342 pub fn authorize(capability: &ActorIngressMutationCapability, mutation: Mutation) -> Self {
1343 Self {
1344 capability: capability.clone(),
1345 mutation,
1346 }
1347 }
1348
1349 pub fn capability(&self) -> &CapabilityKey {
1350 self.capability.as_key()
1351 }
1352
1353 pub fn mutation(&self) -> &Mutation {
1354 &self.mutation
1355 }
1356
1357 pub fn into_parts(self) -> (ActorIngressMutationCapability, Mutation) {
1358 (self.capability, self.mutation)
1359 }
1360}
1361
// Generates a tuple-struct newtype around `CapabilityKey`.
//
// Invocation: optional attributes (doc comments included) followed by the type
// name. The generated type gets:
//   * `new` / `as_key` / `into_key` inherent methods,
//   * an `OwnershipCapability` impl,
//   * `From` conversions to and from `CapabilityKey`.
macro_rules! ownership_capability_wrapper {
    ($(#[$meta:meta])* $name:ident) => {
        $(#[$meta])*
        #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
        pub struct $name(CapabilityKey);

        impl $name {
            /// Wraps anything convertible into a `CapabilityKey`.
            pub fn new(key: impl Into<CapabilityKey>) -> Self {
                let key: CapabilityKey = key.into();
                Self(key)
            }

            /// Borrows the wrapped key.
            pub fn as_key(&self) -> &CapabilityKey {
                let Self(key) = self;
                key
            }

            /// Unwraps into the underlying key.
            pub fn into_key(self) -> CapabilityKey {
                let Self(key) = self;
                key
            }
        }

        impl OwnershipCapability for $name {
            fn capability_key(&self) -> &CapabilityKey {
                &self.0
            }

            fn into_capability_key(self) -> CapabilityKey {
                self.0
            }
        }

        impl From<CapabilityKey> for $name {
            fn from(key: CapabilityKey) -> Self {
                Self(key)
            }
        }

        impl From<$name> for CapabilityKey {
            fn from(wrapper: $name) -> Self {
                wrapper.0
            }
        }
    };
}
1405
1406ownership_capability_wrapper!(
1407 LifecyclePublicationCapability
1409);
1410ownership_capability_wrapper!(
1411 OperationContextCapability
1413);
1414ownership_capability_wrapper!(
1415 ReadinessPublicationCapability
1417);
1418ownership_capability_wrapper!(
1419 PostconditionProofCapability
1421);
1422ownership_capability_wrapper!(
1423 ActorIngressMutationCapability
1425);
1426ownership_capability_wrapper!(
1427 OwnershipTransferCapability
1429);
1430
1431pub fn issue_operation_handle<Kind, HandleId, InstanceId>(
1434 _capability: &ActorIngressMutationCapability,
1435 handle_id: HandleId,
1436 instance_id: InstanceId,
1437) -> OpaqueOperationHandle<Kind, HandleId, InstanceId> {
1438 OpaqueOperationHandle {
1439 handle_id,
1440 instance_id,
1441 _kind: PhantomData,
1442 }
1443}
1444
1445pub fn issue_operation_context<OperationId, InstanceId, Trace>(
1448 _capability: &OperationContextCapability,
1449 operation_id: OperationId,
1450 instance_id: InstanceId,
1451 owner_epoch: OwnerEpoch,
1452 publication_sequence: PublicationSequence,
1453 timeout_budget: OperationTimeoutBudget,
1454 shutdown_token: OwnedShutdownToken,
1455 trace_context: Trace,
1456) -> OperationContext<OperationId, InstanceId, Trace> {
1457 OperationContext {
1458 operation_id,
1459 instance_id,
1460 owner_epoch,
1461 publication_sequence,
1462 timeout_budget,
1463 shutdown_token,
1464 trace_context,
1465 }
1466}
1467
1468pub fn issue_owner_token<Scope, TokenId>(
1471 _capability: &OwnershipTransferCapability,
1472 token_id: TokenId,
1473 scope: Scope,
1474) -> OwnerToken<Scope, TokenId> {
1475 OwnerToken { token_id, scope }
1476}
1477
1478#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
1483pub struct OpaqueOperationHandle<Kind, HandleId, InstanceId> {
1484 handle_id: HandleId,
1485 instance_id: InstanceId,
1486 #[serde(skip)]
1487 _kind: PhantomData<fn() -> Kind>,
1488}
1489
1490impl<Kind, HandleId, InstanceId> OpaqueOperationHandle<Kind, HandleId, InstanceId> {
1491 pub fn handle_id(&self) -> &HandleId {
1492 &self.handle_id
1493 }
1494
1495 pub fn instance_id(&self) -> &InstanceId {
1496 &self.instance_id
1497 }
1498
1499 pub fn into_parts(self) -> (HandleId, InstanceId) {
1500 (self.handle_id, self.instance_id)
1501 }
1502}
1503
1504#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
1509pub struct OwnerToken<Scope, TokenId> {
1510 token_id: TokenId,
1511 scope: Scope,
1512}
1513
1514impl<Scope, TokenId> OwnerToken<Scope, TokenId> {
1515 pub fn token_id(&self) -> &TokenId {
1516 &self.token_id
1517 }
1518
1519 pub fn scope(&self) -> &Scope {
1520 &self.scope
1521 }
1522
1523 pub fn into_parts(self) -> (TokenId, Scope) {
1524 (self.token_id, self.scope)
1525 }
1526
1527 pub fn handoff<Recipient>(
1528 self,
1529 recipient: Recipient,
1530 ) -> OwnershipTransfer<Scope, TokenId, Recipient> {
1531 OwnershipTransfer {
1532 token_id: self.token_id,
1533 scope: self.scope,
1534 recipient,
1535 }
1536 }
1537}
1538
1539#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
1545pub struct OwnershipTransfer<Scope, TokenId, Recipient> {
1546 token_id: TokenId,
1547 scope: Scope,
1548 recipient: Recipient,
1549}
1550
1551impl<Scope, TokenId, Recipient> OwnershipTransfer<Scope, TokenId, Recipient> {
1552 pub fn token_id(&self) -> &TokenId {
1553 &self.token_id
1554 }
1555
1556 pub fn scope(&self) -> &Scope {
1557 &self.scope
1558 }
1559
1560 pub fn recipient(&self) -> &Recipient {
1561 &self.recipient
1562 }
1563
1564 pub fn into_parts(self) -> (TokenId, Scope, Recipient) {
1565 (self.token_id, self.scope, self.recipient)
1566 }
1567
1568 pub fn retarget<NextRecipient>(
1569 self,
1570 recipient: NextRecipient,
1571 ) -> OwnershipTransfer<Scope, TokenId, NextRecipient> {
1572 OwnershipTransfer {
1573 token_id: self.token_id,
1574 scope: self.scope,
1575 recipient,
1576 }
1577 }
1578}
1579
/// Boolean queries about where a value sits in an operation's lifecycle.
///
/// In this file the trait is implemented for `OperationProgress` (never
/// terminal) and `TerminalOutcome` (always terminal).
pub trait Terminality {
    /// `true` when the value represents a terminal state.
    fn is_terminal(&self) -> bool;

    /// `true` when the value is the "submitted" state.
    fn is_submitted(&self) -> bool;

    /// `true` when the value is an in-progress state.
    fn is_in_progress(&self) -> bool;

    /// `true` when the value is a successful outcome.
    fn is_succeeded(&self) -> bool;

    /// `true` when the value is a failed outcome.
    fn is_failed(&self) -> bool;

    /// `true` when the value is a cancelled outcome.
    fn is_cancelled(&self) -> bool;
}
1589
1590impl<Phase> Terminality for OperationProgress<Phase> {
1591 fn is_terminal(&self) -> bool {
1592 false
1593 }
1594
1595 fn is_submitted(&self) -> bool {
1596 matches!(self, Self::Submitted)
1597 }
1598
1599 fn is_in_progress(&self) -> bool {
1600 matches!(self, Self::Progress { .. })
1601 }
1602
1603 fn is_succeeded(&self) -> bool {
1604 false
1605 }
1606
1607 fn is_failed(&self) -> bool {
1608 false
1609 }
1610
1611 fn is_cancelled(&self) -> bool {
1612 false
1613 }
1614}
1615
1616impl<Output, Error> Terminality for TerminalOutcome<Output, Error> {
1617 fn is_terminal(&self) -> bool {
1618 true
1619 }
1620
1621 fn is_submitted(&self) -> bool {
1622 false
1623 }
1624
1625 fn is_in_progress(&self) -> bool {
1626 false
1627 }
1628
1629 fn is_succeeded(&self) -> bool {
1630 matches!(self, Self::Succeeded { .. })
1631 }
1632
1633 fn is_failed(&self) -> bool {
1634 matches!(self, Self::Failed { .. })
1635 }
1636
1637 fn is_cancelled(&self) -> bool {
1638 matches!(self, Self::Cancelled)
1639 }
1640}
1641
#[cfg(test)]
#[allow(clippy::expect_used)]
mod tests {
    use super::{
        issue_operation_context, issue_operation_handle, issue_owner_token,
        ActorIngressMutationCapability, AuthorizedActorIngressMutation,
        AuthorizedReadinessPublication, LifecyclePublicationCapability, OperationContextCapability,
        OperationProgress, OperationTimeoutBudget, OwnedShutdownToken, OwnerEpoch,
        OwnershipCapability, OwnershipError, OwnershipErrorDomain, OwnershipTransfer,
        OwnershipTransferCapability, PublicationSequence, ReadinessPublicationCapability,
        TerminalOutcome, Terminality, TraceContext,
    };
    use crate::{effects::CapabilityKey, AuraError, ProtocolErrorCode};

    /// The handle returns exactly the ids it was issued with, by reference and
    /// by value.
    #[test]
    fn opaque_operation_handle_preserves_ids() {
        struct Invite;
        let capability = ActorIngressMutationCapability::new("actor:ingress");

        let handle = issue_operation_handle::<Invite, _, _>(&capability, "invitation_create", 7u64);
        assert_eq!(handle.handle_id(), &"invitation_create");
        assert_eq!(handle.instance_id(), &7u64);
        let (handle_id, instance_id) = handle.into_parts();
        assert_eq!(handle_id, "invitation_create");
        assert_eq!(instance_id, 7u64);
    }

    /// `handoff` consumes the token and carries its id and scope into the
    /// transfer record together with the recipient.
    #[test]
    fn owner_token_handoff_creates_consumed_transfer_record() {
        let capability = OwnershipTransferCapability::new("ownership:transfer");
        let token = issue_owner_token(&capability, "token-1", "session");
        let transfer: OwnershipTransfer<_, _, _> = token.handoff("owner-b");
        assert_eq!(transfer.token_id(), &"token-1");
        assert_eq!(transfer.scope(), &"session");
        assert_eq!(transfer.recipient(), &"owner-b");
    }

    /// Token accessors and `into_parts` agree, and `retarget` replaces only the
    /// recipient (including a change of recipient type).
    #[test]
    fn owner_token_parts_and_transfer_retarget_round_trip() {
        let capability = OwnershipTransferCapability::new("ownership:transfer");

        let token = issue_owner_token(&capability, "token-1", "session");
        assert_eq!(token.token_id(), &"token-1");
        assert_eq!(token.scope(), &"session");
        assert_eq!(token.clone().into_parts(), ("token-1", "session"));

        let retargeted = token.handoff("owner-b").retarget(42u8);
        assert_eq!(retargeted.token_id(), &"token-1");
        assert_eq!(retargeted.scope(), &"session");
        assert_eq!(retargeted.recipient(), &42u8);
        assert_eq!(retargeted.into_parts(), ("token-1", "session", 42u8));
    }

    /// Progress values answer the phase queries and are never terminal;
    /// outcomes answer the outcome queries and are always terminal.
    #[test]
    fn progress_and_terminal_outcome_report_terminality() {
        let submitted = OperationProgress::<&'static str>::submitted();
        assert!(submitted.is_submitted());
        assert!(!submitted.is_terminal());

        let progress = OperationProgress::<&'static str>::progress("waiting");
        assert!(progress.is_in_progress());
        assert!(!progress.is_terminal());

        let success = TerminalOutcome::<&'static str, &'static str>::Succeeded { output: "done" };
        assert!(success.is_succeeded());
        assert!(success.is_terminal());

        let failed = TerminalOutcome::<(), &'static str>::Failed { error: "timeout" };
        assert!(failed.is_failed());
        assert!(failed.is_terminal());

        let cancelled = TerminalOutcome::<(), &'static str>::Cancelled;
        assert!(cancelled.is_cancelled());
        assert!(cancelled.is_terminal());
    }

    /// Ownership errors expose a domain and a code, and convert into the
    /// expected `AuraError` variants.
    #[test]
    fn ownership_error_exposes_domain_code_and_aura_mapping() {
        let error = OwnershipError::missing_capability("invitation:send");
        assert_eq!(error.domain(), OwnershipErrorDomain::Capability);
        assert_eq!(error.code(), "missing_capability");

        let aura_error: AuraError = error.into();
        assert!(matches!(aura_error, AuraError::PermissionDenied { .. }));

        let timeout = OwnershipError::timeout("invitation_create");
        assert_eq!(timeout.domain(), OwnershipErrorDomain::Timeout);
        assert_eq!(timeout.code(), "timeout");
        let aura_timeout: AuraError = timeout.into();
        assert!(matches!(aura_timeout, AuraError::Terminal(_)));
    }

    /// Wrappers hand back the key string they were built from through the
    /// `OwnershipCapability` trait.
    #[test]
    fn ownership_capability_wrappers_round_trip_runtime_keys() {
        let lifecycle = LifecyclePublicationCapability::new("semantic:lifecycle");
        let readiness = ReadinessPublicationCapability::new("semantic:readiness");
        let actor = ActorIngressMutationCapability::new("actor:ingress");
        let transfer = OwnershipTransferCapability::new("ownership:transfer");

        assert_eq!(lifecycle.capability_key().as_str(), "semantic:lifecycle");
        assert_eq!(readiness.capability_key().as_str(), "semantic:readiness");
        assert_eq!(actor.capability_key().as_str(), "actor:ingress");
        assert_eq!(transfer.capability_key().as_str(), "ownership:transfer");

        let raw: CapabilityKey = transfer.into_capability_key();
        assert_eq!(raw.as_str(), "ownership:transfer");
    }

    /// The `From` conversions between a wrapper and `CapabilityKey` are
    /// inverses of each other, and the inherent accessors agree with them.
    #[test]
    fn ownership_capability_wrappers_convert_to_and_from_capability_key() {
        let context = OperationContextCapability::new("operation:context");
        assert_eq!(context.as_key().as_str(), "operation:context");

        let key: CapabilityKey = context.clone().into();
        assert_eq!(key.as_str(), "operation:context");
        assert_eq!(OperationContextCapability::from(key), context);
        assert_eq!(context.into_key().as_str(), "operation:context");
    }

    /// A batch of capabilities becomes a token request whose permissions are
    /// the capability key strings in input order.
    #[test]
    fn ownership_capabilities_use_existing_capability_token_request_shape() {
        let request = super::ownership_capability_token_request_for(
            "owner-a",
            [
                LifecyclePublicationCapability::new("semantic:lifecycle"),
                LifecyclePublicationCapability::new("semantic:lifecycle:secondary"),
            ],
        );

        assert_eq!(request.subject, "owner-a");
        assert_eq!(
            request.permissions,
            vec![
                "semantic:lifecycle".to_string(),
                "semantic:lifecycle:secondary".to_string(),
            ]
        );
    }

    /// Progress and terminal publications carry the lifecycle capability key
    /// and the identity/ordering metadata of the issuing context.
    #[test]
    fn lifecycle_publication_requires_capability_wrapper() {
        let capability = LifecyclePublicationCapability::new("semantic:lifecycle");
        let context_capability = OperationContextCapability::new("operation:context");
        let mut context = issue_operation_context(
            &context_capability,
            "invitation_accept",
            7u64,
            OwnerEpoch::new(3),
            PublicationSequence::new(9),
            OperationTimeoutBudget::deferred_local_policy(),
            OwnedShutdownToken::detached(),
            TraceContext::detached(),
        );

        let progress = context.publish_update(&capability, OperationProgress::<()>::submitted());
        assert_eq!(progress.capability().as_str(), "semantic:lifecycle");
        assert_eq!(progress.operation_id(), &"invitation_accept");
        assert_eq!(progress.instance_id(), &7u64);
        assert_eq!(progress.owner_epoch().value(), 3);
        assert_eq!(progress.publication_sequence().value(), 9);
        assert!(matches!(progress.progress(), OperationProgress::Submitted));

        let terminal = context
            .begin_terminal::<(), &'static str>(&capability)
            .succeed(());
        assert_eq!(terminal.capability().as_str(), "semantic:lifecycle");
        assert!(matches!(
            terminal.outcome(),
            TerminalOutcome::Succeeded { .. }
        ));
    }

    /// An authorized readiness publication exposes the capability key and the
    /// payload it was built from.
    #[test]
    fn readiness_publication_requires_capability_wrapper() {
        let capability = ReadinessPublicationCapability::new("semantic:readiness");
        let publication =
            AuthorizedReadinessPublication::authorize(&capability, "channel_membership_ready");

        assert_eq!(publication.capability().as_str(), "semantic:readiness");
        assert_eq!(publication.payload(), &"channel_membership_ready");
    }

    /// An authorized ingress mutation exposes the capability key and the
    /// mutation it was built from.
    #[test]
    fn actor_ingress_mutation_requires_capability_wrapper() {
        let capability = ActorIngressMutationCapability::new("actor:ingress");
        let mutation = AuthorizedActorIngressMutation::authorize(&capability, "join_channel");

        assert_eq!(mutation.capability().as_str(), "actor:ingress");
        assert_eq!(mutation.mutation(), &"join_channel");
    }

    /// `into_parts` on both authorized wrappers returns a capability equal to
    /// the one passed to `authorize`, followed by the wrapped value.
    #[test]
    fn authorized_wrappers_into_parts_return_capability_then_value() {
        let readiness = ReadinessPublicationCapability::new("semantic:readiness");
        let publication = AuthorizedReadinessPublication::authorize(&readiness, "ready");
        assert_eq!(publication.into_parts(), (readiness, "ready"));

        let ingress = ActorIngressMutationCapability::new("actor:ingress");
        let mutation = AuthorizedActorIngressMutation::authorize(&ingress, "join_channel");
        assert_eq!(mutation.into_parts(), (ingress, "join_channel"));
    }

    /// The canonical protocol constant holds the three expected policies.
    #[test]
    fn canonical_semantic_owner_protocol_requires_handoff_and_bounded_waits() {
        let protocol = super::SemanticOwnerProtocol::CANONICAL;
        assert_eq!(
            protocol.handoff_policy(),
            super::SemanticOwnerHandoffPolicy::HandoffBeforeFirstAwait
        );
        assert_eq!(
            protocol.await_policy(),
            super::SemanticOwnerAwaitPolicy::BoundedOnly
        );
        assert_eq!(
            protocol.best_effort_policy(),
            super::SemanticOwnerBestEffortPolicy::TerminalBeforeBestEffort
        );
    }

    /// The best-effort boundary constant reports the terminal-before-best-effort
    /// relation.
    #[test]
    fn canonical_best_effort_boundary_protocol_is_post_terminal_only() {
        assert_eq!(
            super::BestEffortBoundaryProtocol::POST_TERMINAL_ONLY.terminal_relation(),
            super::SemanticOwnerBestEffortPolicy::TerminalBeforeBestEffort
        );
    }

    /// Two failing captures both yield `None`, and only the first error is
    /// retained and returned by `finish`.
    #[tokio::test]
    async fn post_terminal_best_effort_preserves_first_error_and_cannot_own_terminality() {
        let mut best_effort =
            super::PostTerminalBestEffort::<super::OwnershipError>::post_terminal_only();

        assert_eq!(
            best_effort.protocol().terminal_relation(),
            super::SemanticOwnerBestEffortPolicy::TerminalBeforeBestEffort
        );

        let first = best_effort
            .capture(async { Err::<(), _>(super::OwnershipError::timeout("first")) })
            .await;
        let second = best_effort
            .capture(async { Err::<(), _>(super::OwnershipError::timeout("second")) })
            .await;

        assert!(first.is_none());
        assert!(second.is_none());
        assert_eq!(
            best_effort.first_error(),
            Some(&super::OwnershipError::timeout("first"))
        );
        assert_eq!(
            best_effort.finish(),
            Err(super::OwnershipError::timeout("first"))
        );
    }
}