use clasp_core::state::{ParamState, StateStore, StateStoreConfig, UpdateError};
use clasp_core::{ParamValue, SetMessage, SignalDefinition, SnapshotMessage, Value};
use dashmap::DashMap;
use parking_lot::RwLock;
use std::time::{Duration, Instant};
use crate::SessionId;
/// A registered signal definition together with bookkeeping timestamps.
#[derive(Debug, Clone)]
pub struct SignalEntry {
/// The signal definition exactly as it was supplied at registration.
pub definition: SignalDefinition,
/// Instant at which this entry was stored in the registry.
pub registered_at: Instant,
/// Instant compared against the TTL when stale signals are cleaned up.
pub last_accessed: Instant,
}
/// Configuration for [`RouterState`].
#[derive(Debug, Clone)]
pub struct RouterStateConfig {
/// Configuration handed to the underlying parameter `StateStore`.
pub param_config: StateStoreConfig,
/// Time-to-live applied to registered signals by `RouterState::cleanup_stale`;
/// with `None` that method removes no signals.
pub signal_ttl: Option<Duration>,
/// Upper bound on the number of registered signals; `None` presumably means
/// "no limit". NOTE(review): confirm where this limit is enforced.
pub max_signals: Option<usize>,
}
impl Default for RouterStateConfig {
    /// Returns the bounded default configuration: the default parameter-store
    /// configuration, a one-hour signal TTL and a cap of 10 000 signals.
    fn default() -> Self {
        let param_config = StateStoreConfig::default();
        let one_hour = Duration::from_secs(3600);
        let signal_cap: usize = 10_000;

        Self {
            param_config,
            signal_ttl: Some(one_hour),
            max_signals: Some(signal_cap),
        }
    }
}
impl RouterStateConfig {
    /// Returns a configuration with no signal TTL and no signal cap, paired
    /// with `StateStoreConfig::unlimited()` for the parameter store.
    pub fn unlimited() -> Self {
        let param_config = StateStoreConfig::unlimited();

        Self {
            param_config,
            signal_ttl: None,
            max_signals: None,
        }
    }
}
/// Shared router state: parameter values, change listeners and the registry
/// of announced signals.
pub struct RouterState {
/// Parameter store, guarded by a read/write lock.
params: RwLock<StateStore>,
/// Callbacks keyed by address; `set` invokes the callbacks stored under the
/// exact address it wrote, passing the address and the new value.
listeners: DashMap<String, Vec<Box<dyn Fn(&str, &Value) + Send + Sync>>>,
/// Registered signals keyed by their address.
signals: DashMap<String, SignalEntry>,
/// Configuration this state was created with.
config: RouterStateConfig,
}
impl RouterState {
    /// Creates a router state using [`RouterStateConfig::unlimited`].
    pub fn new() -> Self {
        Self::with_config(RouterStateConfig::unlimited())
    }

    /// Creates a router state from `config`.
    ///
    /// The parameter store is built from a clone of `config.param_config`;
    /// the whole configuration is retained for [`Self::cleanup_stale`] and
    /// [`Self::register_signals`].
    pub fn with_config(config: RouterStateConfig) -> Self {
        Self {
            params: RwLock::new(StateStore::with_config(config.param_config.clone())),
            listeners: DashMap::new(),
            signals: DashMap::new(),
            config,
        }
    }

    /// Registers (or re-registers) the given signal definitions.
    ///
    /// Each definition is stored under its `address`, replacing any entry
    /// already present there; both timestamps of the entry are set to the
    /// time of this call.
    ///
    /// When `max_signals` is configured, definitions for addresses that are
    /// not yet registered are skipped once the registry holds that many
    /// entries. Re-registering an existing address is always allowed because
    /// it does not grow the registry. The check is best-effort: concurrent
    /// callers may each pass it and briefly overshoot the cap.
    pub fn register_signals(&self, signals: Vec<SignalDefinition>) {
        let now = Instant::now();
        for signal in signals {
            if let Some(max) = self.config.max_signals {
                let at_capacity = self.signals.len() >= max;
                if at_capacity && !self.signals.contains_key(&signal.address) {
                    // Registry is full and this would add a new entry.
                    continue;
                }
            }

            let address = signal.address.clone();
            self.signals.insert(
                address,
                SignalEntry {
                    definition: signal,
                    registered_at: now,
                    last_accessed: now,
                },
            );
        }
    }

