1use crate::{ConflictStrategy, Ttl, Value};
6use std::collections::HashMap;
7use std::time::{Duration, SystemTime, UNIX_EPOCH};
8
/// Policy the state store applies when it is full and a write targets a new address.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum EvictionStrategy {
    /// Drop the entry with the smallest `last_accessed` value. This is the default.
    #[default]
    Lru,
    /// Drop the entry with the smallest `timestamp` value.
    OldestFirst,
    /// Keep every existing entry and refuse the new one.
    RejectNew,
}
20
21#[derive(Debug, Clone)]
23pub struct StateStoreConfig {
24 pub max_params: Option<usize>,
26 pub param_ttl: Option<Duration>,
28 pub eviction: EvictionStrategy,
30}
31
32impl Default for StateStoreConfig {
33 fn default() -> Self {
34 Self {
35 max_params: Some(10_000),
36 param_ttl: Some(Duration::from_secs(3600)), eviction: EvictionStrategy::Lru,
38 }
39 }
40}
41
42impl StateStoreConfig {
43 pub fn unlimited() -> Self {
45 Self {
46 max_params: None,
47 param_ttl: None,
48 eviction: EvictionStrategy::Lru,
49 }
50 }
51
52 pub fn with_limits(max_params: usize, ttl_secs: u64) -> Self {
54 Self {
55 max_params: Some(max_params),
56 param_ttl: Some(Duration::from_secs(ttl_secs)),
57 eviction: EvictionStrategy::Lru,
58 }
59 }
60}
61
62#[derive(Debug, Clone)]
64pub struct ParamState {
65 pub value: Value,
67 pub revision: u64,
69 pub writer: String,
71 pub timestamp: u64,
73 pub last_accessed: u64,
75 pub strategy: ConflictStrategy,
77 pub lock_holder: Option<String>,
79 pub meta: Option<ParamMeta>,
81 pub origin: Option<String>,
83 pub ttl: Option<Ttl>,
85}
86
87#[derive(Debug, Clone)]
89pub struct ParamMeta {
90 pub unit: Option<String>,
91 pub range: Option<(f64, f64)>,
92 pub default: Option<Value>,
93}
94
95impl ParamState {
96 pub fn new(value: Value, writer: String) -> Self {
98 let now = current_timestamp();
99 Self {
100 value,
101 revision: 1,
102 writer,
103 timestamp: now,
104 last_accessed: now,
105 strategy: ConflictStrategy::Lww,
106 lock_holder: None,
107 meta: None,
108 origin: None,
109 ttl: None,
110 }
111 }
112
113 pub fn touch(&mut self) {
115 self.last_accessed = current_timestamp();
116 }
117
118 pub fn with_strategy(mut self, strategy: ConflictStrategy) -> Self {
120 self.strategy = strategy;
121 self
122 }
123
124 pub fn with_meta(mut self, meta: ParamMeta) -> Self {
126 self.meta = Some(meta);
127 self
128 }
129
130 pub fn try_update(
135 &mut self,
136 new_value: Value,
137 writer: &str,
138 expected_revision: Option<u64>,
139 request_lock: bool,
140 release_lock: bool,
141 ttl: Option<Ttl>,
142 ) -> Result<u64, UpdateError> {
143 let timestamp = current_timestamp();
144
145 if let Some(expected) = expected_revision {
147 if expected != self.revision {
148 return Err(UpdateError::RevisionConflict {
149 expected,
150 actual: self.revision,
151 });
152 }
153 }
154
155 if let Some(ref holder) = self.lock_holder {
157 if holder != writer && !release_lock {
158 return Err(UpdateError::LockHeld {
159 holder: holder.clone(),
160 });
161 }
162 }
163
164 if release_lock && self.lock_holder.as_deref() == Some(writer) {
166 self.lock_holder = None;
167 }
168
169 let should_update = match self.strategy {
171 ConflictStrategy::Lww => timestamp >= self.timestamp,
172 ConflictStrategy::Max => {
173 match (&new_value, &self.value) {
174 (Value::Float(new), Value::Float(old)) => new > old,
175 (Value::Int(new), Value::Int(old)) => new > old,
176 _ => true, }
178 }
179 ConflictStrategy::Min => match (&new_value, &self.value) {
180 (Value::Float(new), Value::Float(old)) => new < old,
181 (Value::Int(new), Value::Int(old)) => new < old,
182 _ => true,
183 },
184 ConflictStrategy::Lock => {
185 self.lock_holder.is_none() || self.lock_holder.as_deref() == Some(writer)
186 }
187 ConflictStrategy::Merge => true, };
189
190 if !should_update {
191 return Err(UpdateError::ConflictRejected);
192 }
193
194 if request_lock {
196 if self.lock_holder.is_some() && self.lock_holder.as_deref() != Some(writer) {
197 return Err(UpdateError::LockHeld {
198 holder: self.lock_holder.clone().unwrap(),
199 });
200 }
201 self.lock_holder = Some(writer.to_string());
202 }
203
204 self.value = new_value;
206 self.revision += 1;
207 self.writer = writer.to_string();
208 self.timestamp = timestamp;
209 self.last_accessed = timestamp;
210
211 if let Some(t) = ttl {
213 self.ttl = Some(t);
214 }
215
216 Ok(self.revision)
217 }
218
219 pub fn validate_range(&self, value: &Value) -> bool {
221 if let Some(meta) = &self.meta {
222 if let Some((min, max)) = meta.range {
223 if let Some(v) = value.as_f64() {
224 return v >= min && v <= max;
225 }
226 }
227 }
228 true
229 }
230}
231
/// Reasons a parameter write can be refused.
#[derive(Debug, Clone)]
pub enum UpdateError {
    /// The caller's expected revision differs from the stored one.
    RevisionConflict { expected: u64, actual: u64 },
    /// The parameter is locked by `holder`.
    LockHeld { holder: String },
    /// The parameter's conflict strategy refused the new value.
    ConflictRejected,
    /// The value lies outside the allowed range.
    OutOfRange,
    /// The store is full and cannot take a new parameter.
    AtCapacity,
}

