1use clasp_core::state::{ParamState, StateStore, StateStoreConfig, UpdateError};
4use clasp_core::{ParamValue, SetMessage, SignalDefinition, SnapshotMessage, Value};
5use dashmap::DashMap;
6use parking_lot::RwLock;
7use std::time::{Duration, Instant};
8
9#[cfg(feature = "journal")]
10use clasp_core::SignalType;
11#[cfg(feature = "journal")]
12use clasp_journal::{Journal, JournalEntry};
13#[cfg(feature = "journal")]
14use std::sync::Arc;
15
16use crate::SessionId;
17
/// A registered signal together with the bookkeeping timestamps the router
/// keeps for it.
#[derive(Debug, Clone)]
pub struct SignalEntry {
    /// The signal definition as supplied at registration.
    pub definition: SignalDefinition,
    /// Instant at which the signal was registered.
    pub registered_at: Instant,
    /// Instant compared against the TTL when stale signals are cleaned up.
    /// Set to the registration instant when the entry is created.
    // NOTE(review): nothing visible in this file refreshes this field after
    // registration - confirm whether lookups are meant to update it.
    pub last_accessed: Instant,
}
28
/// Configuration for [`RouterState`].
#[derive(Debug, Clone)]
pub struct RouterStateConfig {
    /// Configuration handed to the underlying parameter `StateStore`.
    pub param_config: StateStoreConfig,
    /// Time-to-live applied to registered signals by the combined cleanup
    /// pass; `None` makes that pass skip signals entirely.
    pub signal_ttl: Option<Duration>,
    /// Upper bound on the number of registered signals.
    // NOTE(review): presumably `None` means "unlimited"; no code visible in
    // this file reads this field - confirm where (or whether) it is enforced.
    pub max_signals: Option<usize>,
}
39
40impl Default for RouterStateConfig {
41 fn default() -> Self {
42 Self {
43 param_config: StateStoreConfig::default(),
44 signal_ttl: Some(Duration::from_secs(3600)), max_signals: Some(10_000),
46 }
47 }
48}
49
50impl RouterStateConfig {
51 pub fn unlimited() -> Self {
53 Self {
54 param_config: StateStoreConfig::unlimited(),
55 signal_ttl: None,
56 max_signals: None,
57 }
58 }
59}
60
/// Boxed change callback; it is invoked with the parameter address and the
/// newly written value. `Send + Sync` so it can be stored in shared state.
type ListenerFn = Box<dyn Fn(&str, &Value) + Send + Sync>;
63
/// Shared router state: parameter values, change listeners and the registry
/// of announced signals, plus an optional journal when the `journal` feature
/// is enabled.
pub struct RouterState {
    /// Parameter store, guarded by a read/write lock.
    params: RwLock<StateStore>,
    /// Change listeners keyed by exact parameter address.
    // NOTE(review): no method visible in this file inserts into this map -
    // confirm where listeners are registered.
    listeners: DashMap<String, Vec<ListenerFn>>,
    /// Registered signals keyed by signal address.
    signals: DashMap<String, SignalEntry>,
    /// Configuration this state was built with.
    config: RouterStateConfig,
    /// Optional journal that receives SET and PUBLISH entries.
    #[cfg(feature = "journal")]
    journal: Option<Arc<dyn Journal>>,
}
78
79impl RouterState {
80 pub fn new() -> Self {
81 Self::with_config(RouterStateConfig::unlimited())
82 }
83
84 pub fn with_config(config: RouterStateConfig) -> Self {
86 Self {
87 params: RwLock::new(StateStore::with_config(config.param_config.clone())),
88 listeners: DashMap::new(),
89 signals: DashMap::new(),
90 config,
91 #[cfg(feature = "journal")]
92 journal: None,
93 }
94 }
95
96 #[cfg(feature = "journal")]
98 pub fn set_journal(&mut self, journal: Arc<dyn Journal>) {
99 self.journal = Some(journal);
100 }
101
102 #[cfg(feature = "journal")]
104 pub fn journal(&self) -> Option<&Arc<dyn Journal>> {
105 self.journal.as_ref()
106 }
107
108 pub fn register_signals(&self, signals: Vec<SignalDefinition>) {
110 let now = Instant::now();
111 for signal in signals {
112 let address = signal.address.clone();
113 self.signals.insert(
114 address,
115 SignalEntry {
116 definition: signal,
117 registered_at: now,
118 last_accessed: now,
119 },
120 );
121 }
122 }
123
124 pub fn query_signals(&self, pattern: &str) -> Vec<SignalDefinition> {
126 self.signals
127 .iter()
128 .filter(|entry| clasp_core::address::glob_match(pattern, entry.key()))
129 .map(|entry| entry.value().definition.clone())
130 .collect()
131 }
132
133 pub fn all_signals(&self) -> Vec<SignalDefinition> {
135 self.signals
136 .iter()
137 .map(|entry| entry.value().definition.clone())
138 .collect()
139 }
140
141 pub fn signal_count(&self) -> usize {
143 self.signals.len()
144 }
145
146 pub fn cleanup_stale_signals(&self, ttl: Duration) -> usize {
149 let now = Instant::now();
150 let before = self.signals.len();
151 self.signals
152 .retain(|_, entry| now.duration_since(entry.last_accessed) < ttl);
153 before - self.signals.len()
154 }
155
156 pub fn cleanup_stale_params(&self, ttl: Duration) -> usize {
159 self.params.write().cleanup_stale(ttl)
160 }
161
162 pub fn cleanup_stale(&self) -> (usize, usize) {
165 let params_removed = if let Some(ttl) = self.config.param_config.param_ttl {
166 self.params.write().cleanup_stale(ttl)
167 } else {
168 0
169 };
170
171 let signals_removed = if let Some(ttl) = self.config.signal_ttl {
172 self.cleanup_stale_signals(ttl)
173 } else {
174 0
175 };
176
177 (params_removed, signals_removed)
178 }
179
180 pub fn get(&self, address: &str) -> Option<Value> {
182 self.params.read().get_value(address).cloned()
183 }
184
185 pub fn get_state(&self, address: &str) -> Option<ParamState> {
187 self.params.read().get(address).cloned()
188 }
189
190 pub fn set(
192 &self,
193 address: &str,
194 value: Value,
195 writer: &SessionId,
196 revision: Option<u64>,
197 lock: bool,
198 unlock: bool,
199 ttl: Option<clasp_core::Ttl>,
200 ) -> Result<u64, UpdateError> {
201 let result =
202 self.params
203 .write()
204 .set(address, value.clone(), writer, revision, lock, unlock, ttl)?;
205
206 if let Some(listeners) = self.listeners.get(address) {
208 for listener in listeners.iter() {
209 listener(address, &value);
210 }
211 }
212
213 Ok(result)
214 }
215
216 pub fn apply_set(&self, msg: &SetMessage, writer: &SessionId) -> Result<u64, UpdateError> {
218 let result = self.set(
219 &msg.address,
220 msg.value.clone(),
221 writer,
222 msg.revision,
223 msg.lock,
224 msg.unlock,
225 msg.ttl,
226 )?;
227
228 #[cfg(feature = "journal")]
230 if let Some(ref journal) = self.journal {
231 let entry = JournalEntry::from_set(
232 msg.address.clone(),
233 msg.value.clone(),
234 result,
235 writer.clone(),
236 clasp_core::time::now(),
237 );
238 let journal = Arc::clone(journal);
239 tokio::spawn(async move {
240 let _ = journal.append(entry).await;
241 });
242 }
243
244 Ok(result)
245 }
246
247 #[cfg(feature = "journal")]
249 pub fn journal_publish(
250 &self,
251 address: &str,
252 signal_type: SignalType,
253 value: Option<&Value>,
254 author: &str,
255 ) {
256 if let Some(ref journal) = self.journal {
257 let entry = JournalEntry::from_publish(
258 address.to_string(),
259 signal_type,
260 value.cloned().unwrap_or(Value::Null),
261 author.to_string(),
262 clasp_core::time::now(),
263 );
264 let journal = Arc::clone(journal);
265 tokio::spawn(async move {
266 let _ = journal.append(entry).await;
267 });
268 }
269 }
270
271 pub fn get_matching(&self, pattern: &str) -> Vec<(String, ParamState)> {
273 self.params
274 .read()
275 .get_matching(pattern)
276 .into_iter()
277 .map(|(k, v)| (k.to_string(), v.clone()))
278 .collect()
279 }
280
281 pub fn snapshot(&self, pattern: &str) -> SnapshotMessage {
283 let params: Vec<ParamValue> = self
284 .get_matching(pattern)
285 .into_iter()
286 .map(|(address, state)| ParamValue {
287 address,
288 value: state.value,
289 revision: state.revision,
290 writer: Some(state.writer),
291 timestamp: Some(state.timestamp),
292 })
293 .collect();
294
295 SnapshotMessage { params }
296 }
297
298 pub fn full_snapshot(&self) -> SnapshotMessage {
300 self.snapshot("**")
301 }
302
303 #[cfg(feature = "journal")]
308 pub async fn recover_from_journal(&self) -> std::result::Result<usize, String> {
309 let journal = self
310 .journal
311 .as_ref()
312 .ok_or_else(|| "No journal configured".to_string())?;
313
314 let mut recovered = 0;
315
316 if let Ok(Some(snapshots)) = journal.load_snapshot().await {
318 for snap in &snapshots {
319 let _ = self.set(
320 &snap.address,
321 snap.value.clone(),
322 &snap.writer,
323 Some(snap.revision),
324 false,
325 false,
326 None,
327 );
328 recovered += 1;
329 }
330 tracing::info!("Recovered {} params from journal snapshot", recovered);
331 }
332
333 if let Ok(entries) = journal.since(0, None).await {
336 for entry in &entries {
337 if entry.msg_type == 0x21 {
338 if let Some(revision) = entry.revision {
340 let _ = self.set(
341 &entry.address,
342 entry.value.clone(),
343 &entry.author,
344 Some(revision),
345 false,
346 false,
347 None,
348 );
349 recovered += 1;
350 }
351 }
352 }
353 tracing::info!(
354 "Replayed {} journal entries ({} were SET operations)",
355 entries.len(),
356 entries.iter().filter(|e| e.msg_type == 0x21).count()
357 );
358 }
359
360 Ok(recovered)
361 }
362
363 #[cfg(feature = "journal")]
365 pub async fn save_snapshot(&self) -> std::result::Result<u64, String> {
366 let journal = self
367 .journal
368 .as_ref()
369 .ok_or_else(|| "No journal configured".to_string())?;
370
371 let all_params = self.get_matching("**");
372 let snapshots: Vec<clasp_journal::ParamSnapshot> = all_params
373 .into_iter()
374 .map(|(address, state)| clasp_journal::ParamSnapshot {
375 address,
376 value: state.value,
377 revision: state.revision,
378 writer: state.writer,
379 timestamp: state.timestamp,
380 })
381 .collect();
382
383 journal
384 .snapshot(&snapshots)
385 .await
386 .map_err(|e| e.to_string())
387 }
388
389 pub fn len(&self) -> usize {
391 self.params.read().len()
392 }
393
394 pub fn is_empty(&self) -> bool {
396 self.params.read().is_empty()
397 }
398
399 pub fn clear(&self) {
401 self.params.write().clear();
402 }
403}
404
405impl Default for RouterState {
406 fn default() -> Self {
407 Self::new()
408 }
409}
410
#[cfg(test)]
mod tests {
    use super::*;
    use clasp_core::SignalType;

