Skip to main content

clasp_router/
state.rs

1//! Router state management
2
3use clasp_core::state::{ParamState, StateStore, StateStoreConfig, UpdateError};
4use clasp_core::{ParamValue, SetMessage, SignalDefinition, SnapshotMessage, Value};
5use dashmap::DashMap;
6use parking_lot::RwLock;
7use std::time::{Duration, Instant};
8
9#[cfg(feature = "journal")]
10use clasp_core::SignalType;
11#[cfg(feature = "journal")]
12use clasp_journal::{Journal, JournalEntry};
13#[cfg(feature = "journal")]
14use std::sync::Arc;
15
16use crate::SessionId;
17
18/// Signal entry with registration time for cleanup
19#[derive(Debug, Clone)]
20pub struct SignalEntry {
21    /// The signal definition
22    pub definition: SignalDefinition,
23    /// When this signal was registered
24    pub registered_at: Instant,
25    /// Last time this signal was accessed or updated
26    pub last_accessed: Instant,
27}
28
29/// Configuration for router state management
30#[derive(Debug, Clone)]
31pub struct RouterStateConfig {
32    /// Parameter store configuration
33    pub param_config: StateStoreConfig,
34    /// TTL for signal definitions (None = never expire)
35    pub signal_ttl: Option<Duration>,
36    /// Maximum number of signals (None = unlimited)
37    pub max_signals: Option<usize>,
38}
39
40impl Default for RouterStateConfig {
41    fn default() -> Self {
42        Self {
43            param_config: StateStoreConfig::default(),
44            signal_ttl: Some(Duration::from_secs(3600)), // 1 hour
45            max_signals: Some(10_000),
46        }
47    }
48}
49
50impl RouterStateConfig {
51    /// Create config with no limits (for backwards compatibility)
52    pub fn unlimited() -> Self {
53        Self {
54            param_config: StateStoreConfig::unlimited(),
55            signal_ttl: None,
56            max_signals: None,
57        }
58    }
59}
60
61/// Listener callback type
62type ListenerFn = Box<dyn Fn(&str, &Value) + Send + Sync>;
63
64/// Global router state
65pub struct RouterState {
66    /// Parameter state store
67    params: RwLock<StateStore>,
68    /// Change listeners (for reactive updates)
69    listeners: DashMap<String, Vec<ListenerFn>>,
70    /// Signal registry (announced signals from clients) with timestamps
71    signals: DashMap<String, SignalEntry>,
72    /// Configuration
73    config: RouterStateConfig,
74    /// Optional journal for state persistence and replay
75    #[cfg(feature = "journal")]
76    journal: Option<Arc<dyn Journal>>,
77}
78
79impl RouterState {
80    pub fn new() -> Self {
81        Self::with_config(RouterStateConfig::unlimited())
82    }
83
84    /// Create with specific configuration
85    pub fn with_config(config: RouterStateConfig) -> Self {
86        Self {
87            params: RwLock::new(StateStore::with_config(config.param_config.clone())),
88            listeners: DashMap::new(),
89            signals: DashMap::new(),
90            config,
91            #[cfg(feature = "journal")]
92            journal: None,
93        }
94    }
95
96    /// Set the journal for state persistence and replay
97    #[cfg(feature = "journal")]
98    pub fn set_journal(&mut self, journal: Arc<dyn Journal>) {
99        self.journal = Some(journal);
100    }
101
102    /// Get a reference to the journal (if configured)
103    #[cfg(feature = "journal")]
104    pub fn journal(&self) -> Option<&Arc<dyn Journal>> {
105        self.journal.as_ref()
106    }
107
108    /// Register signals from an ANNOUNCE message
109    pub fn register_signals(&self, signals: Vec<SignalDefinition>) {
110        let now = Instant::now();
111        for signal in signals {
112            let address = signal.address.clone();
113            self.signals.insert(
114                address,
115                SignalEntry {
116                    definition: signal,
117                    registered_at: now,
118                    last_accessed: now,
119                },
120            );
121        }
122    }
123
124    /// Query signals matching a pattern
125    pub fn query_signals(&self, pattern: &str) -> Vec<SignalDefinition> {
126        self.signals
127            .iter()
128            .filter(|entry| clasp_core::address::glob_match(pattern, entry.key()))
129            .map(|entry| entry.value().definition.clone())
130            .collect()
131    }
132
133    /// Get all registered signals
134    pub fn all_signals(&self) -> Vec<SignalDefinition> {
135        self.signals
136            .iter()
137            .map(|entry| entry.value().definition.clone())
138            .collect()
139    }
140
141    /// Get signal count
142    pub fn signal_count(&self) -> usize {
143        self.signals.len()
144    }
145
146    /// Remove stale signals that haven't been accessed within the TTL
147    /// Returns the number of signals removed
148    pub fn cleanup_stale_signals(&self, ttl: Duration) -> usize {
149        let now = Instant::now();
150        let before = self.signals.len();
151        self.signals
152            .retain(|_, entry| now.duration_since(entry.last_accessed) < ttl);
153        before - self.signals.len()
154    }
155
156    /// Remove stale params using the configured TTL
157    /// Returns the number of params removed
158    pub fn cleanup_stale_params(&self, ttl: Duration) -> usize {
159        self.params.write().cleanup_stale(ttl)
160    }
161
162    /// Run all cleanup operations using configured TTLs
163    /// Returns (params_removed, signals_removed)
164    pub fn cleanup_stale(&self) -> (usize, usize) {
165        let params_removed = if let Some(ttl) = self.config.param_config.param_ttl {
166            self.params.write().cleanup_stale(ttl)
167        } else {
168            0
169        };
170
171        let signals_removed = if let Some(ttl) = self.config.signal_ttl {
172            self.cleanup_stale_signals(ttl)
173        } else {
174            0
175        };
176
177        (params_removed, signals_removed)
178    }
179
180    /// Get a parameter value
181    pub fn get(&self, address: &str) -> Option<Value> {
182        self.params.read().get_value(address).cloned()
183    }
184
185    /// Get full parameter state
186    pub fn get_state(&self, address: &str) -> Option<ParamState> {
187        self.params.read().get(address).cloned()
188    }
189
190    /// Set a parameter value
191    pub fn set(
192        &self,
193        address: &str,
194        value: Value,
195        writer: &SessionId,
196        revision: Option<u64>,
197        lock: bool,
198        unlock: bool,
199        ttl: Option<clasp_core::Ttl>,
200    ) -> Result<u64, UpdateError> {
201        let result =
202            self.params
203                .write()
204                .set(address, value.clone(), writer, revision, lock, unlock, ttl)?;
205
206        // Notify listeners
207        if let Some(listeners) = self.listeners.get(address) {
208            for listener in listeners.iter() {
209                listener(address, &value);
210            }
211        }
212
213        Ok(result)
214    }
215
216    /// Apply a SET message
217    pub fn apply_set(&self, msg: &SetMessage, writer: &SessionId) -> Result<u64, UpdateError> {
218        let result = self.set(
219            &msg.address,
220            msg.value.clone(),
221            writer,
222            msg.revision,
223            msg.lock,
224            msg.unlock,
225            msg.ttl,
226        )?;
227
228        // Fire-and-forget journal append
229        #[cfg(feature = "journal")]
230        if let Some(ref journal) = self.journal {
231            let entry = JournalEntry::from_set(
232                msg.address.clone(),
233                msg.value.clone(),
234                result,
235                writer.clone(),
236                clasp_core::time::now(),
237            );
238            let journal = Arc::clone(journal);
239            tokio::spawn(async move {
240                let _ = journal.append(entry).await;
241            });
242        }
243
244        Ok(result)
245    }
246
247    /// Record a PUBLISH event in the journal (fire-and-forget)
248    #[cfg(feature = "journal")]
249    pub fn journal_publish(
250        &self,
251        address: &str,
252        signal_type: SignalType,
253        value: Option<&Value>,
254        author: &str,
255    ) {
256        if let Some(ref journal) = self.journal {
257            let entry = JournalEntry::from_publish(
258                address.to_string(),
259                signal_type,
260                value.cloned().unwrap_or(Value::Null),
261                author.to_string(),
262                clasp_core::time::now(),
263            );
264            let journal = Arc::clone(journal);
265            tokio::spawn(async move {
266                let _ = journal.append(entry).await;
267            });
268        }
269    }
270
271    /// Get all parameters matching a pattern
272    pub fn get_matching(&self, pattern: &str) -> Vec<(String, ParamState)> {
273        self.params
274            .read()
275            .get_matching(pattern)
276            .into_iter()
277            .map(|(k, v)| (k.to_string(), v.clone()))
278            .collect()
279    }
280
281    /// Create a snapshot of all params matching a pattern
282    pub fn snapshot(&self, pattern: &str) -> SnapshotMessage {
283        let params: Vec<ParamValue> = self
284            .get_matching(pattern)
285            .into_iter()
286            .map(|(address, state)| ParamValue {
287                address,
288                value: state.value,
289                revision: state.revision,
290                writer: Some(state.writer),
291                timestamp: Some(state.timestamp),
292            })
293            .collect();
294
295        SnapshotMessage { params }
296    }
297
298    /// Create a full snapshot
299    pub fn full_snapshot(&self) -> SnapshotMessage {
300        self.snapshot("**")
301    }
302
303    /// Recover state from journal snapshot and replay entries.
304    ///
305    /// Loads the most recent snapshot, then replays any entries appended after
306    /// the snapshot was taken. This enables crash recovery.
307    #[cfg(feature = "journal")]
308    pub async fn recover_from_journal(&self) -> std::result::Result<usize, String> {
309        let journal = self
310            .journal
311            .as_ref()
312            .ok_or_else(|| "No journal configured".to_string())?;
313
314        let mut recovered = 0;
315
316        // Load snapshot if available
317        if let Ok(Some(snapshots)) = journal.load_snapshot().await {
318            for snap in &snapshots {
319                let _ = self.set(
320                    &snap.address,
321                    snap.value.clone(),
322                    &snap.writer,
323                    Some(snap.revision),
324                    false,
325                    false,
326                    None,
327                );
328                recovered += 1;
329            }
330            tracing::info!("Recovered {} params from journal snapshot", recovered);
331        }
332
333        // Replay entries since the snapshot
334        // For simplicity, replay all SET entries (they're idempotent with LWW)
335        if let Ok(entries) = journal.since(0, None).await {
336            for entry in &entries {
337                if entry.msg_type == 0x21 {
338                    // SET
339                    if let Some(revision) = entry.revision {
340                        let _ = self.set(
341                            &entry.address,
342                            entry.value.clone(),
343                            &entry.author,
344                            Some(revision),
345                            false,
346                            false,
347                            None,
348                        );
349                        recovered += 1;
350                    }
351                }
352            }
353            tracing::info!(
354                "Replayed {} journal entries ({} were SET operations)",
355                entries.len(),
356                entries.iter().filter(|e| e.msg_type == 0x21).count()
357            );
358        }
359
360        Ok(recovered)
361    }
362
363    /// Save current state as a journal snapshot.
364    #[cfg(feature = "journal")]
365    pub async fn save_snapshot(&self) -> std::result::Result<u64, String> {
366        let journal = self
367            .journal
368            .as_ref()
369            .ok_or_else(|| "No journal configured".to_string())?;
370
371        let all_params = self.get_matching("**");
372        let snapshots: Vec<clasp_journal::ParamSnapshot> = all_params
373            .into_iter()
374            .map(|(address, state)| clasp_journal::ParamSnapshot {
375                address,
376                value: state.value,
377                revision: state.revision,
378                writer: state.writer,
379                timestamp: state.timestamp,
380            })
381            .collect();
382
383        journal
384            .snapshot(&snapshots)
385            .await
386            .map_err(|e| e.to_string())
387    }
388
389    /// Number of parameters
390    pub fn len(&self) -> usize {
391        self.params.read().len()
392    }
393
394    /// Check if empty
395    pub fn is_empty(&self) -> bool {
396        self.params.read().is_empty()
397    }
398
399    /// Clear all state
400    pub fn clear(&self) {
401        self.params.write().clear();
402    }
403}
404
405impl Default for RouterState {
406    fn default() -> Self {
407        Self::new()
408    }
409}
410
411#[cfg(test)]
412mod tests {
413    use super::*;
414
415    #[test]
416    fn test_basic_state() {
417        let state = RouterState::new();
418
419        state
420            .set(
421                "/test/value",
422                Value::Float(0.5),
423                &"session1".to_string(),
424                None,
425                false,
426                false,
427                None,
428            )
429            .unwrap();
430
431        let value = state.get("/test/value").unwrap();
432        assert_eq!(value, Value::Float(0.5));
433    }
434
435    #[test]
436    fn test_snapshot() {
437        let state = RouterState::new();
438
439        state
440            .set(
441                "/test/a",
442                Value::Float(1.0),
443                &"s1".to_string(),
444                None,
445                false,
446                false,
447                None,
448            )
449            .unwrap();
450        state
451            .set(
452                "/test/b",
453                Value::Float(2.0),
454                &"s1".to_string(),
455                None,
456                false,
457                false,
458                None,
459            )
460            .unwrap();
461        state
462            .set(
463                "/other/c",
464                Value::Float(3.0),
465                &"s1".to_string(),
466                None,
467                false,
468                false,
469                None,
470            )
471            .unwrap();
472
473        let snapshot = state.snapshot("/test/**");
474        assert_eq!(snapshot.params.len(), 2);
475    }
476
477    #[test]
478    fn test_register_signals() {
479        use clasp_core::SignalType;
480
481        let state = RouterState::new();
482
483        let signals = vec![
484            SignalDefinition {
485                address: "/test/signal1".to_string(),
486                signal_type: SignalType::Param,
487                datatype: Some("float".to_string()),
488                access: None,
489                meta: None,
490            },
491            SignalDefinition {
492                address: "/test/signal2".to_string(),
493                signal_type: SignalType::Event,
494                datatype: Some("bool".to_string()),
495                access: None,
496                meta: None,
497            },
498        ];
499
500        state.register_signals(signals);
501        assert_eq!(state.signal_count(), 2);
502
503        let queried = state.query_signals("/test/**");
504        assert_eq!(queried.len(), 2);
505    }
506
507    #[test]
508    fn test_cleanup_stale_signals() {
509        use clasp_core::SignalType;
510
511        let config = RouterStateConfig {
512            param_config: StateStoreConfig::unlimited(),
513            signal_ttl: Some(Duration::from_millis(10)),
514            max_signals: None,
515        };
516        let state = RouterState::with_config(config);
517
518        let signals = vec![SignalDefinition {
519            address: "/test/signal".to_string(),
520            signal_type: SignalType::Param,
521            datatype: Some("float".to_string()),
522            access: None,
523            meta: None,
524        }];
525
526        state.register_signals(signals);
527        assert_eq!(state.signal_count(), 1);
528
529        // Immediate cleanup should remove nothing
530        let removed = state.cleanup_stale_signals(Duration::from_millis(10));
531        assert_eq!(removed, 0);
532
533        // Wait and cleanup
534        std::thread::sleep(Duration::from_millis(15));
535        let removed = state.cleanup_stale_signals(Duration::from_millis(10));
536        assert_eq!(removed, 1);
537        assert_eq!(state.signal_count(), 0);
538    }
539
540    #[test]
541    fn test_cleanup_stale_all() {
542        use clasp_core::SignalType;
543
544        let config = RouterStateConfig {
545            param_config: StateStoreConfig::with_limits(1000, 1), // 1 second TTL
546            signal_ttl: Some(Duration::from_millis(10)),
547            max_signals: None,
548        };
549        let state = RouterState::with_config(config);
550
551        // Add a param and signal
552        state
553            .set(
554                "/test/param",
555                Value::Float(1.0),
556                &"s1".to_string(),
557                None,
558                false,
559                false,
560                None,
561            )
562            .unwrap();
563
564        let signals = vec![SignalDefinition {
565            address: "/test/signal".to_string(),
566            signal_type: SignalType::Param,
567            datatype: Some("float".to_string()),
568            access: None,
569            meta: None,
570        }];
571        state.register_signals(signals);
572
573        assert_eq!(state.len(), 1);
574        assert_eq!(state.signal_count(), 1);
575
576        // Wait for signal TTL to expire
577        std::thread::sleep(Duration::from_millis(15));
578        let (params_removed, signals_removed) = state.cleanup_stale();
579
580        // Signal should be removed, param should still be there (1 second TTL)
581        assert_eq!(signals_removed, 1);
582        assert_eq!(params_removed, 0);
583        assert_eq!(state.signal_count(), 0);
584        assert_eq!(state.len(), 1);
585    }
586}