impl std::fmt::Display for UpdateError {
    /// Writes a one-line, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Variants carrying data are formatted directly; the rest share a
        // single `write_str` call with a fixed message.
        let fixed_message = match self {
            Self::RevisionConflict { expected, actual } => {
                return write!(
                    f,
                    "Revision conflict: expected {}, got {}",
                    expected, actual
                );
            }
            Self::LockHeld { holder } => {
                return write!(f, "Parameter locked by {}", holder);
            }
            Self::ConflictRejected => "Update rejected by conflict strategy",
            Self::OutOfRange => "Value out of allowed range",
            Self::AtCapacity => "State store at capacity",
        };
        f.write_str(fixed_message)
    }
}

impl std::error::Error for UpdateError {}
269
/// Marker error signalling that the state store cannot take more entries.
#[derive(Debug, Clone)]
pub struct CapacityError;

impl std::fmt::Display for CapacityError {
    /// Writes the fixed capacity message.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("State store at capacity")
    }
}

impl std::error::Error for CapacityError {}
281
282#[derive(Debug)]
284pub struct StateStore {
285 params: HashMap<String, ParamState>,
286 config: StateStoreConfig,
287}
288
289impl Default for StateStore {
290 fn default() -> Self {
291 Self {
292 params: HashMap::new(),
293 config: StateStoreConfig::unlimited(), }
295 }
296}
297
298impl StateStore {
299 pub fn new() -> Self {
301 Self::default()
302 }
303
304 pub fn with_config(config: StateStoreConfig) -> Self {
306 Self {
307 params: HashMap::new(),
308 config,
309 }
310 }
311
312 pub fn config(&self) -> &StateStoreConfig {
314 &self.config
315 }
316
317 pub fn get(&self, address: &str) -> Option<&ParamState> {
319 self.params.get(address)
320 }
321
322 pub fn get_mut(&mut self, address: &str) -> Option<&mut ParamState> {
324 let param = self.params.get_mut(address)?;
325 param.touch();
326 Some(param)
327 }
328
329 pub fn get_value(&self, address: &str) -> Option<&Value> {
331 self.params.get(address).map(|p| &p.value)
332 }
333
334 pub fn get_value_mut(&mut self, address: &str) -> Option<&Value> {
336 let param = self.params.get_mut(address)?;
337 param.touch();
338 Some(¶m.value)
339 }
340
341 pub fn set(
343 &mut self,
344 address: &str,
345 value: Value,
346 writer: &str,
347 revision: Option<u64>,
348 lock: bool,
349 unlock: bool,
350 ttl: Option<Ttl>,
351 ) -> Result<u64, UpdateError> {
352 if let Some(param) = self.params.get_mut(address) {
353 param.try_update(value, writer, revision, lock, unlock, ttl)
354 } else {
355 if let Some(max) = self.config.max_params {
357 if self.params.len() >= max {
358 match self.config.eviction {
359 EvictionStrategy::RejectNew => {
360 return Err(UpdateError::AtCapacity);
361 }
362 EvictionStrategy::Lru => {
363 self.evict_lru();
364 }
365 EvictionStrategy::OldestFirst => {
366 self.evict_oldest();
367 }
368 }
369 }
370 }
371
372 let mut param = ParamState::new(value, writer.to_string());
374 if lock {
375 param.lock_holder = Some(writer.to_string());
376 }
377 param.ttl = ttl;
378 let rev = param.revision;
379 self.params.insert(address.to_string(), param);
380 Ok(rev)
381 }
382 }
383
384 fn evict_lru(&mut self) {
386 if let Some(oldest_key) = self
387 .params
388 .iter()
389 .min_by_key(|(_, v)| v.last_accessed)
390 .map(|(k, _)| k.clone())
391 {
392 self.params.remove(&oldest_key);
393 }
394 }
395
396 fn evict_oldest(&mut self) {
398 if let Some(oldest_key) = self
399 .params
400 .iter()
401 .min_by_key(|(_, v)| v.timestamp)
402 .map(|(k, _)| k.clone())
403 {
404 self.params.remove(&oldest_key);
405 }
406 }
407
408 pub fn cleanup_stale(&mut self, ttl: Duration) -> usize {
411 let now = current_timestamp();
412 let global_ttl_micros = ttl.as_micros() as u64;
413
414 let before = self.params.len();
415 self.params.retain(|_, v| match v.ttl {
416 Some(Ttl::Never) => true,
417 Some(Ttl::Sliding(secs)) => {
418 let cutoff = now.saturating_sub(secs as u64 * 1_000_000);
419 v.last_accessed >= cutoff
420 }
421 Some(Ttl::Absolute(secs)) => {
422 let expires_at = v.timestamp.saturating_add(secs as u64 * 1_000_000);
423 now < expires_at
424 }
425 None => {
426 let cutoff = now.saturating_sub(global_ttl_micros);
427 v.last_accessed >= cutoff
428 }
429 });
430 before - self.params.len()
431 }
432
433 pub fn cleanup_stale_with_config(&mut self) -> usize {
436 if let Some(ttl) = self.config.param_ttl {
437 self.cleanup_stale(ttl)
438 } else {
439 0
440 }
441 }
442
443 pub fn get_matching(&self, pattern: &str) -> Vec<(&str, &ParamState)> {
445 use crate::address::glob_match;
446
447 self.params
448 .iter()
449 .filter(|(addr, _)| glob_match(pattern, addr))
450 .map(|(addr, state)| (addr.as_str(), state))
451 .collect()
452 }
453
454 pub fn snapshot(&self) -> Vec<(&str, &ParamState)> {
456 self.params.iter().map(|(k, v)| (k.as_str(), v)).collect()
457 }
458
459 pub fn len(&self) -> usize {
461 self.params.len()
462 }
463
464 pub fn is_empty(&self) -> bool {
466 self.params.is_empty()
467 }
468
469 pub fn remove(&mut self, address: &str) -> Option<ParamState> {
471 self.params.remove(address)
472 }
473
474 pub fn clear(&mut self) {
476 self.params.clear();
477 }
478}
479
/// Current wall-clock time in microseconds since the Unix epoch.
///
/// Yields 0 when the system clock reports a time before the epoch.
fn current_timestamp() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_micros() as u64,
        Err(_) => 0,
    }
}
487
#[cfg(test)]
mod tests {
    use super::*;

