somatize_runtime/
filter_library.rs1use somatize_compiler::FilterRegistry;
12use somatize_core::cache::CacheKey;
13use somatize_core::filter::{Filter, FilterMeta};
14use somatize_core::state::{MemoryStateStore, StateStore};
15use somatize_core::value::Value;
16use std::collections::HashMap;
17use std::sync::Arc;
18
19pub struct FilterLibrary {
33 filters: HashMap<String, Arc<dyn Filter>>,
34 states: Arc<dyn StateStore>,
35}
36
37impl FilterLibrary {
38 pub fn new() -> Self {
40 Self {
41 filters: HashMap::new(),
42 states: Arc::new(MemoryStateStore::new()),
43 }
44 }
45
46 pub fn with_state_store(states: Arc<dyn StateStore>) -> Self {
48 Self {
49 filters: HashMap::new(),
50 states,
51 }
52 }
53
54 pub fn register(&mut self, node_id: impl Into<String>, filter: Box<dyn Filter>) {
56 self.filters.insert(node_id.into(), Arc::from(filter));
57 }
58
59 pub fn len(&self) -> usize {
61 self.filters.len()
62 }
63
64 pub fn is_empty(&self) -> bool {
66 self.filters.is_empty()
67 }
68
69 pub fn get(&self, node_id: &str) -> Option<Arc<dyn Filter>> {
71 self.filters.get(node_id).cloned()
72 }
73
74 pub fn set_state(&self, node_id: impl Into<String>, state: Value) {
79 let id = node_id.into();
80 if let Err(e) = self.states.set(&id, state) {
81 panic!("StateStore::set failed for node {id}: {e}");
85 }
86 }
87
88 pub fn get_state(&self, node_id: &str) -> Option<Arc<Value>> {
92 self.states.get(node_id).ok().flatten()
93 }
94
95 pub fn clear_states(&self) {
97 let _ = self.states.clear();
98 }
99
100 pub fn state_store(&self) -> &Arc<dyn StateStore> {
103 &self.states
104 }
105}
106
107impl Default for FilterLibrary {
108 fn default() -> Self {
109 Self::new()
110 }
111}
112
113impl FilterRegistry for FilterLibrary {
116 fn meta(&self, node_id: &str) -> Option<FilterMeta> {
117 self.filters.get(node_id).map(|f| f.meta())
118 }
119
120 fn config_hash(&self, node_id: &str) -> Option<CacheKey> {
121 self.filters.get(node_id).map(|f| f.config_hash())
122 }
123}
124
#[cfg(test)]
mod tests {
    use super::*;
    use somatize_core::error::Result;
    use somatize_core::filter::{Distribution, FilterKind, StreamMode};

    /// Minimal filter used as a stand-in: it passes its input through
    /// unchanged and derives its config hash from `name`.
    struct DummyFilter {
        name: String,
    }

    impl Filter for DummyFilter {
        fn config_hash(&self) -> CacheKey {
            CacheKey::from_parts(&[self.name.as_bytes()])
        }

        fn fit(&self, _x: &Value, _y: Option<&Value>) -> Result<Value> {
            Ok(Value::Empty)
        }

        fn forward(&self, x: &Value, _state: &Value) -> Result<Value> {
            Ok(x.clone())
        }

        fn meta(&self) -> FilterMeta {
            FilterMeta {
                name: self.name.clone(),
                kind: FilterKind::Stateless,
                cacheable: true,
                differentiable: false,
                stream_mode: StreamMode::FixedState,
                distribution: Distribution::Local,
                input_schema: None,
                output_schema: None,
            }
        }

        fn as_any(&self) -> &dyn std::any::Any {
            self
        }
    }

    /// Builds a boxed [`DummyFilter`] carrying the given name.
    fn dummy(name: &str) -> Box<dyn Filter> {
        Box::new(DummyFilter { name: name.into() })
    }

    #[test]
    fn register_and_query() {
        let mut lib = FilterLibrary::new();
        lib.register("a", dummy("A"));
        lib.register("b", dummy("B"));

        assert_eq!(lib.len(), 2);
        assert!(lib.get("a").is_some());
        assert!(lib.get("missing").is_none());
    }

    #[test]
    fn implements_filter_registry() {
        let mut lib = FilterLibrary::new();
        lib.register("node_1", dummy("Scaler"));

        // Metadata comes straight from the registered filter.
        let meta = lib.meta("node_1").unwrap();
        assert_eq!(meta.name, "Scaler");
        assert!(meta.cacheable);

        // So does the configuration hash.
        let hash = lib.config_hash("node_1").unwrap();
        assert_eq!(hash, CacheKey::from_parts(&[b"Scaler"]));

        assert!(lib.meta("nonexistent").is_none());
    }

    #[test]
    fn state_management() {
        let mut lib = FilterLibrary::new();
        lib.register("a", dummy("A"));

        // Nothing is stored until `set_state` is called.
        assert!(lib.get_state("a").is_none());

        lib.set_state("a", Value::json(serde_json::json!({"mean": 5.0})));
        let state = lib.get_state("a").unwrap();
        assert_eq!(state.as_json().unwrap()["mean"], 5.0);
    }

    #[test]
    fn clear_states_keeps_filters() {
        let mut lib = FilterLibrary::new();
        lib.register("a", dummy("A"));
        lib.set_state("a", Value::Empty);

        assert!(lib.get_state("a").is_some());
        lib.clear_states();

        // State is gone, but the filter registration survives.
        assert!(lib.get_state("a").is_none());
        assert!(lib.get("a").is_some());
    }
}