Skip to main content

somatize_runtime/
filter_library.rs

//! Unified filter registry — holds implementations, metadata, and trained states.
//!
//! [`FilterLibrary`] is the single registry for the entire pipeline:
//! the compiler reads metadata via [`FilterRegistry`], and the executor
//! reads filters and states directly. No intermediate conversion needed.
6
7use somatize_compiler::FilterRegistry;
8use somatize_core::cache::CacheKey;
9use somatize_core::filter::{Filter, FilterMeta};
10use somatize_core::value::Value;
11use std::collections::HashMap;
12use std::sync::Arc;
13
14/// Unified registry of filter implementations, metadata, and trained states.
15///
16/// ```ignore
17/// let mut lib = FilterLibrary::new();
18/// lib.register("scaler", Box::new(MyScaler { scale: 2.0 }));
19/// lib.register("model", Box::new(MyModel::default()));
20///
21/// // Use as compiler registry
22/// let result = somatize_compiler::compile(&graph, &lib, mode, cache)?;
23///
24/// // Use directly with executor — no conversion needed
25/// executor::execute(&plan, &mut ctx, &lib, &cache)?;
26/// ```
27pub struct FilterLibrary {
28    filters: HashMap<String, Arc<dyn Filter>>,
29    states: HashMap<String, Value>,
30}
31
32impl FilterLibrary {
33    pub fn new() -> Self {
34        Self {
35            filters: HashMap::new(),
36            states: HashMap::new(),
37        }
38    }
39
40    /// Register a filter for a given node ID.
41    pub fn register(&mut self, node_id: impl Into<String>, filter: Box<dyn Filter>) {
42        self.filters.insert(node_id.into(), Arc::from(filter));
43    }
44
45    /// Number of registered filters.
46    pub fn len(&self) -> usize {
47        self.filters.len()
48    }
49
50    /// Whether the library is empty.
51    pub fn is_empty(&self) -> bool {
52        self.filters.is_empty()
53    }
54
55    /// Get a filter by node ID.
56    pub fn get(&self, node_id: &str) -> Option<Arc<dyn Filter>> {
57        self.filters.get(node_id).cloned()
58    }
59
60    /// Store a trained state for a node.
61    pub fn set_state(&mut self, node_id: impl Into<String>, state: Value) {
62        self.states.insert(node_id.into(), state);
63    }
64
65    /// Retrieve the trained state for a node.
66    pub fn get_state(&self, node_id: &str) -> Option<&Value> {
67        self.states.get(node_id)
68    }
69}
70
71impl Default for FilterLibrary {
72    fn default() -> Self {
73        Self::new()
74    }
75}
76
77/// Implements [`FilterRegistry`] so the compiler can read metadata
78/// directly from the registered filter implementations.
79impl FilterRegistry for FilterLibrary {
80    fn meta(&self, node_id: &str) -> Option<FilterMeta> {
81        self.filters.get(node_id).map(|f| f.meta())
82    }
83
84    fn config_hash(&self, node_id: &str) -> Option<CacheKey> {
85        self.filters.get(node_id).map(|f| f.config_hash())
86    }
87}
88
#[cfg(test)]
mod tests {
    use super::*;
    use somatize_core::error::Result;
    use somatize_core::filter::{Distribution, FilterKind, StreamMode};

    /// Minimal filter used only by these tests; it is identified by `name`.
    struct DummyFilter {
        name: String,
    }

    impl Filter for DummyFilter {
        /// The hash is derived from the filter's name alone.
        fn config_hash(&self) -> CacheKey {
            let name_bytes = self.name.as_bytes();
            CacheKey::from_parts(&[name_bytes])
        }

        /// Fitting ignores its inputs and yields an empty value.
        fn fit(&self, _x: &Value, _y: Option<&Value>) -> Result<Value> {
            Ok(Value::Empty)
        }

        /// The forward pass returns a clone of its input.
        fn forward(&self, x: &Value, _state: &Value) -> Result<Value> {
            let passthrough = x.clone();
            Ok(passthrough)
        }

        fn meta(&self) -> FilterMeta {
            FilterMeta {
                name: self.name.clone(),
                kind: FilterKind::Stateless,
                cacheable: true,
                differentiable: false,
                stream_mode: StreamMode::FixedState,
                distribution: Distribution::Local,
                input_schema: None,
                output_schema: None,
            }
        }

        fn as_any(&self) -> &dyn std::any::Any {
            self
        }
    }

    /// Builds a boxed [`DummyFilter`] carrying the given name.
    fn dummy(name: &str) -> Box<dyn Filter> {
        Box::new(DummyFilter { name: name.into() })
    }

    #[test]
    fn register_and_query() {
        let mut lib = FilterLibrary::new();
        for (node_id, name) in [("a", "A"), ("b", "B")] {
            lib.register(node_id, dummy(name));
        }

        assert_eq!(lib.len(), 2);
        assert!(lib.get("a").is_some());
        assert!(lib.get("missing").is_none());
    }

    #[test]
    fn implements_filter_registry() {
        let mut lib = FilterLibrary::new();
        lib.register("node_1", dummy("Scaler"));

        // Metadata comes straight from the registered filter.
        let meta = lib.meta("node_1").unwrap();
        assert_eq!(meta.name, "Scaler");
        assert!(meta.cacheable);

        // So does the config hash.
        let hash = lib.config_hash("node_1").unwrap();
        let expected = CacheKey::from_parts(&[b"Scaler"]);
        assert_eq!(hash, expected);

        // Unknown node IDs yield nothing.
        assert!(lib.meta("nonexistent").is_none());
    }

    #[test]
    fn state_management() {
        let mut lib = FilterLibrary::new();
        lib.register("a", dummy("A"));

        // Registering a filter does not create a state for it.
        assert!(lib.get_state("a").is_none());

        let trained = Value::json(serde_json::json!({"mean": 5.0}));
        lib.set_state("a", trained);

        let state = lib.get_state("a").unwrap();
        assert_eq!(state.as_json().unwrap()["mean"], 5.0);
    }
}