    /// The parameter most tests start from: 0.5 written by "session1".
    fn session1_param() -> ParamState {
        ParamState::new(Value::Float(0.5), "session1".to_string())
    }

    /// Update without revision check, lock flags or TTL.
    fn plain_update(state: &mut ParamState, value: f64, writer: &str) -> Result<u64, UpdateError> {
        state.try_update(Value::Float(value), writer, None, false, false, None)
    }

    /// Write of a float by writer "s1" without revision, lock flags or TTL.
    fn put(store: &mut StateStore, address: &str, value: f64) -> Result<u64, UpdateError> {
        store.set(address, Value::Float(value), "s1", None, false, false, None)
    }

    /// Store limited to two parameters, without TTL, using `eviction`.
    fn two_slot_store(eviction: EvictionStrategy) -> StateStore {
        StateStore::with_config(StateStoreConfig {
            max_params: Some(2),
            param_ttl: None,
            eviction,
        })
    }

    /// Sleeps long enough for consecutive timestamps to differ.
    fn pause_ms(millis: u64) {
        std::thread::sleep(std::time::Duration::from_millis(millis));
    }

    #[test]
    fn test_basic_update() {
        let mut state = session1_param();

        let outcome = plain_update(&mut state, 0.75, "session2");

        assert!(outcome.is_ok());
        assert_eq!(state.revision, 2);
        assert_eq!(state.value, Value::Float(0.75));
        assert_eq!(state.writer, "session2");
    }

    #[test]
    fn test_revision_conflict() {
        let mut state = session1_param();

        // 999 cannot match the initial revision.
        let outcome =
            state.try_update(Value::Float(0.75), "session2", Some(999), false, false, None);

        assert!(matches!(outcome, Err(UpdateError::RevisionConflict { .. })));
    }

