// somatize_runtime/filter_library.rs
use somatize_compiler::FilterRegistry;
use somatize_core::cache::CacheKey;
use somatize_core::filter::{Filter, FilterMeta};
use somatize_core::value::Value;
use std::collections::HashMap;
use std::sync::Arc;

14pub struct FilterLibrary {
28 filters: HashMap<String, Arc<dyn Filter>>,
29 states: HashMap<String, Value>,
30}
31
32impl FilterLibrary {
33 pub fn new() -> Self {
34 Self {
35 filters: HashMap::new(),
36 states: HashMap::new(),
37 }
38 }
39
40 pub fn register(&mut self, node_id: impl Into<String>, filter: Box<dyn Filter>) {
42 self.filters.insert(node_id.into(), Arc::from(filter));
43 }
44
45 pub fn len(&self) -> usize {
47 self.filters.len()
48 }
49
50 pub fn is_empty(&self) -> bool {
52 self.filters.is_empty()
53 }
54
55 pub fn get(&self, node_id: &str) -> Option<Arc<dyn Filter>> {
57 self.filters.get(node_id).cloned()
58 }
59
60 pub fn set_state(&mut self, node_id: impl Into<String>, state: Value) {
62 self.states.insert(node_id.into(), state);
63 }
64
65 pub fn get_state(&self, node_id: &str) -> Option<&Value> {
67 self.states.get(node_id)
68 }
69}
70
71impl Default for FilterLibrary {
72 fn default() -> Self {
73 Self::new()
74 }
75}
76
77impl FilterRegistry for FilterLibrary {
80 fn meta(&self, node_id: &str) -> Option<FilterMeta> {
81 self.filters.get(node_id).map(|f| f.meta())
82 }
83
84 fn config_hash(&self, node_id: &str) -> Option<CacheKey> {
85 self.filters.get(node_id).map(|f| f.config_hash())
86 }
87}
88
#[cfg(test)]
mod tests {
    use super::*;
    use somatize_core::error::Result;
    use somatize_core::filter::{Distribution, FilterKind, StreamMode};

    /// Minimal filter used to exercise the library: `fit` yields an empty
    /// value, `forward` returns a clone of its input, and the config hash and
    /// metadata name are derived from `name`.
    struct DummyFilter {
        name: String,
    }

    impl Filter for DummyFilter {
        fn config_hash(&self) -> CacheKey {
            CacheKey::from_parts(&[self.name.as_bytes()])
        }

        fn fit(&self, _x: &Value, _y: Option<&Value>) -> Result<Value> {
            Ok(Value::Empty)
        }

        fn forward(&self, x: &Value, _state: &Value) -> Result<Value> {
            Ok(x.clone())
        }

        fn meta(&self) -> FilterMeta {
            FilterMeta {
                name: self.name.clone(),
                kind: FilterKind::Stateless,
                cacheable: true,
                differentiable: false,
                stream_mode: StreamMode::FixedState,
                distribution: Distribution::Local,
                input_schema: None,
                output_schema: None,
            }
        }
    }

    /// Boxes a `DummyFilter` carrying the given name.
    fn dummy(name: &str) -> Box<dyn Filter> {
        Box::new(DummyFilter { name: name.into() })
    }

    /// Registered filters are counted and retrievable; unknown ids yield `None`.
    #[test]
    fn register_and_query() {
        let mut lib = FilterLibrary::new();
        lib.register("a", dummy("A"));
        lib.register("b", dummy("B"));

        assert_eq!(lib.len(), 2);
        assert!(lib.get("a").is_some());
        assert!(lib.get("missing").is_none());
    }

    /// `FilterRegistry` queries are answered from the registered filter.
    #[test]
    fn implements_filter_registry() {
        let mut lib = FilterLibrary::new();
        lib.register("node_1", dummy("Scaler"));

        let meta = lib.meta("node_1").unwrap();
        assert_eq!(meta.name, "Scaler");
        assert!(meta.cacheable);

        let hash = lib.config_hash("node_1").unwrap();
        assert_eq!(hash, CacheKey::from_parts(&[b"Scaler"]));

        assert!(lib.meta("nonexistent").is_none());
    }

    /// State is absent until set, then readable under the same node id.
    #[test]
    fn state_management() {
        let mut lib = FilterLibrary::new();
        lib.register("a", dummy("A"));

        assert!(lib.get_state("a").is_none());

        lib.set_state("a", Value::json(serde_json::json!({"mean": 5.0})));
        let state = lib.get_state("a").unwrap();
        assert_eq!(state.as_json().unwrap()["mean"], 5.0);
    }
}