Skip to main content

liminal/aion/
dispatch.rs

1use std::sync::Arc;
2
3mod defaults;
4mod router;
5#[cfg(test)]
6mod tests;
7mod types;
8
9use defaults::{
10    EmptyWorkerPool, NoopConversationFactory, NoopRecorder, NoopRouter, UuidConversationIds,
11};
12pub use router::RoutingFunctionDispatchRouter;
13pub use types::{
14    ActivityDispatchState, ConversationIdProvider, DispatchConversation, DispatchConversationEvent,
15    DispatchConversationFactory, DispatchOperation, DispatchOperationKind, DispatchRecorder,
16    DispatchRouter, DispatchWorker, DispatchWorkerPool, RecordedDispatchOutcome,
17};
18
19use super::channels::{ChannelName, dispatch_channel};
20use super::codec::{DispatchRequest, DispatchResponse};
21use super::error::AionSurfaceError;
22use super::types::{ActivityRequest, ActivityResult};
23
24const DEFAULT_WORKFLOW_ID: &str = "aion-dispatch";
25
26/// Dependencies used by activity dispatch.
27#[derive(Clone)]
28pub struct DispatchContext {
29    workflow_id: String,
30    worker_pool: Arc<dyn DispatchWorkerPool>,
31    router: Arc<dyn DispatchRouter>,
32    conversations: Arc<dyn DispatchConversationFactory>,
33    recorder: Arc<dyn DispatchRecorder>,
34    ids: Arc<dyn ConversationIdProvider>,
35}
36
37impl DispatchContext {
38    /// Creates a dispatch context from explicit integration dependencies.
39    #[must_use]
40    pub fn new(
41        workflow_id: impl Into<String>,
42        worker_pool: Arc<dyn DispatchWorkerPool>,
43        router: Arc<dyn DispatchRouter>,
44        conversations: Arc<dyn DispatchConversationFactory>,
45        recorder: Arc<dyn DispatchRecorder>,
46        ids: Arc<dyn ConversationIdProvider>,
47    ) -> Self {
48        Self {
49            workflow_id: workflow_id.into(),
50            worker_pool,
51            router,
52            conversations,
53            recorder,
54            ids,
55        }
56    }
57
58    /// Creates an embedded in-process context with no registered workers yet.
59    #[must_use]
60    pub fn embedded_no_workers(workflow_id: impl Into<String>) -> Self {
61        Self::new(
62            workflow_id,
63            Arc::new(EmptyWorkerPool),
64            Arc::new(NoopRouter),
65            Arc::new(NoopConversationFactory),
66            Arc::new(NoopRecorder),
67            Arc::new(UuidConversationIds),
68        )
69    }
70
71    fn workflow_id(&self) -> &str {
72        if self.workflow_id.is_empty() {
73            DEFAULT_WORKFLOW_ID
74        } else {
75            self.workflow_id.as_str()
76        }
77    }
78}
79
80impl Default for DispatchContext {
81    fn default() -> Self {
82        Self::embedded_no_workers(DEFAULT_WORKFLOW_ID)
83    }
84}
85
86impl std::fmt::Debug for DispatchContext {
87    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88        formatter
89            .debug_struct("DispatchContext")
90            .field("workflow_id", &self.workflow_id())
91            .finish_non_exhaustive()
92    }
93}
94
95/// Dispatches an activity using the default embedded context.
96///
97/// # Errors
98///
99/// Returns [`AionSurfaceError`] when the channel name is invalid, no worker is available, the
100/// conversation fails, or the worker reports an activity failure.
101pub fn dispatch_activity(
102    namespace: &str,
103    task_queue: &str,
104    request: ActivityRequest,
105) -> Result<ActivityResult, AionSurfaceError> {
106    dispatch_activity_with_context(&DispatchContext::default(), namespace, task_queue, request)
107}
108
109/// Dispatches an activity through a recorder-compatible liminal conversation.
110///
111/// # Errors
112///
113/// Returns [`AionSurfaceError`] when the channel name is invalid, no worker is available, the
114/// conversation fails, or the worker reports an activity failure.
115pub fn dispatch_activity_with_context(
116    context: &DispatchContext,
117    namespace: &str,
118    task_queue: &str,
119    request: ActivityRequest,
120) -> Result<ActivityResult, AionSurfaceError> {
121    let request = (request,);
122    dispatch_activity_ref(context, namespace, task_queue, &request.0)
123}
124
125fn dispatch_activity_ref(
126    context: &DispatchContext,
127    namespace: &str,
128    task_queue: &str,
129    request: &ActivityRequest,
130) -> Result<ActivityResult, AionSurfaceError> {
131    let channel_name = dispatch_channel(namespace, task_queue)?;
132    if let Some(outcome) = context
133        .recorder
134        .replay_outcome(channel_name.as_str(), request)?
135    {
136        return outcome.into_result();
137    }
138
139    let conversation_id = context.ids.next_conversation_id();
140    let mut conversation = context.conversations.open(
141        context.workflow_id(),
142        &channel_name,
143        conversation_id.as_str(),
144    )?;
145    record_operation(
146        context,
147        DispatchOperation::new(
148            DispatchOperationKind::ConversationOpened,
149            conversation_id.as_str(),
150            &channel_name,
151        )
152        .state(ActivityDispatchState::ActivityScheduled),
153    )?;
154
155    let mut excluded_workers = Vec::new();
156    let outcome = dispatch_attempts(
157        context,
158        &channel_name,
159        conversation_id.as_str(),
160        request,
161        &mut *conversation,
162        &mut excluded_workers,
163    );
164    let close_result = close_conversation(
165        context,
166        &channel_name,
167        conversation_id.as_str(),
168        &mut *conversation,
169    );
170
171    match (outcome, close_result) {
172        (Ok(result), Ok(())) => Ok(result),
173        (Err(error), Ok(())) | (_, Err(error)) => Err(error),
174    }
175}
176
177fn dispatch_attempts(
178    context: &DispatchContext,
179    channel_name: &ChannelName,
180    conversation_id: &str,
181    request: &ActivityRequest,
182    conversation: &mut dyn DispatchConversation,
183    excluded_workers: &mut Vec<String>,
184) -> Result<ActivityResult, AionSurfaceError> {
185    loop {
186        let worker = select_worker(context, channel_name, request, excluded_workers)?;
187        start_activity(
188            context,
189            channel_name,
190            conversation_id,
191            conversation,
192            &worker,
193        )?;
194        send_request(
195            context,
196            channel_name,
197            conversation_id,
198            request,
199            conversation,
200            &worker,
201        )?;
202
203        match conversation.receive()? {
204            DispatchConversationEvent::Response(response) => {
205                record_response(context, channel_name, conversation_id, &response)?;
206                return map_activity_result(context, channel_name, response);
207            }
208            DispatchConversationEvent::WorkerExited { worker_id, message } => {
209                record_worker_exit(context, channel_name, conversation_id, &worker_id, message)?;
210                excluded_workers.push(worker_id);
211            }
212        }
213    }
214}
215
216fn start_activity(
217    context: &DispatchContext,
218    channel_name: &ChannelName,
219    conversation_id: &str,
220    conversation: &mut dyn DispatchConversation,
221    worker: &DispatchWorker,
222) -> Result<(), AionSurfaceError> {
223    conversation.link_worker(worker)?;
224    record_operation(
225        context,
226        DispatchOperation::new(
227            DispatchOperationKind::WorkerSelected,
228            conversation_id,
229            channel_name,
230        )
231        .worker(worker.worker_id.clone())
232        .state(ActivityDispatchState::ActivityStarted),
233    )
234}
235
236fn send_request(
237    context: &DispatchContext,
238    channel_name: &ChannelName,
239    conversation_id: &str,
240    request: &ActivityRequest,
241    conversation: &mut dyn DispatchConversation,
242    worker: &DispatchWorker,
243) -> Result<(), AionSurfaceError> {
244    let dispatch_request = DispatchRequest::new(conversation_id.to_owned(), request.clone());
245    conversation.send(dispatch_request)?;
246    record_operation(
247        context,
248        DispatchOperation::new(
249            DispatchOperationKind::MessageSent,
250            conversation_id,
251            channel_name,
252        )
253        .worker(worker.worker_id.clone()),
254    )
255}
256
257fn record_response(
258    context: &DispatchContext,
259    channel_name: &ChannelName,
260    conversation_id: &str,
261    response: &DispatchResponse,
262) -> Result<(), AionSurfaceError> {
263    let result = response.result.clone();
264    let mut operation = DispatchOperation::new(
265        DispatchOperationKind::MessageReceived,
266        conversation_id,
267        channel_name,
268    )
269    .worker(response.worker_id.clone())
270    .state(result_state(&result))
271    .result(result.clone());
272    if let Some(message) = result_message(&result) {
273        operation = operation.message(message);
274    }
275    record_operation(context, operation)
276}
277
278fn record_worker_exit(
279    context: &DispatchContext,
280    channel_name: &ChannelName,
281    conversation_id: &str,
282    worker_id: &str,
283    message: String,
284) -> Result<(), AionSurfaceError> {
285    record_operation(
286        context,
287        DispatchOperation::new(
288            DispatchOperationKind::WorkerExited,
289            conversation_id,
290            channel_name,
291        )
292        .worker(worker_id.to_owned())
293        .state(ActivityDispatchState::ActivityFailed {
294            retry_eligible: true,
295        })
296        .message(message),
297    )
298}
299
300fn select_worker(
301    context: &DispatchContext,
302    channel_name: &ChannelName,
303    request: &ActivityRequest,
304    excluded_workers: &[String],
305) -> Result<DispatchWorker, AionSurfaceError> {
306    let candidates = context.worker_pool.workers_for(channel_name, request)?;
307    context
308        .router
309        .select_worker(
310            context.workflow_id(),
311            channel_name,
312            request,
313            &candidates,
314            excluded_workers,
315        )?
316        .ok_or_else(|| {
317            dispatch_failed(
318                channel_name,
319                context.workflow_id(),
320                "NoWorkersAvailable: no dispatch workers are available",
321            )
322        })
323}
324
325fn close_conversation(
326    context: &DispatchContext,
327    channel_name: &ChannelName,
328    conversation_id: &str,
329    conversation: &mut dyn DispatchConversation,
330) -> Result<(), AionSurfaceError> {
331    conversation.close()?;
332    record_operation(
333        context,
334        DispatchOperation::new(
335            DispatchOperationKind::ConversationClosed,
336            conversation_id,
337            channel_name,
338        ),
339    )
340}
341
342fn map_activity_result(
343    context: &DispatchContext,
344    channel_name: &ChannelName,
345    response: DispatchResponse,
346) -> Result<ActivityResult, AionSurfaceError> {
347    match response.result {
348        ActivityResult::Completed { output } => Ok(ActivityResult::Completed { output }),
349        ActivityResult::Failed { error } => Err(dispatch_failed(
350            channel_name,
351            context.workflow_id(),
352            format!("worker '{}' failed activity: {error}", response.worker_id),
353        )),
354    }
355}
356
357const fn result_state(result: &ActivityResult) -> ActivityDispatchState {
358    match result {
359        ActivityResult::Completed { .. } => ActivityDispatchState::ActivityCompleted,
360        ActivityResult::Failed { .. } => ActivityDispatchState::ActivityFailed {
361            retry_eligible: false,
362        },
363    }
364}
365
366fn result_message(result: &ActivityResult) -> Option<String> {
367    match result {
368        ActivityResult::Completed { .. } => None,
369        ActivityResult::Failed { error } => Some(error.to_string()),
370    }
371}
372
373fn record_operation(
374    context: &DispatchContext,
375    operation: DispatchOperation,
376) -> Result<(), AionSurfaceError> {
377    context.recorder.record(operation)
378}
379
380fn dispatch_failed(
381    channel_name: &ChannelName,
382    workflow_id: &str,
383    message: impl Into<String>,
384) -> AionSurfaceError {
385    AionSurfaceError::DispatchFailed {
386        channel_name: String::from(channel_name.clone()),
387        workflow_id: workflow_id.to_owned(),
388        message: message.into(),
389    }
390}