    #[test]
    fn test_locking() {
        let mut state = session1_param();

        // session1 writes and takes the lock in the same call.
        let locked = state.try_update(Value::Float(0.6), "session1", None, true, false, None);
        assert!(locked.is_ok());
        assert_eq!(state.lock_holder, Some("session1".to_string()));

        // Another session is turned away while the lock is held.
        let blocked = plain_update(&mut state, 0.7, "session2");
        assert!(matches!(blocked, Err(UpdateError::LockHeld { .. })));

        // The holder keeps write access.
        let allowed = plain_update(&mut state, 0.8, "session1");
        assert!(allowed.is_ok());
    }

    #[test]
    fn test_max_strategy() {
        let mut state = session1_param().with_strategy(ConflictStrategy::Max);

        // A larger value wins.
        let raised = plain_update(&mut state, 0.8, "session2");
        assert!(raised.is_ok());
        assert_eq!(state.value, Value::Float(0.8));

        // A smaller value is rejected and leaves the stored value alone.
        let lowered = plain_update(&mut state, 0.3, "session3");
        assert!(matches!(lowered, Err(UpdateError::ConflictRejected)));
        assert_eq!(state.value, Value::Float(0.8));
    }

    #[test]
    fn test_state_store() {
        let mut store = StateStore::new();

        put(&mut store, "/test/a", 1.0).unwrap();
        put(&mut store, "/test/b", 2.0).unwrap();
        put(&mut store, "/other/c", 3.0).unwrap();

        assert_eq!(store.len(), 3);

        let matching = store.get_matching("/test/*");
        assert_eq!(matching.len(), 2);
    }

    #[test]
    fn test_state_store_capacity_reject() {
        let mut store = two_slot_store(EvictionStrategy::RejectNew);

        put(&mut store, "/test/a", 1.0).unwrap();
        put(&mut store, "/test/b", 2.0).unwrap();

        // A third address does not fit.
        let overflow = put(&mut store, "/test/c", 3.0);
        assert!(matches!(overflow, Err(UpdateError::AtCapacity)));
        assert_eq!(store.len(), 2);

        // Updating an address that already exists is still possible.
        put(&mut store, "/test/a", 1.5).unwrap();
        assert_eq!(store.get_value("/test/a"), Some(&Value::Float(1.5)));
    }

    #[test]
    fn test_state_store_capacity_lru_eviction() {
        let mut store = two_slot_store(EvictionStrategy::Lru);

        put(&mut store, "/test/a", 1.0).unwrap();
        pause_ms(1);
        put(&mut store, "/test/b", 2.0).unwrap();

        // Touch "a" so that "b" becomes the least recently used entry.
        pause_ms(1);
        store.get_mut("/test/a");

        put(&mut store, "/test/c", 3.0).unwrap();

        assert_eq!(store.len(), 2);
        assert!(store.get("/test/a").is_some());
        assert!(store.get("/test/b").is_none());
        assert!(store.get("/test/c").is_some());
    }

    #[test]
    fn test_state_store_capacity_oldest_eviction() {
        let mut store = two_slot_store(EvictionStrategy::OldestFirst);

        put(&mut store, "/test/a", 1.0).unwrap();
        pause_ms(1);
        put(&mut store, "/test/b", 2.0).unwrap();

        // "a" carries the oldest timestamp and has to make room.
        put(&mut store, "/test/c", 3.0).unwrap();

        assert_eq!(store.len(), 2);
        assert!(store.get("/test/a").is_none());
        assert!(store.get("/test/b").is_some());
        assert!(store.get("/test/c").is_some());
    }

    #[test]
    fn test_state_store_cleanup_stale() {
        let mut store = StateStore::new();

        put(&mut store, "/test/a", 1.0).unwrap();
        put(&mut store, "/test/b", 2.0).unwrap();

        // Let both entries age past the TTL, then refresh only "a".
        pause_ms(10);
        store.get_mut("/test/a");

        let removed = store.cleanup_stale(Duration::from_millis(5));
        assert_eq!(removed, 1);
        assert!(store.get("/test/a").is_some());
        assert!(store.get("/test/b").is_none());
    }

    #[test]
    fn test_state_store_cleanup_stale_with_config() {
        let mut store = StateStore::with_config(StateStoreConfig {
            max_params: None,
            param_ttl: Some(Duration::from_millis(5)),
            eviction: EvictionStrategy::Lru,
        });

        put(&mut store, "/test/a", 1.0).unwrap();

        // Still fresh: nothing to remove.
        let removed = store.cleanup_stale_with_config();
        assert_eq!(removed, 0);

        // Past the configured TTL: the entry goes away.
        pause_ms(10);
        let removed = store.cleanup_stale_with_config();
        assert_eq!(removed, 1);
        assert!(store.is_empty());
    }

    #[test]
    fn test_last_accessed_tracking() {
        let mut state = session1_param();
        let initial_accessed = state.last_accessed;

        pause_ms(1);
        state.touch();

        assert!(state.last_accessed > initial_accessed);
    }
}