    /// Writes `value` to `address` as `session` with no revision check,
    /// lock change or TTL, panicking if the store rejects the write.
    fn write_param(state: &RouterState, address: &str, value: Value, session: &str) {
        state
            .set(
                address,
                value,
                &session.to_string(),
                None,
                false,
                false,
                None,
            )
            .unwrap();
    }

    /// Builds a signal definition with the given datatype and no access or
    /// metadata information.
    fn make_signal(address: &str, signal_type: SignalType, datatype: &str) -> SignalDefinition {
        SignalDefinition {
            address: address.to_string(),
            signal_type,
            datatype: Some(datatype.to_string()),
            access: None,
            meta: None,
        }
    }

    #[test]
    fn test_basic_state() {
        let state = RouterState::new();
        write_param(&state, "/test/value", Value::Float(0.5), "session1");

        let stored = state.get("/test/value").unwrap();
        assert_eq!(stored, Value::Float(0.5));
    }

    #[test]
    fn test_snapshot() {
        let state = RouterState::new();
        write_param(&state, "/test/a", Value::Float(1.0), "s1");
        write_param(&state, "/test/b", Value::Float(2.0), "s1");
        write_param(&state, "/other/c", Value::Float(3.0), "s1");

        // Only the two addresses under /test match the pattern.
        let snapshot = state.snapshot("/test/**");
        assert_eq!(snapshot.params.len(), 2);
    }

