Skip to main content

clasp_core/
state.rs

1//! State management for Clasp params
2//!
3//! Provides conflict resolution and revision tracking for stateful parameters.
4
5use crate::{ConflictStrategy, Ttl, Value};
6use std::collections::HashMap;
7use std::time::{Duration, SystemTime, UNIX_EPOCH};
8
9/// Eviction strategy when the state store reaches capacity
10#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
11pub enum EvictionStrategy {
12    /// Evict least recently accessed entries (default)
13    #[default]
14    Lru,
15    /// Evict oldest entries by creation time
16    OldestFirst,
17    /// Reject new entries when at capacity
18    RejectNew,
19}
20
21/// Configuration for state store limits
22#[derive(Debug, Clone)]
23pub struct StateStoreConfig {
24    /// Maximum number of parameters (None = unlimited)
25    pub max_params: Option<usize>,
26    /// Time-to-live for parameters without access (None = never expire)
27    pub param_ttl: Option<Duration>,
28    /// Strategy for eviction when at capacity
29    pub eviction: EvictionStrategy,
30}
31
32impl Default for StateStoreConfig {
33    fn default() -> Self {
34        Self {
35            max_params: Some(10_000),
36            param_ttl: Some(Duration::from_secs(3600)), // 1 hour
37            eviction: EvictionStrategy::Lru,
38        }
39    }
40}
41
42impl StateStoreConfig {
43    /// Create config with no limits (for backwards compatibility)
44    pub fn unlimited() -> Self {
45        Self {
46            max_params: None,
47            param_ttl: None,
48            eviction: EvictionStrategy::Lru,
49        }
50    }
51
52    /// Create config with custom limits
53    pub fn with_limits(max_params: usize, ttl_secs: u64) -> Self {
54        Self {
55            max_params: Some(max_params),
56            param_ttl: Some(Duration::from_secs(ttl_secs)),
57            eviction: EvictionStrategy::Lru,
58        }
59    }
60}
61
62/// State of a single parameter
63#[derive(Debug, Clone)]
64pub struct ParamState {
65    /// Current value
66    pub value: Value,
67    /// Monotonic revision number
68    pub revision: u64,
69    /// Session ID of last writer
70    pub writer: String,
71    /// Timestamp of last write (microseconds)
72    pub timestamp: u64,
73    /// Timestamp of last access (microseconds) - for TTL eviction
74    pub last_accessed: u64,
75    /// Conflict resolution strategy
76    pub strategy: ConflictStrategy,
77    /// Lock holder (if locked)
78    pub lock_holder: Option<String>,
79    /// Metadata
80    pub meta: Option<ParamMeta>,
81    /// Origin router ID (for federation loop prevention)
82    pub origin: Option<String>,
83    /// Per-param TTL (overrides server-wide TTL when set)
84    pub ttl: Option<Ttl>,
85}
86
87/// Parameter metadata
88#[derive(Debug, Clone)]
89pub struct ParamMeta {
90    pub unit: Option<String>,
91    pub range: Option<(f64, f64)>,
92    pub default: Option<Value>,
93}
94
95impl ParamState {
96    /// Create a new param state
97    pub fn new(value: Value, writer: String) -> Self {
98        let now = current_timestamp();
99        Self {
100            value,
101            revision: 1,
102            writer,
103            timestamp: now,
104            last_accessed: now,
105            strategy: ConflictStrategy::Lww,
106            lock_holder: None,
107            meta: None,
108            origin: None,
109            ttl: None,
110        }
111    }
112
113    /// Update the last_accessed timestamp
114    pub fn touch(&mut self) {
115        self.last_accessed = current_timestamp();
116    }
117
118    /// Create with specific strategy
119    pub fn with_strategy(mut self, strategy: ConflictStrategy) -> Self {
120        self.strategy = strategy;
121        self
122    }
123
124    /// Create with metadata
125    pub fn with_meta(mut self, meta: ParamMeta) -> Self {
126        self.meta = Some(meta);
127        self
128    }
129
130    /// Attempt to update the value
131    ///
132    /// Returns Ok(new_revision) if update was accepted,
133    /// Err with reason if rejected.
134    pub fn try_update(
135        &mut self,
136        new_value: Value,
137        writer: &str,
138        expected_revision: Option<u64>,
139        request_lock: bool,
140        release_lock: bool,
141        ttl: Option<Ttl>,
142    ) -> Result<u64, UpdateError> {
143        let timestamp = current_timestamp();
144
145        // Check optimistic lock (if revision specified)
146        if let Some(expected) = expected_revision {
147            if expected != self.revision {
148                return Err(UpdateError::RevisionConflict {
149                    expected,
150                    actual: self.revision,
151                });
152            }
153        }
154
155        // Check lock
156        if let Some(ref holder) = self.lock_holder {
157            if holder != writer && !release_lock {
158                return Err(UpdateError::LockHeld {
159                    holder: holder.clone(),
160                });
161            }
162        }
163
164        // Handle lock release
165        if release_lock && self.lock_holder.as_deref() == Some(writer) {
166            self.lock_holder = None;
167        }
168
169        // Apply conflict resolution
170        let should_update = match self.strategy {
171            ConflictStrategy::Lww => timestamp >= self.timestamp,
172            ConflictStrategy::Max => {
173                match (&new_value, &self.value) {
174                    (Value::Float(new), Value::Float(old)) => new > old,
175                    (Value::Int(new), Value::Int(old)) => new > old,
176                    _ => true, // Fall back to LWW for non-numeric
177                }
178            }
179            ConflictStrategy::Min => match (&new_value, &self.value) {
180                (Value::Float(new), Value::Float(old)) => new < old,
181                (Value::Int(new), Value::Int(old)) => new < old,
182                _ => true,
183            },
184            ConflictStrategy::Lock => {
185                self.lock_holder.is_none() || self.lock_holder.as_deref() == Some(writer)
186            }
187            ConflictStrategy::Merge => true, // App handles merge
188        };
189
190        if !should_update {
191            return Err(UpdateError::ConflictRejected);
192        }
193
194        // Handle lock request
195        if request_lock {
196            if self.lock_holder.is_some() && self.lock_holder.as_deref() != Some(writer) {
197                return Err(UpdateError::LockHeld {
198                    holder: self.lock_holder.clone().unwrap(),
199                });
200            }
201            self.lock_holder = Some(writer.to_string());
202        }
203
204        // Apply update
205        self.value = new_value;
206        self.revision += 1;
207        self.writer = writer.to_string();
208        self.timestamp = timestamp;
209        self.last_accessed = timestamp;
210
211        // Update per-param TTL if provided
212        if let Some(t) = ttl {
213            self.ttl = Some(t);
214        }
215
216        Ok(self.revision)
217    }
218
219    /// Check if value is within range (if specified)
220    pub fn validate_range(&self, value: &Value) -> bool {
221        if let Some(meta) = &self.meta {
222            if let Some((min, max)) = meta.range {
223                if let Some(v) = value.as_f64() {
224                    return v >= min && v <= max;
225                }
226            }
227        }
228        true
229    }
230}
231
232/// Errors that can occur during state updates
233#[derive(Debug, Clone)]
234pub enum UpdateError {
235    RevisionConflict { expected: u64, actual: u64 },
236    LockHeld { holder: String },
237    ConflictRejected,
238    OutOfRange,
239    AtCapacity,
240}
241
242impl std::fmt::Display for UpdateError {
243    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
244        match self {
245            Self::RevisionConflict { expected, actual } => {
246                write!(
247                    f,
248                    "Revision conflict: expected {}, got {}",
249                    expected, actual
250                )
251            }
252            Self::LockHeld { holder } => {
253                write!(f, "Parameter locked by {}", holder)
254            }
255            Self::ConflictRejected => {
256                write!(f, "Update rejected by conflict strategy")
257            }
258            Self::OutOfRange => {
259                write!(f, "Value out of allowed range")
260            }
261            Self::AtCapacity => {
262                write!(f, "State store at capacity")
263            }
264        }
265    }
266}
267
268impl std::error::Error for UpdateError {}
269
270/// Error returned when state store is at capacity
271#[derive(Debug, Clone)]
272pub struct CapacityError;
273
274impl std::fmt::Display for CapacityError {
275    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
276        write!(f, "State store at capacity")
277    }
278}
279
280impl std::error::Error for CapacityError {}
281
282/// State store for multiple params
283#[derive(Debug)]
284pub struct StateStore {
285    params: HashMap<String, ParamState>,
286    config: StateStoreConfig,
287}
288
289impl Default for StateStore {
290    fn default() -> Self {
291        Self {
292            params: HashMap::new(),
293            config: StateStoreConfig::unlimited(), // Backwards compatible default
294        }
295    }
296}
297
298impl StateStore {
299    /// Create a new state store with default (unlimited) config
300    pub fn new() -> Self {
301        Self::default()
302    }
303
304    /// Create a new state store with the specified config
305    pub fn with_config(config: StateStoreConfig) -> Self {
306        Self {
307            params: HashMap::new(),
308            config,
309        }
310    }
311
312    /// Get the current configuration
313    pub fn config(&self) -> &StateStoreConfig {
314        &self.config
315    }
316
317    /// Get a param's current state (does not update last_accessed)
318    pub fn get(&self, address: &str) -> Option<&ParamState> {
319        self.params.get(address)
320    }
321
322    /// Get a param's current state and update last_accessed
323    pub fn get_mut(&mut self, address: &str) -> Option<&mut ParamState> {
324        let param = self.params.get_mut(address)?;
325        param.touch();
326        Some(param)
327    }
328
329    /// Get a param's current value (does not update last_accessed)
330    pub fn get_value(&self, address: &str) -> Option<&Value> {
331        self.params.get(address).map(|p| &p.value)
332    }
333
334    /// Get a param's current value and update last_accessed
335    pub fn get_value_mut(&mut self, address: &str) -> Option<&Value> {
336        let param = self.params.get_mut(address)?;
337        param.touch();
338        Some(&param.value)
339    }
340
341    /// Set a param value, creating if necessary
342    pub fn set(
343        &mut self,
344        address: &str,
345        value: Value,
346        writer: &str,
347        revision: Option<u64>,
348        lock: bool,
349        unlock: bool,
350        ttl: Option<Ttl>,
351    ) -> Result<u64, UpdateError> {
352        if let Some(param) = self.params.get_mut(address) {
353            param.try_update(value, writer, revision, lock, unlock, ttl)
354        } else {
355            // Check capacity before creating new param
356            if let Some(max) = self.config.max_params {
357                if self.params.len() >= max {
358                    match self.config.eviction {
359                        EvictionStrategy::RejectNew => {
360                            return Err(UpdateError::AtCapacity);
361                        }
362                        EvictionStrategy::Lru => {
363                            self.evict_lru();
364                        }
365                        EvictionStrategy::OldestFirst => {
366                            self.evict_oldest();
367                        }
368                    }
369                }
370            }
371
372            // Create new param
373            let mut param = ParamState::new(value, writer.to_string());
374            if lock {
375                param.lock_holder = Some(writer.to_string());
376            }
377            param.ttl = ttl;
378            let rev = param.revision;
379            self.params.insert(address.to_string(), param);
380            Ok(rev)
381        }
382    }
383
384    /// Evict the least recently accessed param
385    fn evict_lru(&mut self) {
386        if let Some(oldest_key) = self
387            .params
388            .iter()
389            .min_by_key(|(_, v)| v.last_accessed)
390            .map(|(k, _)| k.clone())
391        {
392            self.params.remove(&oldest_key);
393        }
394    }
395
396    /// Evict the oldest param by creation time (lowest revision is oldest)
397    fn evict_oldest(&mut self) {
398        if let Some(oldest_key) = self
399            .params
400            .iter()
401            .min_by_key(|(_, v)| v.timestamp)
402            .map(|(k, _)| k.clone())
403        {
404            self.params.remove(&oldest_key);
405        }
406    }
407
408    /// Remove params that haven't been accessed within the TTL
409    /// Returns the number of params removed
410    pub fn cleanup_stale(&mut self, ttl: Duration) -> usize {
411        let now = current_timestamp();
412        let global_ttl_micros = ttl.as_micros() as u64;
413
414        let before = self.params.len();
415        self.params.retain(|_, v| {
416            match v.ttl {
417                Some(Ttl::Never) => true,
418                Some(Ttl::Sliding(secs)) => {
419                    let cutoff = now.saturating_sub(secs as u64 * 1_000_000);
420                    v.last_accessed >= cutoff
421                }
422                Some(Ttl::Absolute(secs)) => {
423                    let expires_at = v.timestamp.saturating_add(secs as u64 * 1_000_000);
424                    now < expires_at
425                }
426                None => {
427                    let cutoff = now.saturating_sub(global_ttl_micros);
428                    v.last_accessed >= cutoff
429                }
430            }
431        });
432        before - self.params.len()
433    }
434
435    /// Run cleanup using the configured TTL (if any)
436    /// Returns the number of params removed
437    pub fn cleanup_stale_with_config(&mut self) -> usize {
438        if let Some(ttl) = self.config.param_ttl {
439            self.cleanup_stale(ttl)
440        } else {
441            0
442        }
443    }
444
445    /// Get all params matching a pattern
446    pub fn get_matching(&self, pattern: &str) -> Vec<(&str, &ParamState)> {
447        use crate::address::glob_match;
448
449        self.params
450            .iter()
451            .filter(|(addr, _)| glob_match(pattern, addr))
452            .map(|(addr, state)| (addr.as_str(), state))
453            .collect()
454    }
455
456    /// Get all params as a snapshot
457    pub fn snapshot(&self) -> Vec<(&str, &ParamState)> {
458        self.params.iter().map(|(k, v)| (k.as_str(), v)).collect()
459    }
460
461    /// Number of params
462    pub fn len(&self) -> usize {
463        self.params.len()
464    }
465
466    /// Check if empty
467    pub fn is_empty(&self) -> bool {
468        self.params.is_empty()
469    }
470
471    /// Remove a param
472    pub fn remove(&mut self, address: &str) -> Option<ParamState> {
473        self.params.remove(address)
474    }
475
476    /// Clear all params
477    pub fn clear(&mut self) {
478        self.params.clear();
479    }
480}
481
482/// Get current timestamp in microseconds
483fn current_timestamp() -> u64 {
484    SystemTime::now()
485        .duration_since(UNIX_EPOCH)
486        .map(|d| d.as_micros() as u64)
487        .unwrap_or(0)
488}
489
490#[cfg(test)]
491mod tests {
492    use super::*;
493
494    #[test]
495    fn test_basic_update() {
496        let mut state = ParamState::new(Value::Float(0.5), "session1".to_string());
497
498        let result = state.try_update(Value::Float(0.75), "session2", None, false, false, None);
499
500        assert!(result.is_ok());
501        assert_eq!(state.revision, 2);
502        assert_eq!(state.value, Value::Float(0.75));
503        assert_eq!(state.writer, "session2");
504    }
505
506    #[test]
507    fn test_revision_conflict() {
508        let mut state = ParamState::new(Value::Float(0.5), "session1".to_string());
509
510        let result = state.try_update(
511            Value::Float(0.75),
512            "session2",
513            Some(999), // Wrong revision
514            false,
515            false,
516            None,
517        );
518
519        assert!(matches!(result, Err(UpdateError::RevisionConflict { .. })));
520    }
521
522    #[test]
523    fn test_locking() {
524        let mut state = ParamState::new(Value::Float(0.5), "session1".to_string());
525
526        // Session 1 takes lock
527        let result = state.try_update(
528            Value::Float(0.6),
529            "session1",
530            None,
531            true, // Request lock
532            false,
533            None,
534        );
535        assert!(result.is_ok());
536        assert_eq!(state.lock_holder, Some("session1".to_string()));
537
538        // Session 2 tries to update - should fail
539        let result = state.try_update(Value::Float(0.7), "session2", None, false, false, None);
540        assert!(matches!(result, Err(UpdateError::LockHeld { .. })));
541
542        // Session 1 can still update
543        let result = state.try_update(Value::Float(0.8), "session1", None, false, false, None);
544        assert!(result.is_ok());
545    }
546
547    #[test]
548    fn test_max_strategy() {
549        let mut state = ParamState::new(Value::Float(0.5), "session1".to_string())
550            .with_strategy(ConflictStrategy::Max);
551
552        // Higher value wins
553        let result = state.try_update(Value::Float(0.8), "session2", None, false, false, None);
554        assert!(result.is_ok());
555        assert_eq!(state.value, Value::Float(0.8));
556
557        // Lower value rejected
558        let result = state.try_update(Value::Float(0.3), "session3", None, false, false, None);
559        assert!(matches!(result, Err(UpdateError::ConflictRejected)));
560        assert_eq!(state.value, Value::Float(0.8)); // Unchanged
561    }
562
563    #[test]
564    fn test_state_store() {
565        let mut store = StateStore::new();
566
567        store
568            .set("/test/a", Value::Float(1.0), "s1", None, false, false, None)
569            .unwrap();
570        store
571            .set("/test/b", Value::Float(2.0), "s1", None, false, false, None)
572            .unwrap();
573        store
574            .set("/other/c", Value::Float(3.0), "s1", None, false, false, None)
575            .unwrap();
576
577        assert_eq!(store.len(), 3);
578
579        let matching = store.get_matching("/test/*");
580        assert_eq!(matching.len(), 2);
581    }
582
583    #[test]
584    fn test_state_store_capacity_reject() {
585        let config = StateStoreConfig {
586            max_params: Some(2),
587            param_ttl: None,
588            eviction: EvictionStrategy::RejectNew,
589        };
590        let mut store = StateStore::with_config(config);
591
592        store
593            .set("/test/a", Value::Float(1.0), "s1", None, false, false, None)
594            .unwrap();
595        store
596            .set("/test/b", Value::Float(2.0), "s1", None, false, false, None)
597            .unwrap();
598
599        // Third should fail
600        let result = store.set("/test/c", Value::Float(3.0), "s1", None, false, false, None);
601        assert!(matches!(result, Err(UpdateError::AtCapacity)));
602        assert_eq!(store.len(), 2);
603
604        // Updating existing should still work
605        store
606            .set("/test/a", Value::Float(1.5), "s1", None, false, false, None)
607            .unwrap();
608        assert_eq!(store.get_value("/test/a"), Some(&Value::Float(1.5)));
609    }
610
611    #[test]
612    fn test_state_store_capacity_lru_eviction() {
613        let config = StateStoreConfig {
614            max_params: Some(2),
615            param_ttl: None,
616            eviction: EvictionStrategy::Lru,
617        };
618        let mut store = StateStore::with_config(config);
619
620        store
621            .set("/test/a", Value::Float(1.0), "s1", None, false, false, None)
622            .unwrap();
623        std::thread::sleep(std::time::Duration::from_millis(1));
624        store
625            .set("/test/b", Value::Float(2.0), "s1", None, false, false, None)
626            .unwrap();
627
628        // Access /test/a to make it more recent
629        std::thread::sleep(std::time::Duration::from_millis(1));
630        store.get_mut("/test/a");
631
632        // Third should evict /test/b (least recently accessed)
633        store
634            .set("/test/c", Value::Float(3.0), "s1", None, false, false, None)
635            .unwrap();
636
637        assert_eq!(store.len(), 2);
638        assert!(store.get("/test/a").is_some());
639        assert!(store.get("/test/b").is_none()); // Evicted
640        assert!(store.get("/test/c").is_some());
641    }
642
643    #[test]
644    fn test_state_store_capacity_oldest_eviction() {
645        let config = StateStoreConfig {
646            max_params: Some(2),
647            param_ttl: None,
648            eviction: EvictionStrategy::OldestFirst,
649        };
650        let mut store = StateStore::with_config(config);
651
652        store
653            .set("/test/a", Value::Float(1.0), "s1", None, false, false, None)
654            .unwrap();
655        std::thread::sleep(std::time::Duration::from_millis(1));
656        store
657            .set("/test/b", Value::Float(2.0), "s1", None, false, false, None)
658            .unwrap();
659
660        // Third should evict /test/a (oldest)
661        store
662            .set("/test/c", Value::Float(3.0), "s1", None, false, false, None)
663            .unwrap();
664
665        assert_eq!(store.len(), 2);
666        assert!(store.get("/test/a").is_none()); // Evicted
667        assert!(store.get("/test/b").is_some());
668        assert!(store.get("/test/c").is_some());
669    }
670
671    #[test]
672    fn test_state_store_cleanup_stale() {
673        let mut store = StateStore::new();
674
675        store
676            .set("/test/a", Value::Float(1.0), "s1", None, false, false, None)
677            .unwrap();
678        store
679            .set("/test/b", Value::Float(2.0), "s1", None, false, false, None)
680            .unwrap();
681
682        // Sleep a bit, then access /test/a
683        std::thread::sleep(std::time::Duration::from_millis(10));
684        store.get_mut("/test/a");
685
686        // Cleanup with a very short TTL - should remove /test/b but not /test/a
687        let removed = store.cleanup_stale(Duration::from_millis(5));
688        assert_eq!(removed, 1);
689        assert!(store.get("/test/a").is_some());
690        assert!(store.get("/test/b").is_none());
691    }
692
693    #[test]
694    fn test_state_store_cleanup_stale_with_config() {
695        let config = StateStoreConfig {
696            max_params: None,
697            param_ttl: Some(Duration::from_millis(5)),
698            eviction: EvictionStrategy::Lru,
699        };
700        let mut store = StateStore::with_config(config);
701
702        store
703            .set("/test/a", Value::Float(1.0), "s1", None, false, false, None)
704            .unwrap();
705
706        // Immediate cleanup should remove nothing
707        let removed = store.cleanup_stale_with_config();
708        assert_eq!(removed, 0);
709
710        // Wait and cleanup
711        std::thread::sleep(std::time::Duration::from_millis(10));
712        let removed = store.cleanup_stale_with_config();
713        assert_eq!(removed, 1);
714        assert!(store.is_empty());
715    }
716
717    #[test]
718    fn test_last_accessed_tracking() {
719        let mut state = ParamState::new(Value::Float(0.5), "session1".to_string());
720        let initial_accessed = state.last_accessed;
721
722        std::thread::sleep(std::time::Duration::from_millis(1));
723        state.touch();
724
725        assert!(state.last_accessed > initial_accessed);
726    }
727}