Skip to main content

liminal/aion/signal/
types.rs

1use super::super::channels::ChannelName;
2use super::super::error::AionSurfaceError;
3use super::super::types::{Payload, SignalPayload};
4use crate::channel::{ChannelHandle, ChannelMode, Schema};
5use crate::conversation::ParticipantPid;
6
7/// Workflow-declared signal type supplied by Aion's workflow definition.
8#[derive(Clone, Debug)]
9pub struct SignalDeclaration {
10    /// Workflow-declared signal name.
11    pub signal_name: String,
12    /// Required payload content type for this signal.
13    pub content_type: String,
14    /// Optional schema used to validate the signal payload bytes before publish.
15    pub payload_schema: Option<Schema>,
16}
17
18impl SignalDeclaration {
19    /// Creates a declaration that validates signal name and content type.
20    #[must_use]
21    pub fn new(signal_name: impl Into<String>, content_type: impl Into<String>) -> Self {
22        Self {
23            signal_name: signal_name.into(),
24            content_type: content_type.into(),
25            payload_schema: None,
26        }
27    }
28
29    /// Creates a declaration that also validates payload bytes against a schema.
30    #[must_use]
31    pub fn with_payload_schema(
32        signal_name: impl Into<String>,
33        content_type: impl Into<String>,
34        payload_schema: Schema,
35    ) -> Self {
36        Self {
37            signal_name: signal_name.into(),
38            content_type: content_type.into(),
39            payload_schema: Some(payload_schema),
40        }
41    }
42}
43
44/// Signal channel configuration for one workflow instance.
45#[derive(Clone, Debug)]
46pub struct SignalWorkflowConfig {
47    /// Aion namespace that owns the workflow.
48    pub namespace: String,
49    /// Workflow identifier used in the signal channel name.
50    pub workflow_id: String,
51    /// Workflow process that receives delivered signals in its normal mailbox.
52    pub workflow_pid: ParticipantPid,
53    /// Signal declarations supplied by the workflow definition.
54    pub declarations: Vec<SignalDeclaration>,
55    /// Per-workflow signal channel durability mode.
56    pub mode: ChannelMode,
57}
58
59impl SignalWorkflowConfig {
60    /// Creates an ephemeral signal workflow configuration.
61    #[must_use]
62    pub fn new(
63        namespace: impl Into<String>,
64        id: impl Into<String>,
65        pid: ParticipantPid,
66        declarations: Vec<SignalDeclaration>,
67    ) -> Self {
68        Self {
69            namespace: namespace.into(),
70            workflow_id: id.into(),
71            workflow_pid: pid,
72            declarations,
73            mode: ChannelMode::Ephemeral,
74        }
75    }
76
77    /// Sets the per-workflow durability mode.
78    #[must_use]
79    pub const fn with_mode(mut self, mode: ChannelMode) -> Self {
80        self.mode = mode;
81        self
82    }
83}
84
85/// Active signal channel session returned when a workflow declares signal handlers.
86#[derive(Clone, Debug)]
87pub struct SignalChannel {
88    /// Validated channel name following `aion.signal.{namespace}.{workflow_id}`.
89    pub channel_name: ChannelName,
90    /// Typed channel handle used for publish-time schema validation and fan-out.
91    pub handle: ChannelHandle,
92    /// Signal declarations used to type the channel.
93    pub declarations: Vec<SignalDeclaration>,
94    /// Per-workflow durability mode.
95    pub mode: ChannelMode,
96}
97
/// Final workflow states; reaching any of them tears down signal delivery.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkflowTerminalStatus {
    /// The workflow ran to normal completion.
    Completed,
    /// The workflow ended in failure.
    Failed,
    /// The workflow was cancelled.
    Cancelled,
    /// The workflow timed out.
    TimedOut,
}
110
111impl WorkflowTerminalStatus {
112    pub(super) const fn as_str(self) -> &'static str {
113        match self {
114            Self::Completed => "Completed",
115            Self::Failed => "Failed",
116            Self::Cancelled => "Cancelled",
117            Self::TimedOut => "TimedOut",
118        }
119    }
120}
121
/// Kind of signal channel operation that is visible to the recorder.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SignalOperationKind {
    /// A validated signal reached the workflow mailbox.
    SignalDelivered,
}
128
129/// Structured event recorded for durable signal delivery.
130#[derive(Clone, Debug, PartialEq, Eq)]
131pub struct SignalOperation {
132    /// Operation kind.
133    pub kind: SignalOperationKind,
134    /// Signal channel name.
135    pub channel_name: String,
136    /// Workflow that received the signal.
137    pub workflow_id: String,
138    /// Delivered signal name.
139    pub signal_name: String,
140    /// Delivered signal payload.
141    pub payload: Payload,
142    /// Per-workflow channel mode at delivery time.
143    pub mode: ChannelMode,
144}
145
146impl SignalOperation {
147    pub(super) fn delivered(
148        channel_name: &ChannelName,
149        workflow_id: &str,
150        signal: &SignalPayload,
151        mode: ChannelMode,
152    ) -> Self {
153        Self {
154            kind: SignalOperationKind::SignalDelivered,
155            channel_name: String::from(channel_name.clone()),
156            workflow_id: workflow_id.to_owned(),
157            signal_name: signal.signal_name.clone(),
158            payload: signal.payload.clone(),
159            mode,
160        }
161    }
162}
163
164/// Recorded signal delivery returned to Aion's resolver during replay.
165#[derive(Clone, Debug, PartialEq, Eq)]
166pub struct RecordedSignalDelivery {
167    /// Signal channel name that originally delivered the signal.
168    pub channel_name: String,
169    /// Workflow that originally received the signal.
170    pub workflow_id: String,
171    /// Recorded signal payload.
172    pub signal: SignalPayload,
173}
174
175impl RecordedSignalDelivery {
176    /// Creates a replayable signal delivery event.
177    #[must_use]
178    pub const fn new(channel_name: String, workflow_id: String, signal: SignalPayload) -> Self {
179        Self {
180            channel_name,
181            workflow_id,
182            signal,
183        }
184    }
185}
186
187/// Sends validated signal payloads into the workflow process mailbox.
188pub trait SignalDeliverer: std::fmt::Debug + Send + Sync {
189    /// Delivers one standard workflow mailbox message.
190    ///
191    /// # Errors
192    ///
193    /// Returns [`AionSurfaceError`] when the workflow process cannot accept the signal.
194    fn deliver(
195        &self,
196        workflow_pid: ParticipantPid,
197        signal: SignalPayload,
198    ) -> Result<(), AionSurfaceError>;
199}
200
201/// Recorder seam implemented by Aion's durable event log and replay resolver.
202pub trait SignalRecorder: std::fmt::Debug + Send + Sync {
203    /// Returns recorded signal deliveries during replay without touching live channels.
204    ///
205    /// # Errors
206    ///
207    /// Returns an error if replay data cannot be read.
208    fn replay_deliveries(
209        &self,
210        channel_name: &str,
211        workflow_id: &str,
212    ) -> Result<Vec<RecordedSignalDelivery>, AionSurfaceError>;
213
214    /// Records one durable signal delivery operation.
215    ///
216    /// # Errors
217    ///
218    /// Returns an error if the event log cannot append the operation.
219    fn record(&self, operation: SignalOperation) -> Result<(), AionSurfaceError>;
220}