liminal/aion/dispatch/
router.rs1use std::sync::Arc;
2
3use super::super::channels::ChannelName;
4use super::super::error::AionSurfaceError;
5use super::super::types::ActivityRequest;
6use super::{DispatchRouter, DispatchWorker, dispatch_failed};
7use crate::routing::function::RoutingMessage;
8use crate::routing::{FieldValue, RoutingSlot, SupervisedExecutor};
9
10#[derive(Debug)]
12pub struct RoutingFunctionDispatchRouter {
13 slot: Arc<RoutingSlot>,
14 executor: SupervisedExecutor,
15}
16
17impl RoutingFunctionDispatchRouter {
18 #[must_use]
20 pub const fn new(slot: Arc<RoutingSlot>, executor: SupervisedExecutor) -> Self {
21 Self { slot, executor }
22 }
23}
24
25impl DispatchRouter for RoutingFunctionDispatchRouter {
26 fn select_worker(
27 &self,
28 workflow_id: &str,
29 channel_name: &ChannelName,
30 request: &ActivityRequest,
31 candidates: &[DispatchWorker],
32 excluded_worker_ids: &[String],
33 ) -> Result<Option<DispatchWorker>, AionSurfaceError> {
34 let eligible = eligible_workers(candidates, excluded_worker_ids);
35 let views = eligible
36 .iter()
37 .map(|worker| worker.consumer_state.clone())
38 .collect();
39 let decision = self
40 .executor
41 .execute(&self.slot.current(), routing_message(request), views)
42 .map_err(|error| dispatch_failed(channel_name, workflow_id, error.to_string()))?;
43
44 let Some(selected) = decision.selected() else {
45 return Ok(None);
46 };
47 let selected_id = selected.as_str();
48 eligible
49 .into_iter()
50 .find(|worker| worker.worker_id == selected_id)
51 .map_or_else(
52 || {
53 Err(dispatch_failed(
54 channel_name,
55 workflow_id,
56 format!("routing selected unknown worker '{selected_id}'"),
57 ))
58 },
59 |worker| Ok(Some(worker)),
60 )
61 }
62}
63
64fn eligible_workers(
65 candidates: &[DispatchWorker],
66 excluded_worker_ids: &[String],
67) -> Vec<DispatchWorker> {
68 candidates
69 .iter()
70 .filter(|worker| !excluded_worker_ids.iter().any(|id| id == &worker.worker_id))
71 .cloned()
72 .collect()
73}
74
75fn routing_message(request: &ActivityRequest) -> RoutingMessage {
76 RoutingMessage::new()
77 .with(
78 "activity_type",
79 FieldValue::Text(request.activity_type.clone()),
80 )
81 .with("task_queue", FieldValue::Text(request.task_queue.clone()))
82 .with(
83 "input_content_type",
84 FieldValue::Text(request.input.content_type.clone()),
85 )
86}