daedalus_runtime/
handler_registry.rs1use std::collections::HashMap;
2use std::sync::{Arc, Mutex};
3
4use crate::executor::NodeError;
5use crate::io::NodeIo;
6use crate::plan::RuntimeNode;
7use crate::state::ExecutionContext;
8
9type StatelessFn = Arc<
11 dyn Fn(&RuntimeNode, &ExecutionContext, &mut NodeIo) -> Result<(), NodeError> + Send + Sync,
12>;
13type StatefulFn = Arc<
14 Mutex<
15 Box<
16 dyn FnMut(&RuntimeNode, &ExecutionContext, &mut NodeIo) -> Result<(), NodeError> + Send,
17 >,
18 >,
19>;
20
21pub struct HandlerRegistry {
22 stateless: HashMap<String, StatelessFn>,
23 stateful: HashMap<String, StatefulFn>,
24}
25
26impl HandlerRegistry {
27 pub fn new() -> Self {
28 Self {
29 stateless: HashMap::new(),
30 stateful: HashMap::new(),
31 }
32 }
33
34 pub fn is_empty(&self) -> bool {
35 self.stateless.is_empty() && self.stateful.is_empty()
36 }
37
38 pub fn on<F>(&mut self, id: &str, f: F)
39 where
40 F: Fn(&RuntimeNode, &ExecutionContext, &mut NodeIo) -> Result<(), NodeError>
41 + Send
42 + Sync
43 + 'static,
44 {
45 if std::env::var_os("DAEDALUS_TRACE_HANDLER_REGISTER").is_some() {
46 log::warn!("daedalus-runtime: register handler id={}", id);
47 }
48 self.stateless.insert(id.to_string(), Arc::new(f));
49 }
50
51 pub fn on_stateful<F>(&mut self, id: &str, f: F)
52 where
53 F: FnMut(&RuntimeNode, &ExecutionContext, &mut NodeIo) -> Result<(), NodeError>
54 + Send
55 + 'static,
56 {
57 if std::env::var_os("DAEDALUS_TRACE_HANDLER_REGISTER").is_some() {
58 log::warn!(
59 "daedalus-runtime: register stateful handler id={}",
60 id
61 );
62 }
63 self.stateful
64 .insert(id.to_string(), Arc::new(Mutex::new(Box::new(f))));
65 }
66
67 pub fn merge(&mut self, other: HandlerRegistry) {
68 self.stateless.extend(other.stateless);
69 self.stateful.extend(other.stateful);
70 }
71
72 pub fn has_handler(&self, id: &str) -> bool {
73 self.stateless.contains_key(id) || self.stateful.contains_key(id)
74 }
75
76 pub fn with_prefix(self, prefix: &str) -> Self {
77 if prefix.is_empty() {
78 return self;
79 }
80 let mut out = HandlerRegistry::new();
81 for (k, v) in self.stateless {
82 out.stateless
83 .insert(crate::apply_node_prefix(prefix, &k), v);
84 }
85 for (k, v) in self.stateful {
86 out.stateful.insert(crate::apply_node_prefix(prefix, &k), v);
87 }
88 out
89 }
90
91 pub fn clone_arc(&self) -> Self {
93 Self {
94 stateless: self.stateless.clone(),
95 stateful: self.stateful.clone(),
96 }
97 }
98}
99
100impl crate::executor::NodeHandler for HandlerRegistry {
101 fn run(
102 &self,
103 node: &RuntimeNode,
104 ctx: &ExecutionContext,
105 io: &mut NodeIo,
106 ) -> Result<(), NodeError> {
107 if std::env::var_os("DAEDALUS_TRACE_HANDLER_RUN").is_some()
108 && node.id == "cv:image:to_gray"
109 {
110 let has_stateless = self.stateless.contains_key(&node.id);
111 let has_stateful = self.stateful.contains_key(&node.id);
112 log::warn!(
113 "daedalus-runtime: handler run node={} stateless={} stateful={}",
114 node.id,
115 has_stateless,
116 has_stateful
117 );
118 }
119 if let Some(f) = self.stateless.get(&node.id) {
120 let res = f(node, ctx, io);
121 if std::env::var_os("DAEDALUS_TRACE_HANDLER_RUN").is_some()
122 && node.id == "cv:image:to_gray"
123 {
124 log::warn!(
125 "daedalus-runtime: handler result node={} ok={}",
126 node.id,
127 res.is_ok()
128 );
129 }
130 res
131 } else if let Some(f) = self.stateful.get(&node.id) {
132 let res = f.lock().unwrap()(node, ctx, io);
133 if std::env::var_os("DAEDALUS_TRACE_HANDLER_RUN").is_some()
134 && node.id == "cv:image:to_gray"
135 {
136 log::warn!(
137 "daedalus-runtime: handler result node={} ok={}",
138 node.id,
139 res.is_ok()
140 );
141 }
142 res
143 } else {
144 if std::env::var_os("DAEDALUS_TRACE_MISSING_HANDLERS").is_some() {
145 log::warn!("daedalus-runtime: missing handler for node id={}", node.id);
146 }
147 Ok(())
148 }
149 }
150}
151
152impl Default for HandlerRegistry {
153 fn default() -> Self {
154 Self::new()
155 }
156}