    #[test]
    fn test_register_signals() {
        let state = RouterState::new();

        state.register_signals(vec![
            make_signal("/test/signal1", SignalType::Param, "float"),
            make_signal("/test/signal2", SignalType::Event, "bool"),
        ]);
        assert_eq!(state.signal_count(), 2);

        let matched = state.query_signals("/test/**");
        assert_eq!(matched.len(), 2);
    }

    #[test]
    fn test_cleanup_stale_signals() {
        let ttl = Duration::from_millis(10);
        let state = RouterState::with_config(RouterStateConfig {
            param_config: StateStoreConfig::unlimited(),
            signal_ttl: Some(ttl),
            max_signals: None,
        });

        state.register_signals(vec![make_signal("/test/signal", SignalType::Param, "float")]);
        assert_eq!(state.signal_count(), 1);

        // Freshly registered: still within the TTL.
        let removed = state.cleanup_stale_signals(Duration::from_millis(10));
        assert_eq!(removed, 0);

        // Let the TTL elapse, then the entry must be dropped.
        std::thread::sleep(Duration::from_millis(15));
        let removed = state.cleanup_stale_signals(Duration::from_millis(10));
        assert_eq!(removed, 1);
        assert_eq!(state.signal_count(), 0);
    }

    #[test]
    fn test_cleanup_stale_all() {
        let state = RouterState::with_config(RouterStateConfig {
            param_config: StateStoreConfig::with_limits(1000, 1),
            signal_ttl: Some(Duration::from_millis(10)),
            max_signals: None,
        });

        write_param(&state, "/test/param", Value::Float(1.0), "s1");
        state.register_signals(vec![make_signal("/test/signal", SignalType::Param, "float")]);

        assert_eq!(state.len(), 1);
        assert_eq!(state.signal_count(), 1);

        std::thread::sleep(Duration::from_millis(15));
        let (params_removed, signals_removed) = state.cleanup_stale();

        // The signal outlived its 10 ms TTL; the parameter is still retained.
        assert_eq!(signals_removed, 1);
        assert_eq!(params_removed, 0);
        assert_eq!(state.signal_count(), 0);
        assert_eq!(state.len(), 1);
    }
}