Skip to main content

liminal/aion/dispatch/
types.rs

1use super::super::channels::ChannelName;
2use super::super::codec::{DispatchRequest, DispatchResponse};
3use super::super::error::AionSurfaceError;
4use super::super::types::{ActivityRequest, ActivityResult};
5use crate::conversation::ParticipantPid;
6use crate::routing::{ConsumerId, ConsumerStateView};
7
8/// Worker process visible to one dispatch routing decision.
9#[derive(Clone, Debug, PartialEq, Eq)]
10pub struct DispatchWorker {
11    /// Stable worker identity used by routing decisions and recorded events.
12    pub worker_id: String,
13    /// Beamr process identifier linked into the dispatch conversation.
14    pub participant: ParticipantPid,
15    /// Capacity and affinity view presented to the configured routing function.
16    pub consumer_state: ConsumerStateView,
17}
18
19impl DispatchWorker {
20    /// Creates a worker with a one-slot capacity view.
21    #[must_use]
22    pub fn new(worker_id: impl Into<String>, participant: ParticipantPid) -> Self {
23        let worker_id = worker_id.into();
24        let consumer_state =
25            ConsumerStateView::new(ConsumerId::new(worker_id.clone()), 0, 1, 0, Vec::new());
26        Self {
27            worker_id,
28            participant,
29            consumer_state,
30        }
31    }
32
33    /// Creates a worker with an explicit routing view.
34    #[must_use]
35    pub fn with_consumer_state(
36        worker_id: impl Into<String>,
37        participant: ParticipantPid,
38        consumer_state: ConsumerStateView,
39    ) -> Self {
40        Self {
41            worker_id: worker_id.into(),
42            participant,
43            consumer_state,
44        }
45    }
46}
47
/// Aion activity state emitted as a recorder-visible channel operation.
///
/// Each variant names one point in the activity lifecycle as it is mapped from
/// the dispatch conversation.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ActivityDispatchState {
    /// Dispatch conversation opened and the activity became scheduled.
    ActivityScheduled,
    /// A worker was selected and linked, so the activity started.
    ActivityStarted,
    /// The worker returned a completed activity result.
    ActivityCompleted,
    /// The worker failed or its linked process exited.
    ActivityFailed {
        /// Retry-eligibility flag recorded together with the failure.
        ///
        /// NOTE(review): presumably `true` means the dispatcher may attempt the
        /// activity again — confirm against the dispatch loop that sets it.
        retry_eligible: bool,
    },
}
60
/// Kind tag for a dispatch channel operation that the recorder can observe.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DispatchOperationKind {
    /// The dispatch conversation boundary was opened on the task queue channel.
    ConversationOpened,
    /// A worker was chosen by the configured routing function.
    WorkerSelected,
    /// The activity request went out through the conversation.
    MessageSent,
    /// A worker response came back through the conversation.
    MessageReceived,
    /// A linked worker process exited while the conversation was still waiting.
    WorkerExited,
    /// The dispatch conversation boundary was closed.
    ConversationClosed,
}
77
78/// Structured event recorded for each non-deterministic dispatch operation.
79#[derive(Clone, Debug, PartialEq, Eq)]
80pub struct DispatchOperation {
81    /// Operation kind.
82    pub kind: DispatchOperationKind,
83    /// Conversation correlation identifier.
84    pub conversation_id: String,
85    /// Dispatch channel name.
86    pub channel_name: String,
87    /// Worker associated with the operation, when one has been selected.
88    pub worker_id: Option<String>,
89    /// Aion activity state mapped from the conversation lifecycle.
90    pub activity_state: Option<ActivityDispatchState>,
91    /// Activity result carried by receive operations.
92    pub result: Option<ActivityResult>,
93    /// Diagnostic detail for failure operations.
94    pub message: Option<String>,
95}
96
97impl DispatchOperation {
98    pub(crate) fn new(
99        kind: DispatchOperationKind,
100        conversation_id: &str,
101        channel_name: &ChannelName,
102    ) -> Self {
103        Self {
104            kind,
105            conversation_id: conversation_id.to_owned(),
106            channel_name: String::from(channel_name.clone()),
107            worker_id: None,
108            activity_state: None,
109            result: None,
110            message: None,
111        }
112    }
113
114    pub(crate) fn worker(mut self, worker_id: impl Into<String>) -> Self {
115        self.worker_id = Some(worker_id.into());
116        self
117    }
118
119    pub(crate) const fn state(mut self, state: ActivityDispatchState) -> Self {
120        self.activity_state = Some(state);
121        self
122    }
123
124    pub(crate) fn result(mut self, result: ActivityResult) -> Self {
125        self.result = Some(result);
126        self
127    }
128
129    pub(crate) fn message(mut self, message: impl Into<String>) -> Self {
130        self.message = Some(message.into());
131        self
132    }
133}
134
135/// Recorded final dispatch outcome used by Aion's resolver during replay.
136#[derive(Clone, Debug, PartialEq, Eq)]
137pub struct RecordedDispatchOutcome {
138    /// Final recorded dispatch result.
139    pub result: Result<ActivityResult, AionSurfaceError>,
140}
141
142impl RecordedDispatchOutcome {
143    /// Creates a replayable outcome from a dispatch result.
144    #[must_use]
145    pub const fn new(result: Result<ActivityResult, AionSurfaceError>) -> Self {
146        Self { result }
147    }
148
149    pub(crate) fn into_result(self) -> Result<ActivityResult, AionSurfaceError> {
150        self.result
151    }
152}
153
154/// Recorder seam implemented by Aion's existing record/replay layer.
155pub trait DispatchRecorder: std::fmt::Debug + Send + Sync {
156    /// Returns a recorded outcome during replay, before any live conversation is opened.
157    ///
158    /// # Errors
159    ///
160    /// Returns an error if the recorder cannot read replay data.
161    fn replay_outcome(
162        &self,
163        channel_name: &str,
164        request: &ActivityRequest,
165    ) -> Result<Option<RecordedDispatchOutcome>, AionSurfaceError>;
166
167    /// Records one dispatch channel operation.
168    ///
169    /// # Errors
170    ///
171    /// Returns an error if the recorder cannot append the operation.
172    fn record(&self, operation: DispatchOperation) -> Result<(), AionSurfaceError>;
173}
174
175/// Supplies the current task queue subscriber snapshot.
176pub trait DispatchWorkerPool: std::fmt::Debug + Send + Sync {
177    /// Returns workers currently subscribed to the dispatch channel.
178    ///
179    /// # Errors
180    ///
181    /// Returns an error if the subscriber snapshot cannot be loaded.
182    fn workers_for(
183        &self,
184        channel_name: &ChannelName,
185        request: &ActivityRequest,
186    ) -> Result<Vec<DispatchWorker>, AionSurfaceError>;
187}
188
189/// Selects a worker by invoking the channel's configured routing function.
190pub trait DispatchRouter: std::fmt::Debug + Send + Sync {
191    /// Selects one worker from the supplied snapshot.
192    ///
193    /// # Errors
194    ///
195    /// Returns an error if routing execution fails.
196    fn select_worker(
197        &self,
198        workflow_id: &str,
199        channel_name: &ChannelName,
200        request: &ActivityRequest,
201        candidates: &[DispatchWorker],
202        excluded_worker_ids: &[String],
203    ) -> Result<Option<DispatchWorker>, AionSurfaceError>;
204}
205
206/// Typed event returned while waiting on a dispatch conversation.
207#[derive(Clone, Debug, PartialEq, Eq)]
208pub enum DispatchConversationEvent {
209    /// Worker returned an activity result.
210    Response(DispatchResponse),
211    /// Linked worker process exited before returning a result.
212    WorkerExited { worker_id: String, message: String },
213}
214
215/// Open dispatch conversation with typed zero-hop request and response methods.
216pub trait DispatchConversation: std::fmt::Debug + Send {
217    /// Links a selected worker process before the request is sent.
218    ///
219    /// # Errors
220    ///
221    /// Returns an error if the process link cannot be established.
222    fn link_worker(&mut self, worker: &DispatchWorker) -> Result<(), AionSurfaceError>;
223
224    /// Sends a typed dispatch request through the conversation.
225    ///
226    /// # Errors
227    ///
228    /// Returns an error if the conversation cannot accept the request.
229    fn send(&mut self, request: DispatchRequest) -> Result<(), AionSurfaceError>;
230
231    /// Receives either a worker response or a linked-process exit event.
232    ///
233    /// # Errors
234    ///
235    /// Returns an error if the conversation cannot receive an event.
236    fn receive(&mut self) -> Result<DispatchConversationEvent, AionSurfaceError>;
237
238    /// Closes the conversation boundary.
239    ///
240    /// # Errors
241    ///
242    /// Returns an error if the conversation cannot close normally.
243    fn close(&mut self) -> Result<(), AionSurfaceError>;
244}
245
246/// Opens dispatch conversations on task queue channels.
247pub trait DispatchConversationFactory: std::fmt::Debug + Send + Sync {
248    /// Opens a dispatch conversation boundary for `channel_name`.
249    ///
250    /// # Errors
251    ///
252    /// Returns an error if the conversation cannot be opened.
253    fn open(
254        &self,
255        workflow_id: &str,
256        channel_name: &ChannelName,
257        conversation_id: &str,
258    ) -> Result<Box<dyn DispatchConversation>, AionSurfaceError>;
259}
260
/// Source of conversation correlation identifiers, consulted at the conversation-open boundary.
pub trait ConversationIdProvider: std::fmt::Debug + Send + Sync {
    /// Produces the next conversation identifier as an owned string.
    fn next_conversation_id(&self) -> String;
}