    /// Returns clones of all signal definitions whose address matches
    /// `pattern` according to `clasp_core::address::glob_match`.
    ///
    /// Querying does not modify the entries' `last_accessed` timestamps.
    pub fn query_signals(&self, pattern: &str) -> Vec<SignalDefinition> {
        self.signals
            .iter()
            .filter(|entry| clasp_core::address::glob_match(pattern, entry.key()))
            .map(|entry| entry.value().definition.clone())
            .collect()
    }

    /// Returns clones of every registered signal definition.
    pub fn all_signals(&self) -> Vec<SignalDefinition> {
        self.signals
            .iter()
            .map(|entry| entry.value().definition.clone())
            .collect()
    }

    /// Returns the number of registered signals.
    pub fn signal_count(&self) -> usize {
        self.signals.len()
    }

    /// Removes every signal whose `last_accessed` is at least `ttl` old and
    /// returns how many entries were removed.
    ///
    /// Nothing in this `impl` updates `last_accessed` after registration, so
    /// the age is effectively measured from the most recent registration of
    /// the address.
    pub fn cleanup_stale_signals(&self, ttl: Duration) -> usize {
        let now = Instant::now();
        // Count removals inside the closure rather than diffing `len()`
        // before and after: a concurrent `register_signals` could grow the
        // map in between and make `before - after` underflow.
        let mut removed = 0usize;
        self.signals.retain(|_, entry| {
            let keep = now.duration_since(entry.last_accessed) < ttl;
            if !keep {
                removed += 1;
            }
            keep
        });
        removed
    }

    /// Runs the parameter store's stale cleanup with the given `ttl` and
    /// returns the count it reports.
    pub fn cleanup_stale_params(&self, ttl: Duration) -> usize {
        self.params.write().cleanup_stale(ttl)
    }

    /// Cleans up stale parameters and signals using the configured TTLs.
    ///
    /// Returns `(params_removed, signals_removed)`. A category whose TTL is
    /// `None` is left untouched and reported as `0`. Parameters are processed
    /// before signals.
    pub fn cleanup_stale(&self) -> (usize, usize) {
        let params_removed = self
            .config
            .param_config
            .param_ttl
            .map_or(0, |ttl| self.cleanup_stale_params(ttl));
        let signals_removed = self
            .config
            .signal_ttl
            .map_or(0, |ttl| self.cleanup_stale_signals(ttl));
        (params_removed, signals_removed)
    }

    /// Returns a clone of the value stored at `address`, if any.
    pub fn get(&self, address: &str) -> Option<Value> {
        self.params.read().get_value(address).cloned()
    }

    /// Returns a clone of the full parameter state stored at `address`, if any.
    pub fn get_state(&self, address: &str) -> Option<ParamState> {
        self.params.read().get(address).cloned()
    }

    /// Writes `value` to `address` on behalf of `writer` and notifies listeners.
    ///
    /// `revision`, `lock` and `unlock` are forwarded unchanged to the
    /// parameter store, which defines their meaning.
    ///
    /// Returns the `u64` produced by the store (presumably the parameter's
    /// new revision — confirm against `StateStore::set`).
    ///
    /// # Errors
    ///
    /// Propagates any [`UpdateError`] from the store; in that case no
    /// listener is invoked.
    pub fn set(
        &self,
        address: &str,
        value: Value,
        writer: &SessionId,
        revision: Option<u64>,
        lock: bool,
        unlock: bool,
    ) -> Result<u64, UpdateError> {
        // The write guard is a temporary of this statement, so the parameter
        // lock is already released when the listeners run below.
        let result =
            self.params
                .write()
                .set(address, value.clone(), writer, revision, lock, unlock)?;

        // Only listeners stored under this exact address are called. They run
        // while the map guard for that entry is held, so a listener must not
        // try to modify `listeners` for the same address.
        if let Some(listeners) = self.listeners.get(address) {
            for listener in listeners.iter() {
                listener(address, &value);
            }
        }
        Ok(result)
    }

    /// Applies a [`SetMessage`] by forwarding its fields to [`Self::set`].
    ///
    /// # Errors
    ///
    /// Returns whatever [`Self::set`] returns.
    pub fn apply_set(&self, msg: &SetMessage, writer: &SessionId) -> Result<u64, UpdateError> {
        self.set(
            &msg.address,
            msg.value.clone(),
            writer,
            msg.revision,
            msg.lock,
            msg.unlock,
        )
    }

    /// Returns owned `(address, state)` pairs for every parameter the store
    /// reports as matching `pattern`.
    pub fn get_matching(&self, pattern: &str) -> Vec<(String, ParamState)> {
        self.params
            .read()
            .get_matching(pattern)
            .into_iter()
            .map(|(k, v)| (k.to_string(), v.clone()))
            .collect()
    }

