Skip to main content

liminal/aion/dispatch/
router.rs

1use std::sync::Arc;
2
3use super::super::channels::ChannelName;
4use super::super::error::AionSurfaceError;
5use super::super::types::ActivityRequest;
6use super::{DispatchRouter, DispatchWorker, dispatch_failed};
7use crate::routing::function::RoutingMessage;
8use crate::routing::{FieldValue, RoutingSlot, SupervisedExecutor};
9
10/// Router adapter backed by the active liminal routing function slot.
11#[derive(Debug)]
12pub struct RoutingFunctionDispatchRouter {
13    slot: Arc<RoutingSlot>,
14    executor: SupervisedExecutor,
15}
16
17impl RoutingFunctionDispatchRouter {
18    /// Creates a router using the supplied routing slot and supervised executor.
19    #[must_use]
20    pub const fn new(slot: Arc<RoutingSlot>, executor: SupervisedExecutor) -> Self {
21        Self { slot, executor }
22    }
23}
24
25impl DispatchRouter for RoutingFunctionDispatchRouter {
26    fn select_worker(
27        &self,
28        workflow_id: &str,
29        channel_name: &ChannelName,
30        request: &ActivityRequest,
31        candidates: &[DispatchWorker],
32        excluded_worker_ids: &[String],
33    ) -> Result<Option<DispatchWorker>, AionSurfaceError> {
34        let eligible = eligible_workers(candidates, excluded_worker_ids);
35        let views = eligible
36            .iter()
37            .map(|worker| worker.consumer_state.clone())
38            .collect();
39        let decision = self
40            .executor
41            .execute(&self.slot.current(), routing_message(request), views)
42            .map_err(|error| dispatch_failed(channel_name, workflow_id, error.to_string()))?;
43
44        let Some(selected) = decision.selected() else {
45            return Ok(None);
46        };
47        let selected_id = selected.as_str();
48        eligible
49            .into_iter()
50            .find(|worker| worker.worker_id == selected_id)
51            .map_or_else(
52                || {
53                    Err(dispatch_failed(
54                        channel_name,
55                        workflow_id,
56                        format!("routing selected unknown worker '{selected_id}'"),
57                    ))
58                },
59                |worker| Ok(Some(worker)),
60            )
61    }
62}
63
64fn eligible_workers(
65    candidates: &[DispatchWorker],
66    excluded_worker_ids: &[String],
67) -> Vec<DispatchWorker> {
68    candidates
69        .iter()
70        .filter(|worker| !excluded_worker_ids.iter().any(|id| id == &worker.worker_id))
71        .cloned()
72        .collect()
73}
74
75fn routing_message(request: &ActivityRequest) -> RoutingMessage {
76    RoutingMessage::new()
77        .with(
78            "activity_type",
79            FieldValue::Text(request.activity_type.clone()),
80        )
81        .with("task_queue", FieldValue::Text(request.task_queue.clone()))
82        .with(
83            "input_content_type",
84            FieldValue::Text(request.input.content_type.clone()),
85        )
86}