use clasp_core::state::{ParamState, StateStore, StateStoreConfig, UpdateError};
use clasp_core::{ParamValue, SetMessage, SignalDefinition, SnapshotMessage, Value};
use dashmap::DashMap;
use parking_lot::RwLock;
use std::time::{Duration, Instant};
#[cfg(feature = "journal")]
use clasp_core::SignalType;
#[cfg(feature = "journal")]
use clasp_journal::{Journal, JournalEntry};
#[cfg(feature = "journal")]
use std::sync::Arc;
use crate::SessionId;
/// A registered signal together with the timestamps the router keeps for it.
#[derive(Clone, Debug)]
pub struct SignalEntry {
    /// The definition as it was supplied at registration time.
    pub definition: SignalDefinition,
    /// When the entry was inserted into the registry.
    pub registered_at: Instant,
    /// Timestamp compared against the TTL when stale signals are swept.
    /// Registration initialises it to the same instant as `registered_at`.
    pub last_accessed: Instant,
}
/// Tunables for [`RouterState`].
#[derive(Clone, Debug)]
pub struct RouterStateConfig {
    /// Settings handed to the parameter `StateStore` at construction.
    pub param_config: StateStoreConfig,
    /// Age after which `cleanup_stale` drops a signal; `None` disables the
    /// signal sweep.
    pub signal_ttl: Option<Duration>,
    /// Maximum number of signals to keep registered; `None` means unlimited.
    pub max_signals: Option<usize>,
}
impl Default for RouterStateConfig {
fn default() -> Self {
Self {
param_config: StateStoreConfig::default(),
signal_ttl: Some(Duration::from_secs(3600)), max_signals: Some(10_000),
}
}
}
impl RouterStateConfig {
pub fn unlimited() -> Self {
Self {
param_config: StateStoreConfig::unlimited(),
signal_ttl: None,
max_signals: None,
}
}
}
/// Boxed change callback; `RouterState::set` calls it with the address and the
/// newly written value after a successful write.
type ListenerFn = Box<dyn Fn(&str, &Value) + Send + Sync>;
/// Shared router state: the parameter store, per-address change listeners and
/// the signal registry.
pub struct RouterState {
/// Parameter store, guarded by a read/write lock.
params: RwLock<StateStore>,
/// Callbacks keyed by address; `set` looks up the written address here and
/// invokes each callback after a successful write.
listeners: DashMap<String, Vec<ListenerFn>>,
/// Registered signals keyed by their address.
signals: DashMap<String, SignalEntry>,
/// Configuration supplied at construction; provides the TTLs used by
/// `cleanup_stale`.
config: RouterStateConfig,
/// Optional journal that receives SET and publish entries.
#[cfg(feature = "journal")]
journal: Option<Arc<dyn Journal>>,
}
impl RouterState {
/// Creates a router state without any parameter or signal limits.
///
/// Note that this uses `RouterStateConfig::unlimited()`, not
/// `RouterStateConfig::default()`.
pub fn new() -> Self {
    let config = RouterStateConfig::unlimited();
    Self::with_config(config)
}
/// Creates a router state from `config`.
///
/// The parameter store is built from a clone of `config.param_config`; the
/// listener table and signal registry start empty and no journal is attached.
pub fn with_config(config: RouterStateConfig) -> Self {
    let store = StateStore::with_config(config.param_config.clone());
    Self {
        params: RwLock::new(store),
        listeners: DashMap::new(),
        signals: DashMap::new(),
        config,
        #[cfg(feature = "journal")]
        journal: None,
    }
}
/// Attaches `journal`, replacing any journal that was configured before.
#[cfg(feature = "journal")]
pub fn set_journal(&mut self, journal: Arc<dyn Journal>) {
    // The previously attached journal (if any) is simply dropped.
    let _previous = self.journal.replace(journal);
}
/// Returns the attached journal, or `None` when none has been configured.
#[cfg(feature = "journal")]
pub fn journal(&self) -> Option<&Arc<dyn Journal>> {
    Option::as_ref(&self.journal)
}
/// Registers (or re-registers) the given signal definitions.
///
/// Each definition is stored under its own address. An existing entry at the
/// same address is replaced and both of its timestamps are reset to now.
///
/// When `max_signals` is configured, definitions for addresses that are not
/// yet registered are skipped once the registry already holds that many
/// entries; re-registration of a known address is always applied. The
/// capacity check is not atomic with the insert, so concurrent callers may
/// overshoot the cap slightly.
pub fn register_signals(&self, signals: Vec<SignalDefinition>) {
    let now = Instant::now();
    let limit = self.config.max_signals;
    for signal in signals {
        if let Some(max) = limit {
            let at_capacity = self.signals.len() >= max;
            if at_capacity && !self.signals.contains_key(signal.address.as_str()) {
                // Registry is full and this would add a new key: drop it
                // rather than grow past the configured cap.
                continue;
            }
        }
        let address = signal.address.clone();
        self.signals.insert(
            address,
            SignalEntry {
                definition: signal,
                registered_at: now,
                last_accessed: now,
            },
        );
    }
}
/// Returns clones of all registered definitions whose address matches the
/// glob `pattern` (as decided by `clasp_core::address::glob_match`).
///
/// The order of the result follows the registry's iteration order.
pub fn query_signals(&self, pattern: &str) -> Vec<SignalDefinition> {
    let mut matches = Vec::new();
    for entry in self.signals.iter() {
        if clasp_core::address::glob_match(pattern, entry.key()) {
            matches.push(entry.value().definition.clone());
        }
    }
    matches
}
/// Returns a clone of every registered signal definition, in the registry's
/// iteration order.
pub fn all_signals(&self) -> Vec<SignalDefinition> {
    let mut definitions = Vec::new();
    for entry in self.signals.iter() {
        definitions.push(entry.value().definition.clone());
    }
    definitions
}
/// Number of signals currently held in the registry.
pub fn signal_count(&self) -> usize {
    let registry = &self.signals;
    registry.len()
}
/// Removes every signal whose `last_accessed` timestamp is at least `ttl`
/// old and returns how many entries were removed.
///
/// Removals are counted inside the `retain` callback instead of comparing the
/// map length before and after the sweep: the registry is concurrent, so a
/// registration racing with the sweep could otherwise make the subtraction
/// underflow.
pub fn cleanup_stale_signals(&self, ttl: Duration) -> usize {
    let now = Instant::now();
    let mut removed = 0usize;
    self.signals.retain(|_, entry| {
        // Saturating: an entry stamped after `now` was sampled has age zero.
        let fresh = now.saturating_duration_since(entry.last_accessed) < ttl;
        if !fresh {
            removed += 1;
        }
        fresh
    });
    removed
}
/// Runs the parameter store's stale-entry cleanup with the given `ttl` while
/// holding the write lock, and returns the count reported by the store.
pub fn cleanup_stale_params(&self, ttl: Duration) -> usize {
    let mut store = self.params.write();
    store.cleanup_stale(ttl)
}
/// Sweeps stale parameters and signals using the TTLs from the configuration.
///
/// Parameters are swept first, then signals. A TTL of `None` skips that
/// sweep and reports `0` for it.
///
/// Returns `(params_removed, signals_removed)`.
pub fn cleanup_stale(&self) -> (usize, usize) {
    let params_removed = self
        .config
        .param_config
        .param_ttl
        .map_or(0, |ttl| self.cleanup_stale_params(ttl));
    let signals_removed = self
        .config
        .signal_ttl
        .map_or(0, |ttl| self.cleanup_stale_signals(ttl));
    (params_removed, signals_removed)
}
/// Returns a clone of the value the store holds for `address`, or `None`
/// when the store has no value for it.
pub fn get(&self, address: &str) -> Option<Value> {
    let store = self.params.read();
    store.get_value(address).cloned()
}
/// Returns a clone of the full `ParamState` the store holds for `address`,
/// or `None` when the store has no entry for it.
pub fn get_state(&self, address: &str) -> Option<ParamState> {
    let store = self.params.read();
    store.get(address).cloned()
}
/// Writes `value` to `address` on behalf of `writer` and notifies listeners.
///
/// `revision`, `lock` and `unlock` are forwarded unchanged to the parameter
/// store, which decides whether the write is accepted.
///
/// # Errors
/// Returns the store's `UpdateError` when the write is rejected; in that case
/// no listener is called.
///
/// On success, returns the `u64` produced by the store (the parameter's
/// revision) after every listener registered for `address` has been invoked.
pub fn set(
    &self,
    address: &str,
    value: Value,
    writer: &SessionId,
    revision: Option<u64>,
    lock: bool,
    unlock: bool,
) -> Result<u64, UpdateError> {
    // Keep the write guard in its own scope so it is released before any
    // listener runs.
    let accepted = {
        let mut store = self.params.write();
        store.set(address, value.clone(), writer, revision, lock, unlock)?
    };

    if let Some(registered) = self.listeners.get(address) {
        registered.iter().for_each(|notify| notify(address, &value));
    }

    Ok(accepted)
}
/// Applies a SET message to the state and, when a journal is attached,
/// records the accepted write in it.
///
/// The write itself goes through [`RouterState::set`]. Journaling is
/// best-effort and asynchronous: the entry is appended on a task spawned on
/// the current Tokio runtime, so it never delays or fails the write.
/// If no runtime is available, or the append fails, a warning is logged
/// instead of panicking or silently discarding the error.
///
/// # Errors
/// Returns the `UpdateError` from `set` when the write is rejected; nothing
/// is journaled in that case.
pub fn apply_set(&self, msg: &SetMessage, writer: &SessionId) -> Result<u64, UpdateError> {
    let result = self.set(
        &msg.address,
        msg.value.clone(),
        writer,
        msg.revision,
        msg.lock,
        msg.unlock,
    )?;
    #[cfg(feature = "journal")]
    if let Some(ref journal) = self.journal {
        // `tokio::spawn` panics outside a runtime; probe for one first so a
        // successful write is never followed by a panic.
        match tokio::runtime::Handle::try_current() {
            Ok(runtime) => {
                let entry = JournalEntry::from_set(
                    msg.address.clone(),
                    msg.value.clone(),
                    result,
                    writer.clone(),
                    clasp_core::time::now(),
                );
                let journal = Arc::clone(journal);
                runtime.spawn(async move {
                    if let Err(e) = journal.append(entry).await {
                        tracing::warn!("Failed to append SET entry to journal: {}", e);
                    }
                });
            }
            Err(_) => {
                tracing::warn!(
                    "No Tokio runtime available; SET for {} was not journaled",
                    msg.address
                );
            }
        }
    }
    Ok(result)
}
/// Records a publish of `signal_type` at `address` in the journal, if one is
/// attached.
///
/// A missing `value` is journaled as `Value::Null`. The append is
/// best-effort and runs on a task spawned on the current Tokio runtime. If
/// no runtime is available, or the append fails, a warning is logged
/// instead of panicking or silently discarding the error.
#[cfg(feature = "journal")]
pub fn journal_publish(
    &self,
    address: &str,
    signal_type: SignalType,
    value: Option<&Value>,
    author: &str,
) {
    if let Some(ref journal) = self.journal {
        // `tokio::spawn` panics outside a runtime; probe for one first.
        let runtime = match tokio::runtime::Handle::try_current() {
            Ok(runtime) => runtime,
            Err(_) => {
                tracing::warn!(
                    "No Tokio runtime available; publish to {} was not journaled",
                    address
                );
                return;
            }
        };
        let entry = JournalEntry::from_publish(
            address.to_string(),
            signal_type,
            value.cloned().unwrap_or(Value::Null),
            author.to_string(),
            clasp_core::time::now(),
        );
        let journal = Arc::clone(journal);
        runtime.spawn(async move {
            if let Err(e) = journal.append(entry).await {
                tracing::warn!("Failed to append publish entry to journal: {}", e);
            }
        });
    }
}
/// Returns owned `(address, state)` pairs for every parameter the store
/// reports as matching `pattern`.
///
/// The read lock is held only while the matches are copied out.
pub fn get_matching(&self, pattern: &str) -> Vec<(String, ParamState)> {
    let store = self.params.read();
    let mut owned = Vec::new();
    for (address, state) in store.get_matching(pattern) {
        owned.push((address.to_string(), state.clone()));
    }
    owned
}
/// Builds a `SnapshotMessage` containing every parameter that matches
/// `pattern`, including its revision, writer and timestamp.
pub fn snapshot(&self, pattern: &str) -> SnapshotMessage {
    let mut params: Vec<ParamValue> = Vec::new();
    for (address, state) in self.get_matching(pattern) {
        params.push(ParamValue {
            address,
            value: state.value,
            revision: state.revision,
            writer: Some(state.writer),
            timestamp: Some(state.timestamp),
        });
    }
    SnapshotMessage { params }
}
/// Snapshot of the whole parameter store; shorthand for `snapshot("**")`.
pub fn full_snapshot(&self) -> SnapshotMessage {
    let match_everything = "**";
    self.snapshot(match_everything)
}
/// Rebuilds parameter state from the attached journal.
///
/// Recovery is best-effort and runs in two phases:
/// 1. the journal's stored snapshot (if any) is loaded and each of its
///    parameters is written through [`RouterState::set`];
/// 2. all journal entries from sequence `0` are read and every entry with
///    message type `0x21` that carries a revision is written the same way.
///
/// Returns the number of writes that were actually accepted. Writes rejected
/// by the store, and failures to read the snapshot or the entries, are logged
/// as warnings and do not abort recovery.
///
/// # Errors
/// Returns `Err("No journal configured")` when no journal is attached.
#[cfg(feature = "journal")]
pub async fn recover_from_journal(&self) -> std::result::Result<usize, String> {
    let journal = self
        .journal
        .as_ref()
        .ok_or_else(|| "No journal configured".to_string())?;
    let mut recovered = 0;

    match journal.load_snapshot().await {
        Ok(Some(snapshots)) => {
            for snap in &snapshots {
                let outcome = self.set(
                    &snap.address,
                    snap.value.clone(),
                    &snap.writer,
                    Some(snap.revision),
                    false,
                    false,
                );
                match outcome {
                    // Only count parameters the store really accepted.
                    Ok(_) => {
                        recovered += 1;
                    }
                    Err(e) => {
                        tracing::warn!(
                            "Failed to restore snapshot param {}: {:?}",
                            snap.address,
                            e
                        );
                    }
                }
            }
            tracing::info!("Recovered {} params from journal snapshot", recovered);
        }
        // No snapshot stored yet: nothing to restore in this phase.
        Ok(None) => {}
        Err(e) => {
            tracing::warn!("Failed to load journal snapshot: {}", e);
        }
    }

    match journal.since(0, None).await {
        Ok(entries) => {
            let mut set_entries = 0usize;
            for entry in &entries {
                // 0x21 is the message type the log line below reports as a
                // SET operation; every other entry type is skipped.
                if entry.msg_type != 0x21 {
                    continue;
                }
                set_entries += 1;
                if let Some(revision) = entry.revision {
                    let outcome = self.set(
                        &entry.address,
                        entry.value.clone(),
                        &entry.author,
                        Some(revision),
                        false,
                        false,
                    );
                    match outcome {
                        Ok(_) => {
                            recovered += 1;
                        }
                        Err(e) => {
                            tracing::warn!(
                                "Failed to replay SET for {}: {:?}",
                                entry.address,
                                e
                            );
                        }
                    }
                }
            }
            tracing::info!(
                "Replayed {} journal entries ({} were SET operations)",
                entries.len(),
                set_entries
            );
        }
        Err(e) => {
            tracing::warn!("Failed to read journal entries for replay: {}", e);
        }
    }

    Ok(recovered)
}
/// Writes a snapshot of every parameter (pattern `"**"`) to the journal.
///
/// Returns the `u64` produced by the journal's `snapshot` call.
///
/// # Errors
/// Returns `Err("No journal configured")` when no journal is attached, or the
/// journal's error rendered as a string when the snapshot call fails.
#[cfg(feature = "journal")]
pub async fn save_snapshot(&self) -> std::result::Result<u64, String> {
    let journal = match self.journal.as_ref() {
        Some(journal) => journal,
        None => return Err("No journal configured".to_string()),
    };

    let mut snapshots: Vec<clasp_journal::ParamSnapshot> = Vec::new();
    for (address, state) in self.get_matching("**") {
        snapshots.push(clasp_journal::ParamSnapshot {
            address,
            value: state.value,
            revision: state.revision,
            writer: state.writer,
            timestamp: state.timestamp,
        });
    }

    let outcome = journal.snapshot(&snapshots).await;
    outcome.map_err(|err| err.to_string())
}
/// Number of parameters reported by the store (signals are not included).
pub fn len(&self) -> usize {
    let store = self.params.read();
    store.len()
}
/// Whether the parameter store reports itself as empty (signals are not
/// considered).
pub fn is_empty(&self) -> bool {
    let store = self.params.read();
    store.is_empty()
}
/// Clears the parameter store.
///
/// Registered signals and listeners are left untouched.
pub fn clear(&self) {
    let mut store = self.params.write();
    store.clear();
}
}
impl Default for RouterState {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
/// A value written with `set` can be read back with `get`.
#[test]
fn test_basic_state() {
    let router = RouterState::new();
    let writer = "session1".to_string();

    let outcome = router.set(
        "/test/value",
        Value::Float(0.5),
        &writer,
        None,
        false,
        false,
    );
    outcome.unwrap();

    let stored = router.get("/test/value").unwrap();
    assert_eq!(stored, Value::Float(0.5));
}
/// `snapshot` only returns parameters under the requested pattern.
#[test]
fn test_snapshot() {
    let router = RouterState::new();
    let writer = "s1".to_string();

    // Two parameters under /test and one outside of it.
    let writes = [("/test/a", 1.0), ("/test/b", 2.0), ("/other/c", 3.0)];
    for &(address, number) in writes.iter() {
        router
            .set(address, Value::Float(number), &writer, None, false, false)
            .unwrap();
    }

    let under_test = router.snapshot("/test/**");
    assert_eq!(under_test.params.len(), 2);
}
/// Registered signals are counted and can be found by glob pattern.
#[test]
fn test_register_signals() {
    use clasp_core::SignalType;

    // Small factory so the two definitions only spell out what differs.
    let definition = |address: &str, signal_type: SignalType, datatype: &str| SignalDefinition {
        address: address.to_string(),
        signal_type,
        datatype: Some(datatype.to_string()),
        access: None,
        meta: None,
    };

    let router = RouterState::new();
    router.register_signals(vec![
        definition("/test/signal1", SignalType::Param, "float"),
        definition("/test/signal2", SignalType::Event, "bool"),
    ]);

    assert_eq!(router.signal_count(), 2);
    let found = router.query_signals("/test/**");
    assert_eq!(found.len(), 2);
}
/// A fresh signal survives a sweep; once it is older than the TTL the sweep
/// removes it.
#[test]
fn test_cleanup_stale_signals() {
    use clasp_core::SignalType;
    let config = RouterStateConfig {
        param_config: StateStoreConfig::unlimited(),
        signal_ttl: Some(Duration::from_millis(10)),
        max_signals: None,
    };
    let state = RouterState::with_config(config);
    let signals = vec![SignalDefinition {
        address: "/test/signal".to_string(),
        signal_type: SignalType::Param,
        datatype: Some("float".to_string()),
        access: None,
        meta: None,
    }];
    state.register_signals(signals);
    assert_eq!(state.signal_count(), 1);

    // Use a generous TTL for the "still fresh" check: with a 10 ms TTL a
    // scheduling delay between registration and this sweep would make the
    // test fail spuriously on a loaded machine.
    let removed = state.cleanup_stale_signals(Duration::from_secs(60));
    assert_eq!(removed, 0);
    assert_eq!(state.signal_count(), 1);

    // After sleeping past the short TTL the entry must be swept.
    std::thread::sleep(Duration::from_millis(15));
    let removed = state.cleanup_stale_signals(Duration::from_millis(10));
    assert_eq!(removed, 1);
    assert_eq!(state.signal_count(), 0);
}
/// `cleanup_stale` sweeps signals by the configured TTL while leaving a
/// parameter that has not yet expired in place.
#[test]
fn test_cleanup_stale_all() {
    use clasp_core::SignalType;

    let signal_ttl = Duration::from_millis(10);
    let router = RouterState::with_config(RouterStateConfig {
        // NOTE(review): the second argument is presumably the param TTL and
        // long enough to outlive the 15 ms sleep below; confirm the unit.
        param_config: StateStoreConfig::with_limits(1000, 1),
        signal_ttl: Some(signal_ttl),
        max_signals: None,
    });

    let writer = "s1".to_string();
    router
        .set("/test/param", Value::Float(1.0), &writer, None, false, false)
        .unwrap();
    router.register_signals(vec![SignalDefinition {
        address: "/test/signal".to_string(),
        signal_type: SignalType::Param,
        datatype: Some("float".to_string()),
        access: None,
        meta: None,
    }]);
    assert_eq!(router.len(), 1);
    assert_eq!(router.signal_count(), 1);

    std::thread::sleep(Duration::from_millis(15));

    let (params_removed, signals_removed) = router.cleanup_stale();
    assert_eq!(signals_removed, 1);
    assert_eq!(params_removed, 0);
    assert_eq!(router.signal_count(), 0);
    assert_eq!(router.len(), 1);
}
}