Skip to main content

somatize_runtime/
filter_library.rs

1//! Unified filter registry — holds implementations, metadata, and trained states.
2//!
3//! [`FilterLibrary`] is the single registry for the entire pipeline:
4//! the compiler reads metadata via [`FilterRegistry`], and the executor
5//! reads filters and states directly. No intermediate conversion needed.
6//!
7//! States live in a pluggable [`StateStore`] — by default
8//! [`MemoryStateStore`], but users can inject a disk- or S3-backed store
9//! for pipelines whose trained states don't fit comfortably in RAM.
10
11use somatize_compiler::FilterRegistry;
12use somatize_core::cache::CacheKey;
13use somatize_core::filter::{Filter, FilterMeta};
14use somatize_core::state::{MemoryStateStore, StateStore};
15use somatize_core::value::Value;
16use std::collections::HashMap;
17use std::sync::Arc;
18
/// Unified registry of filter implementations, metadata, and trained states.
///
/// Filters are stored per node ID behind an `Arc`, so lookups hand out
/// shared handles instead of copies. Trained states are kept separately in
/// a [`StateStore`], which is likewise held behind an `Arc`.
///
/// ```ignore
/// let mut lib = FilterLibrary::new();
/// lib.register("scaler", Box::new(MyScaler { scale: 2.0 }));
/// lib.register("model", Box::new(MyModel::default()));
///
/// // Use as compiler registry
/// let result = somatize_compiler::compile(&graph, &lib, mode, cache)?;
///
/// // Use directly with executor — no conversion needed
/// executor::execute(&plan, &mut ctx, &lib, &cache)?;
/// ```
pub struct FilterLibrary {
    /// Filter implementations, keyed by the node ID they were registered under.
    filters: HashMap<String, Arc<dyn Filter>>,
    /// Backend that holds the trained state of each node.
    states: Arc<dyn StateStore>,
}
36
37impl FilterLibrary {
38    /// Create a new library with an in-memory state store.
39    pub fn new() -> Self {
40        Self {
41            filters: HashMap::new(),
42            states: Arc::new(MemoryStateStore::new()),
43        }
44    }
45
46    /// Create a library with a custom [`StateStore`] backend.
47    pub fn with_state_store(states: Arc<dyn StateStore>) -> Self {
48        Self {
49            filters: HashMap::new(),
50            states,
51        }
52    }
53
54    /// Register a filter for a given node ID.
55    pub fn register(&mut self, node_id: impl Into<String>, filter: Box<dyn Filter>) {
56        self.filters.insert(node_id.into(), Arc::from(filter));
57    }
58
59    /// Number of registered filters.
60    pub fn len(&self) -> usize {
61        self.filters.len()
62    }
63
64    /// Whether the library is empty.
65    pub fn is_empty(&self) -> bool {
66        self.filters.is_empty()
67    }
68
69    /// Get a filter by node ID.
70    pub fn get(&self, node_id: &str) -> Option<Arc<dyn Filter>> {
71        self.filters.get(node_id).cloned()
72    }
73
74    /// Store a trained state for a node.
75    ///
76    /// Errors bubble up from the underlying [`StateStore`] (e.g. I/O on
77    /// a disk-backed backend). The in-memory default never fails.
78    pub fn set_state(&self, node_id: impl Into<String>, state: Value) {
79        let id = node_id.into();
80        if let Err(e) = self.states.set(&id, state) {
81            // A failing state store is a programming/infra error; keep
82            // the panic local to this call site rather than letting an
83            // Err propagate through every caller.
84            panic!("StateStore::set failed for node {id}: {e}");
85        }
86    }
87
88    /// Retrieve the trained state for a node. The returned `Arc<Value>`
89    /// can be dereferenced (`&*arc`) for the forward hot path without
90    /// cloning the underlying value.
91    pub fn get_state(&self, node_id: &str) -> Option<Arc<Value>> {
92        self.states.get(node_id).ok().flatten()
93    }
94
95    /// Drop all stored states (but keep filters).
96    pub fn clear_states(&self) {
97        let _ = self.states.clear();
98    }
99
100    /// Access the underlying [`StateStore`] (e.g. to share it across
101    /// sessions or inspect its contents).
102    pub fn state_store(&self) -> &Arc<dyn StateStore> {
103        &self.states
104    }
105}
106
107impl Default for FilterLibrary {
108    fn default() -> Self {
109        Self::new()
110    }
111}
112
113/// Implements [`FilterRegistry`] so the compiler can read metadata
114/// directly from the registered filter implementations.
115impl FilterRegistry for FilterLibrary {
116    fn meta(&self, node_id: &str) -> Option<FilterMeta> {
117        self.filters.get(node_id).map(|f| f.meta())
118    }
119
120    fn config_hash(&self, node_id: &str) -> Option<CacheKey> {
121        self.filters.get(node_id).map(|f| f.config_hash())
122    }
123}
124
#[cfg(test)]
mod tests {
    use super::*;
    use somatize_core::error::Result;
    use somatize_core::filter::{Distribution, FilterKind, StreamMode};