    /// Builds a [`SnapshotMessage`] from the parameters matching `pattern`.
    ///
    /// Each entry carries the parameter's value and revision, with its writer
    /// and timestamp wrapped in `Some`.
    pub fn snapshot(&self, pattern: &str) -> SnapshotMessage {
        let params: Vec<ParamValue> = self
            .get_matching(pattern)
            .into_iter()
            .map(|(address, state)| ParamValue {
                address,
                value: state.value,
                revision: state.revision,
                writer: Some(state.writer),
                timestamp: Some(state.timestamp),
            })
            .collect();
        SnapshotMessage { params }
    }

    /// Builds a snapshot using the pattern `"**"`.
    pub fn full_snapshot(&self) -> SnapshotMessage {
        self.snapshot("**")
    }

    /// Returns the number of entries in the parameter store (signals are not
    /// counted; see [`Self::signal_count`]).
    pub fn len(&self) -> usize {
        self.params.read().len()
    }

    /// Returns `true` when the parameter store is empty.
    pub fn is_empty(&self) -> bool {
        self.params.read().is_empty()
    }

    /// Clears the parameter store. Registered signals and listeners are left
    /// in place.
    pub fn clear(&self) {
        self.params.write().clear();
    }
}
impl Default for RouterState {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use clasp_core::SignalType;

    /// Writes `value` to `address` as session `writer`, with no revision and
    /// no lock/unlock request, panicking if the write is rejected.
    fn write_value(state: &RouterState, address: &str, value: Value, writer: &str) {
        state
            .set(address, value, &writer.to_string(), None, false, false)
            .unwrap();
    }

    /// Builds a signal definition with the given address, type and datatype
    /// and no access or meta information.
    fn make_signal(address: &str, signal_type: SignalType, datatype: &str) -> SignalDefinition {
        SignalDefinition {
            address: address.to_string(),
            signal_type,
            datatype: Some(datatype.to_string()),
            access: None,
            meta: None,
        }
    }

    #[test]
    fn test_basic_state() {
        let state = RouterState::new();
        write_value(&state, "/test/value", Value::Float(0.5), "session1");

        assert_eq!(state.get("/test/value"), Some(Value::Float(0.5)));
    }

    #[test]
    fn test_snapshot() {
        let state = RouterState::new();
        for (address, value) in [("/test/a", 1.0), ("/test/b", 2.0), ("/other/c", 3.0)] {
            write_value(&state, address, Value::Float(value), "s1");
        }

        let snapshot = state.snapshot("/test/**");
        assert_eq!(snapshot.params.len(), 2);
    }

    #[test]
    fn test_register_signals() {
        let state = RouterState::new();
        state.register_signals(vec![
            make_signal("/test/signal1", SignalType::Param, "float"),
            make_signal("/test/signal2", SignalType::Event, "bool"),
        ]);
        assert_eq!(state.signal_count(), 2);

        let queried = state.query_signals("/test/**");
        assert_eq!(queried.len(), 2);
    }

    #[test]
    fn test_cleanup_stale_signals() {
        let ttl = Duration::from_millis(10);
        let state = RouterState::with_config(RouterStateConfig {
            param_config: StateStoreConfig::unlimited(),
            signal_ttl: Some(ttl),
            max_signals: None,
        });

        state.register_signals(vec![make_signal("/test/signal", SignalType::Param, "float")]);
        assert_eq!(state.signal_count(), 1);

        // Freshly registered: still within the TTL.
        assert_eq!(state.cleanup_stale_signals(ttl), 0);

        // After outliving the TTL the entry must be dropped.
        std::thread::sleep(Duration::from_millis(15));
        assert_eq!(state.cleanup_stale_signals(ttl), 1);
        assert_eq!(state.signal_count(), 0);
    }

    #[test]
    fn test_cleanup_stale_all() {
        let state = RouterState::with_config(RouterStateConfig {
            param_config: StateStoreConfig::with_limits(1000, 1),
            signal_ttl: Some(Duration::from_millis(10)),
            max_signals: None,
        });

        write_value(&state, "/test/param", Value::Float(1.0), "s1");
        state.register_signals(vec![make_signal("/test/signal", SignalType::Param, "float")]);
        assert_eq!(state.len(), 1);
        assert_eq!(state.signal_count(), 1);

        std::thread::sleep(Duration::from_millis(15));
        let (params_removed, signals_removed) = state.cleanup_stale();

        // Only the signal has outlived its TTL; the parameter stays.
        assert_eq!(signals_removed, 1);
        assert_eq!(params_removed, 0);
        assert_eq!(state.signal_count(), 0);
        assert_eq!(state.len(), 1);
    }
}