daedalus_runtime/
handler_registry.rs

1use std::collections::HashMap;
2use std::sync::{Arc, Mutex};
3
4use crate::executor::NodeError;
5use crate::io::NodeIo;
6use crate::plan::RuntimeNode;
7use crate::state::ExecutionContext;
8
/// Handler type stored by `HandlerRegistry` for stateless nodes.
///
/// A shared (`Arc`) closure callable through `&self` (`Fn`); it is `Send + Sync`,
/// so one instance may be invoked from several threads at once.
type StatelessFn = Arc<
    dyn Fn(&RuntimeNode, &ExecutionContext, &mut NodeIo) -> Result<(), NodeError> + Send + Sync,
>;
/// Handler type stored by `HandlerRegistry` for stateful nodes.
///
/// The closure is `FnMut`, so it needs exclusive access for each call; the
/// `Mutex` provides that, and the `Arc` lets clones of the registry share the
/// same closure (and therefore the same captured state).
type StatefulFn = Arc<
    Mutex<
        Box<
            dyn FnMut(&RuntimeNode, &ExecutionContext, &mut NodeIo) -> Result<(), NodeError> + Send,
        >,
    >,
>;
20
/// Helper registry mapping node ids to stateless/stateful handlers.
///
/// Handlers are kept in two maps keyed by node id, one per handler kind.
pub struct HandlerRegistry {
    /// Stateless (`Fn`) handlers keyed by node id.
    stateless: HashMap<String, StatelessFn>,
    /// Stateful (`FnMut`) handlers keyed by node id; each one sits behind its own mutex.
    stateful: HashMap<String, StatefulFn>,
}
25
26impl HandlerRegistry {
27    pub fn new() -> Self {
28        Self {
29            stateless: HashMap::new(),
30            stateful: HashMap::new(),
31        }
32    }
33
34    pub fn is_empty(&self) -> bool {
35        self.stateless.is_empty() && self.stateful.is_empty()
36    }
37
38    pub fn on<F>(&mut self, id: &str, f: F)
39    where
40        F: Fn(&RuntimeNode, &ExecutionContext, &mut NodeIo) -> Result<(), NodeError>
41            + Send
42            + Sync
43            + 'static,
44    {
45        if std::env::var_os("DAEDALUS_TRACE_HANDLER_REGISTER").is_some() {
46            log::warn!("daedalus-runtime: register handler id={}", id);
47        }
48        self.stateless.insert(id.to_string(), Arc::new(f));
49    }
50
51    pub fn on_stateful<F>(&mut self, id: &str, f: F)
52    where
53        F: FnMut(&RuntimeNode, &ExecutionContext, &mut NodeIo) -> Result<(), NodeError>
54            + Send
55            + 'static,
56    {
57        if std::env::var_os("DAEDALUS_TRACE_HANDLER_REGISTER").is_some() {
58            log::warn!(
59                "daedalus-runtime: register stateful handler id={}",
60                id
61            );
62        }
63        self.stateful
64            .insert(id.to_string(), Arc::new(Mutex::new(Box::new(f))));
65    }
66
67    pub fn merge(&mut self, other: HandlerRegistry) {
68        self.stateless.extend(other.stateless);
69        self.stateful.extend(other.stateful);
70    }
71
72    pub fn has_handler(&self, id: &str) -> bool {
73        self.stateless.contains_key(id) || self.stateful.contains_key(id)
74    }
75
76    pub fn with_prefix(self, prefix: &str) -> Self {
77        if prefix.is_empty() {
78            return self;
79        }
80        let mut out = HandlerRegistry::new();
81        for (k, v) in self.stateless {
82            out.stateless
83                .insert(crate::apply_node_prefix(prefix, &k), v);
84        }
85        for (k, v) in self.stateful {
86            out.stateful.insert(crate::apply_node_prefix(prefix, &k), v);
87        }
88        out
89    }
90
91    /// Cheap clone by cloning the underlying Arcs/Mutexes.
92    pub fn clone_arc(&self) -> Self {
93        Self {
94            stateless: self.stateless.clone(),
95            stateful: self.stateful.clone(),
96        }
97    }
98}
99
100impl crate::executor::NodeHandler for HandlerRegistry {
101    fn run(
102        &self,
103        node: &RuntimeNode,
104        ctx: &ExecutionContext,
105        io: &mut NodeIo,
106    ) -> Result<(), NodeError> {
107        if std::env::var_os("DAEDALUS_TRACE_HANDLER_RUN").is_some()
108            && node.id == "cv:image:to_gray"
109        {
110            let has_stateless = self.stateless.contains_key(&node.id);
111            let has_stateful = self.stateful.contains_key(&node.id);
112            log::warn!(
113                "daedalus-runtime: handler run node={} stateless={} stateful={}",
114                node.id,
115                has_stateless,
116                has_stateful
117            );
118        }
119        if let Some(f) = self.stateless.get(&node.id) {
120            let res = f(node, ctx, io);
121            if std::env::var_os("DAEDALUS_TRACE_HANDLER_RUN").is_some()
122                && node.id == "cv:image:to_gray"
123            {
124                log::warn!(
125                    "daedalus-runtime: handler result node={} ok={}",
126                    node.id,
127                    res.is_ok()
128                );
129            }
130            res
131        } else if let Some(f) = self.stateful.get(&node.id) {
132            let res = f.lock().unwrap()(node, ctx, io);
133            if std::env::var_os("DAEDALUS_TRACE_HANDLER_RUN").is_some()
134                && node.id == "cv:image:to_gray"
135            {
136                log::warn!(
137                    "daedalus-runtime: handler result node={} ok={}",
138                    node.id,
139                    res.is_ok()
140                );
141            }
142            res
143        } else {
144            if std::env::var_os("DAEDALUS_TRACE_MISSING_HANDLERS").is_some() {
145                log::warn!("daedalus-runtime: missing handler for node id={}", node.id);
146            }
147            Ok(())
148        }
149    }
150}
151
152impl Default for HandlerRegistry {
153    fn default() -> Self {
154        Self::new()
155    }
156}