    /// Pass-through filter whose only configuration is its name.
    struct DummyFilter {
        name: String,
    }

    impl Filter for DummyFilter {
        fn config_hash(&self) -> CacheKey {
            CacheKey::from_parts(&[self.name.as_bytes()])
        }

        fn fit(&self, _x: &Value, _y: Option<&Value>) -> Result<Value> {
            Ok(Value::Empty)
        }

        fn forward(&self, x: &Value, _state: &Value) -> Result<Value> {
            Ok(x.clone())
        }

        fn meta(&self) -> FilterMeta {
            FilterMeta {
                name: self.name.clone(),
                kind: FilterKind::Stateless,
                cacheable: true,
                differentiable: false,
                stream_mode: StreamMode::FixedState,
                distribution: Distribution::Local,
                input_schema: None,
                output_schema: None,
            }
        }

        fn as_any(&self) -> &dyn std::any::Any {
            self
        }
    }

    /// Box up a [`DummyFilter`] called `name`, ready to hand to `register`.
    fn dummy(name: &str) -> Box<dyn Filter> {
        Box::new(DummyFilter { name: name.into() })
    }

    #[test]
    fn register_and_query() {
        let mut library = FilterLibrary::new();
        library.register("a", dummy("A"));
        library.register("b", dummy("B"));

        assert_eq!(library.len(), 2);
        assert!(library.get("a").is_some());
        assert!(library.get("missing").is_none());
    }

    #[test]
    fn implements_filter_registry() {
        let mut library = FilterLibrary::new();
        library.register("node_1", dummy("Scaler"));

        // Metadata is read straight from the registered filter.
        let scaler_meta = library.meta("node_1").unwrap();
        assert_eq!(scaler_meta.name, "Scaler");
        assert!(scaler_meta.cacheable);

        // The hash must match what the filter itself derives from its name.
        let scaler_hash = library.config_hash("node_1").unwrap();
        assert_eq!(scaler_hash, CacheKey::from_parts(&[b"Scaler"]));

        assert!(library.meta("nonexistent").is_none());
    }

    #[test]
    fn state_management() {
        let mut library = FilterLibrary::new();
        library.register("a", dummy("A"));

        // Nothing has been stored yet.
        assert!(library.get_state("a").is_none());

        library.set_state("a", Value::json(serde_json::json!({"mean": 5.0})));
        let stored = library.get_state("a").unwrap();
        assert_eq!(stored.as_json().unwrap()["mean"], 5.0);
    }

    #[test]
    fn clear_states_keeps_filters() {
        let mut library = FilterLibrary::new();
        library.register("a", dummy("A"));
        library.set_state("a", Value::Empty);
        assert!(library.get_state("a").is_some());

        library.clear_states();

        // The state is gone, the filter registration is not.
        assert!(library.get_state("a").is_none());
        assert!(library.get("a").is_some());
    }
}