1use std::sync::Arc;
2
3mod defaults;
4mod router;
5#[cfg(test)]
6mod tests;
7mod types;
8
9use defaults::{
10 EmptyWorkerPool, NoopConversationFactory, NoopRecorder, NoopRouter, UuidConversationIds,
11};
12pub use router::RoutingFunctionDispatchRouter;
13pub use types::{
14 ActivityDispatchState, ConversationIdProvider, DispatchConversation, DispatchConversationEvent,
15 DispatchConversationFactory, DispatchOperation, DispatchOperationKind, DispatchRecorder,
16 DispatchRouter, DispatchWorker, DispatchWorkerPool, RecordedDispatchOutcome,
17};
18
19use super::channels::{ChannelName, dispatch_channel};
20use super::codec::{DispatchRequest, DispatchResponse};
21use super::error::AionSurfaceError;
22use super::types::{ActivityRequest, ActivityResult};
23
/// Workflow id used by `DispatchContext::default()` and substituted whenever a
/// context's configured workflow id is the empty string.
const DEFAULT_WORKFLOW_ID: &str = "aion-dispatch";
25
/// Bundle of collaborators used to dispatch one activity request.
///
/// Every collaborator is held behind an [`Arc`], so cloning the context shares
/// the same underlying implementations instead of duplicating them.
#[derive(Clone)]
pub struct DispatchContext {
    /// Workflow id handed to the router, the conversation factory and dispatch
    /// errors. Read through the private `workflow_id()` accessor, which swaps
    /// an empty value for the module default.
    workflow_id: String,
    /// Supplies the candidate workers for a channel and request.
    worker_pool: Arc<dyn DispatchWorkerPool>,
    /// Picks one worker out of the candidates, given the excluded worker ids.
    router: Arc<dyn DispatchRouter>,
    /// Opens the conversation over which a request is sent and events received.
    conversations: Arc<dyn DispatchConversationFactory>,
    /// Replays previously recorded outcomes and records every dispatch operation.
    recorder: Arc<dyn DispatchRecorder>,
    /// Produces the id for each newly opened conversation.
    ids: Arc<dyn ConversationIdProvider>,
}
36
37impl DispatchContext {
38 #[must_use]
40 pub fn new(
41 workflow_id: impl Into<String>,
42 worker_pool: Arc<dyn DispatchWorkerPool>,
43 router: Arc<dyn DispatchRouter>,
44 conversations: Arc<dyn DispatchConversationFactory>,
45 recorder: Arc<dyn DispatchRecorder>,
46 ids: Arc<dyn ConversationIdProvider>,
47 ) -> Self {
48 Self {
49 workflow_id: workflow_id.into(),
50 worker_pool,
51 router,
52 conversations,
53 recorder,
54 ids,
55 }
56 }
57
58 #[must_use]
60 pub fn embedded_no_workers(workflow_id: impl Into<String>) -> Self {
61 Self::new(
62 workflow_id,
63 Arc::new(EmptyWorkerPool),
64 Arc::new(NoopRouter),
65 Arc::new(NoopConversationFactory),
66 Arc::new(NoopRecorder),
67 Arc::new(UuidConversationIds),
68 )
69 }
70
71 fn workflow_id(&self) -> &str {
72 if self.workflow_id.is_empty() {
73 DEFAULT_WORKFLOW_ID
74 } else {
75 self.workflow_id.as_str()
76 }
77 }
78}
79
80impl Default for DispatchContext {
81 fn default() -> Self {
82 Self::embedded_no_workers(DEFAULT_WORKFLOW_ID)
83 }
84}
85
86impl std::fmt::Debug for DispatchContext {
87 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88 formatter
89 .debug_struct("DispatchContext")
90 .field("workflow_id", &self.workflow_id())
91 .finish_non_exhaustive()
92 }
93}
94
95pub fn dispatch_activity(
102 namespace: &str,
103 task_queue: &str,
104 request: ActivityRequest,
105) -> Result<ActivityResult, AionSurfaceError> {
106 dispatch_activity_with_context(&DispatchContext::default(), namespace, task_queue, request)
107}
108
109pub fn dispatch_activity_with_context(
116 context: &DispatchContext,
117 namespace: &str,
118 task_queue: &str,
119 request: ActivityRequest,
120) -> Result<ActivityResult, AionSurfaceError> {
121 let request = (request,);
122 dispatch_activity_ref(context, namespace, task_queue, &request.0)
123}
124
125fn dispatch_activity_ref(
126 context: &DispatchContext,
127 namespace: &str,
128 task_queue: &str,
129 request: &ActivityRequest,
130) -> Result<ActivityResult, AionSurfaceError> {
131 let channel_name = dispatch_channel(namespace, task_queue)?;
132 if let Some(outcome) = context
133 .recorder
134 .replay_outcome(channel_name.as_str(), request)?
135 {
136 return outcome.into_result();
137 }
138
139 let conversation_id = context.ids.next_conversation_id();
140 let mut conversation = context.conversations.open(
141 context.workflow_id(),
142 &channel_name,
143 conversation_id.as_str(),
144 )?;
145 record_operation(
146 context,
147 DispatchOperation::new(
148 DispatchOperationKind::ConversationOpened,
149 conversation_id.as_str(),
150 &channel_name,
151 )
152 .state(ActivityDispatchState::ActivityScheduled),
153 )?;
154
155 let mut excluded_workers = Vec::new();
156 let outcome = dispatch_attempts(
157 context,
158 &channel_name,
159 conversation_id.as_str(),
160 request,
161 &mut *conversation,
162 &mut excluded_workers,
163 );
164 let close_result = close_conversation(
165 context,
166 &channel_name,
167 conversation_id.as_str(),
168 &mut *conversation,
169 );
170
171 match (outcome, close_result) {
172 (Ok(result), Ok(())) => Ok(result),
173 (Err(error), Ok(())) | (_, Err(error)) => Err(error),
174 }
175}
176
177fn dispatch_attempts(
178 context: &DispatchContext,
179 channel_name: &ChannelName,
180 conversation_id: &str,
181 request: &ActivityRequest,
182 conversation: &mut dyn DispatchConversation,
183 excluded_workers: &mut Vec<String>,
184) -> Result<ActivityResult, AionSurfaceError> {
185 loop {
186 let worker = select_worker(context, channel_name, request, excluded_workers)?;
187 start_activity(
188 context,
189 channel_name,
190 conversation_id,
191 conversation,
192 &worker,
193 )?;
194 send_request(
195 context,
196 channel_name,
197 conversation_id,
198 request,
199 conversation,
200 &worker,
201 )?;
202
203 match conversation.receive()? {
204 DispatchConversationEvent::Response(response) => {
205 record_response(context, channel_name, conversation_id, &response)?;
206 return map_activity_result(context, channel_name, response);
207 }
208 DispatchConversationEvent::WorkerExited { worker_id, message } => {
209 record_worker_exit(context, channel_name, conversation_id, &worker_id, message)?;
210 excluded_workers.push(worker_id);
211 }
212 }
213 }
214}
215
216fn start_activity(
217 context: &DispatchContext,
218 channel_name: &ChannelName,
219 conversation_id: &str,
220 conversation: &mut dyn DispatchConversation,
221 worker: &DispatchWorker,
222) -> Result<(), AionSurfaceError> {
223 conversation.link_worker(worker)?;
224 record_operation(
225 context,
226 DispatchOperation::new(
227 DispatchOperationKind::WorkerSelected,
228 conversation_id,
229 channel_name,
230 )
231 .worker(worker.worker_id.clone())
232 .state(ActivityDispatchState::ActivityStarted),
233 )
234}
235
236fn send_request(
237 context: &DispatchContext,
238 channel_name: &ChannelName,
239 conversation_id: &str,
240 request: &ActivityRequest,
241 conversation: &mut dyn DispatchConversation,
242 worker: &DispatchWorker,
243) -> Result<(), AionSurfaceError> {
244 let dispatch_request = DispatchRequest::new(conversation_id.to_owned(), request.clone());
245 conversation.send(dispatch_request)?;
246 record_operation(
247 context,
248 DispatchOperation::new(
249 DispatchOperationKind::MessageSent,
250 conversation_id,
251 channel_name,
252 )
253 .worker(worker.worker_id.clone()),
254 )
255}
256
257fn record_response(
258 context: &DispatchContext,
259 channel_name: &ChannelName,
260 conversation_id: &str,
261 response: &DispatchResponse,
262) -> Result<(), AionSurfaceError> {
263 let result = response.result.clone();
264 let mut operation = DispatchOperation::new(
265 DispatchOperationKind::MessageReceived,
266 conversation_id,
267 channel_name,
268 )
269 .worker(response.worker_id.clone())
270 .state(result_state(&result))
271 .result(result.clone());
272 if let Some(message) = result_message(&result) {
273 operation = operation.message(message);
274 }
275 record_operation(context, operation)
276}
277
278fn record_worker_exit(
279 context: &DispatchContext,
280 channel_name: &ChannelName,
281 conversation_id: &str,
282 worker_id: &str,
283 message: String,
284) -> Result<(), AionSurfaceError> {
285 record_operation(
286 context,
287 DispatchOperation::new(
288 DispatchOperationKind::WorkerExited,
289 conversation_id,
290 channel_name,
291 )
292 .worker(worker_id.to_owned())
293 .state(ActivityDispatchState::ActivityFailed {
294 retry_eligible: true,
295 })
296 .message(message),
297 )
298}
299
300fn select_worker(
301 context: &DispatchContext,
302 channel_name: &ChannelName,
303 request: &ActivityRequest,
304 excluded_workers: &[String],
305) -> Result<DispatchWorker, AionSurfaceError> {
306 let candidates = context.worker_pool.workers_for(channel_name, request)?;
307 context
308 .router
309 .select_worker(
310 context.workflow_id(),
311 channel_name,
312 request,
313 &candidates,
314 excluded_workers,
315 )?
316 .ok_or_else(|| {
317 dispatch_failed(
318 channel_name,
319 context.workflow_id(),
320 "NoWorkersAvailable: no dispatch workers are available",
321 )
322 })
323}
324
325fn close_conversation(
326 context: &DispatchContext,
327 channel_name: &ChannelName,
328 conversation_id: &str,
329 conversation: &mut dyn DispatchConversation,
330) -> Result<(), AionSurfaceError> {
331 conversation.close()?;
332 record_operation(
333 context,
334 DispatchOperation::new(
335 DispatchOperationKind::ConversationClosed,
336 conversation_id,
337 channel_name,
338 ),
339 )
340}
341
342fn map_activity_result(
343 context: &DispatchContext,
344 channel_name: &ChannelName,
345 response: DispatchResponse,
346) -> Result<ActivityResult, AionSurfaceError> {
347 match response.result {
348 ActivityResult::Completed { output } => Ok(ActivityResult::Completed { output }),
349 ActivityResult::Failed { error } => Err(dispatch_failed(
350 channel_name,
351 context.workflow_id(),
352 format!("worker '{}' failed activity: {error}", response.worker_id),
353 )),
354 }
355}
356
357const fn result_state(result: &ActivityResult) -> ActivityDispatchState {
358 match result {
359 ActivityResult::Completed { .. } => ActivityDispatchState::ActivityCompleted,
360 ActivityResult::Failed { .. } => ActivityDispatchState::ActivityFailed {
361 retry_eligible: false,
362 },
363 }
364}
365
366fn result_message(result: &ActivityResult) -> Option<String> {
367 match result {
368 ActivityResult::Completed { .. } => None,
369 ActivityResult::Failed { error } => Some(error.to_string()),
370 }
371}
372
373fn record_operation(
374 context: &DispatchContext,
375 operation: DispatchOperation,
376) -> Result<(), AionSurfaceError> {
377 context.recorder.record(operation)
378}
379
380fn dispatch_failed(
381 channel_name: &ChannelName,
382 workflow_id: &str,
383 message: impl Into<String>,
384) -> AionSurfaceError {
385 AionSurfaceError::DispatchFailed {
386 channel_name: String::from(channel_name.clone()),
387 workflow_id: workflow_id.to_owned(),
388 message: message.into(),
389 }
390}