1use crate::{ConflictStrategy, Ttl, Value};
6use std::collections::HashMap;
7use std::time::{Duration, SystemTime, UNIX_EPOCH};
8
/// Policy applied by `StateStore::set` when a new parameter arrives while the
/// store already holds `max_params` entries.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum EvictionStrategy {
    /// Remove the entry with the smallest `last_accessed` value (the default).
    #[default]
    Lru,
    /// Remove the entry with the smallest `timestamp` value.
    OldestFirst,
    /// Keep every existing entry and fail the insert with
    /// `UpdateError::AtCapacity`.
    RejectNew,
}
20
21#[derive(Debug, Clone)]
23pub struct StateStoreConfig {
24 pub max_params: Option<usize>,
26 pub param_ttl: Option<Duration>,
28 pub eviction: EvictionStrategy,
30}
31
32impl Default for StateStoreConfig {
33 fn default() -> Self {
34 Self {
35 max_params: Some(10_000),
36 param_ttl: Some(Duration::from_secs(3600)), eviction: EvictionStrategy::Lru,
38 }
39 }
40}
41
42impl StateStoreConfig {
43 pub fn unlimited() -> Self {
45 Self {
46 max_params: None,
47 param_ttl: None,
48 eviction: EvictionStrategy::Lru,
49 }
50 }
51
52 pub fn with_limits(max_params: usize, ttl_secs: u64) -> Self {
54 Self {
55 max_params: Some(max_params),
56 param_ttl: Some(Duration::from_secs(ttl_secs)),
57 eviction: EvictionStrategy::Lru,
58 }
59 }
60}
61
/// Current value of a single parameter together with the bookkeeping used for
/// conflict resolution, locking and expiry.
#[derive(Debug, Clone)]
pub struct ParamState {
    /// The value most recently accepted for this parameter.
    pub value: Value,
    /// Starts at 1 and is incremented by every successful `try_update`.
    pub revision: u64,
    /// Identifier of the writer whose value was accepted last.
    pub writer: String,
    /// Time of creation or of the last successful update, in microseconds
    /// since the Unix epoch.
    pub timestamp: u64,
    /// Time of the last `touch` or successful update, in microseconds since
    /// the Unix epoch.
    pub last_accessed: u64,
    /// Conflict strategy consulted by `try_update`.
    pub strategy: ConflictStrategy,
    /// Writer currently holding the lock, if any.
    pub lock_holder: Option<String>,
    /// Optional metadata; its `range` is what `validate_range` checks against.
    pub meta: Option<ParamMeta>,
    /// NOTE(review): initialised to `None` and never read in this file;
    /// presumably identifies where the value came from. Confirm with callers.
    pub origin: Option<String>,
    /// Per-parameter time-to-live consulted by `StateStore::cleanup_stale`.
    pub ttl: Option<Ttl>,
}
86
/// Optional descriptive metadata attached to a parameter.
#[derive(Debug, Clone)]
pub struct ParamMeta {
    /// NOTE(review): not read in this file; presumably a unit label for the
    /// value. Confirm with callers.
    pub unit: Option<String>,
    /// Inclusive `(min, max)` bounds enforced by `ParamState::validate_range`.
    pub range: Option<(f64, f64)>,
    /// NOTE(review): not read in this file; presumably the value to fall back
    /// to. Confirm with callers.
    pub default: Option<Value>,
}
94
95impl ParamState {
96 pub fn new(value: Value, writer: String) -> Self {
98 let now = current_timestamp();
99 Self {
100 value,
101 revision: 1,
102 writer,
103 timestamp: now,
104 last_accessed: now,
105 strategy: ConflictStrategy::Lww,
106 lock_holder: None,
107 meta: None,
108 origin: None,
109 ttl: None,
110 }
111 }
112
113 pub fn touch(&mut self) {
115 self.last_accessed = current_timestamp();
116 }
117
118 pub fn with_strategy(mut self, strategy: ConflictStrategy) -> Self {
120 self.strategy = strategy;
121 self
122 }
123
124 pub fn with_meta(mut self, meta: ParamMeta) -> Self {
126 self.meta = Some(meta);
127 self
128 }
129
130 pub fn try_update(
135 &mut self,
136 new_value: Value,
137 writer: &str,
138 expected_revision: Option<u64>,
139 request_lock: bool,
140 release_lock: bool,
141 ttl: Option<Ttl>,
142 ) -> Result<u64, UpdateError> {
143 let timestamp = current_timestamp();
144
145 if let Some(expected) = expected_revision {
147 if expected != self.revision {
148 return Err(UpdateError::RevisionConflict {
149 expected,
150 actual: self.revision,
151 });
152 }
153 }
154
155 if let Some(ref holder) = self.lock_holder {
157 if holder != writer && !release_lock {
158 return Err(UpdateError::LockHeld {
159 holder: holder.clone(),
160 });
161 }
162 }
163
164 if release_lock && self.lock_holder.as_deref() == Some(writer) {
166 self.lock_holder = None;
167 }
168
169 let should_update = match self.strategy {
171 ConflictStrategy::Lww => timestamp >= self.timestamp,
172 ConflictStrategy::Max => {
173 match (&new_value, &self.value) {
174 (Value::Float(new), Value::Float(old)) => new > old,
175 (Value::Int(new), Value::Int(old)) => new > old,
176 _ => true, }
178 }
179 ConflictStrategy::Min => match (&new_value, &self.value) {
180 (Value::Float(new), Value::Float(old)) => new < old,
181 (Value::Int(new), Value::Int(old)) => new < old,
182 _ => true,
183 },
184 ConflictStrategy::Lock => {
185 self.lock_holder.is_none() || self.lock_holder.as_deref() == Some(writer)
186 }
187 ConflictStrategy::Merge => true, };
189
190 if !should_update {
191 return Err(UpdateError::ConflictRejected);
192 }
193
194 if request_lock {
196 if self.lock_holder.is_some() && self.lock_holder.as_deref() != Some(writer) {
197 return Err(UpdateError::LockHeld {
198 holder: self.lock_holder.clone().unwrap(),
199 });
200 }
201 self.lock_holder = Some(writer.to_string());
202 }
203
204 self.value = new_value;
206 self.revision += 1;
207 self.writer = writer.to_string();
208 self.timestamp = timestamp;
209 self.last_accessed = timestamp;
210
211 if let Some(t) = ttl {
213 self.ttl = Some(t);
214 }
215
216 Ok(self.revision)
217 }
218
219 pub fn validate_range(&self, value: &Value) -> bool {
221 if let Some(meta) = &self.meta {
222 if let Some((min, max)) = meta.range {
223 if let Some(v) = value.as_f64() {
224 return v >= min && v <= max;
225 }
226 }
227 }
228 true
229 }
230}
231
/// Reasons a write to the state store can be refused.
#[derive(Debug, Clone)]
pub enum UpdateError {
    /// The caller expected revision `expected` but the parameter is at `actual`.
    RevisionConflict { expected: u64, actual: u64 },
    /// The parameter is locked by `holder`.
    LockHeld { holder: String },
    /// The parameter's conflict strategy declined the new value.
    ConflictRejected,
    /// The value lies outside the allowed range.
    OutOfRange,
    /// The store is full and cannot accept a new parameter.
    AtCapacity,
}

impl std::fmt::Display for UpdateError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::RevisionConflict { expected, actual } => write!(
                f,
                "Revision conflict: expected {}, got {}",
                expected, actual
            ),
            Self::LockHeld { holder } => write!(f, "Parameter locked by {}", holder),
            // The remaining messages carry no arguments, so they are written verbatim.
            Self::ConflictRejected => f.write_str("Update rejected by conflict strategy"),
            Self::OutOfRange => f.write_str("Value out of allowed range"),
            Self::AtCapacity => f.write_str("State store at capacity"),
        }
    }
}

impl std::error::Error for UpdateError {}
269
/// Error value signalling that the state store cannot hold more parameters.
#[derive(Debug, Clone)]
pub struct CapacityError;

impl std::fmt::Display for CapacityError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Fixed message with no arguments.
        f.write_str("State store at capacity")
    }
}

impl std::error::Error for CapacityError {}
281
282#[derive(Debug)]
284pub struct StateStore {
285 params: HashMap<String, ParamState>,
286 config: StateStoreConfig,
287}
288
289impl Default for StateStore {
290 fn default() -> Self {
291 Self {
292 params: HashMap::new(),
293 config: StateStoreConfig::unlimited(), }
295 }
296}
297
298impl StateStore {
299 pub fn new() -> Self {
301 Self::default()
302 }
303
304 pub fn with_config(config: StateStoreConfig) -> Self {
306 Self {
307 params: HashMap::new(),
308 config,
309 }
310 }
311
312 pub fn config(&self) -> &StateStoreConfig {
314 &self.config
315 }
316
317 pub fn get(&self, address: &str) -> Option<&ParamState> {
319 self.params.get(address)
320 }
321
322 pub fn get_mut(&mut self, address: &str) -> Option<&mut ParamState> {
324 let param = self.params.get_mut(address)?;
325 param.touch();
326 Some(param)
327 }
328
329 pub fn get_value(&self, address: &str) -> Option<&Value> {
331 self.params.get(address).map(|p| &p.value)
332 }
333
334 pub fn get_value_mut(&mut self, address: &str) -> Option<&Value> {
336 let param = self.params.get_mut(address)?;
337 param.touch();
338 Some(¶m.value)
339 }
340
341 pub fn set(
343 &mut self,
344 address: &str,
345 value: Value,
346 writer: &str,
347 revision: Option<u64>,
348 lock: bool,
349 unlock: bool,
350 ttl: Option<Ttl>,
351 ) -> Result<u64, UpdateError> {
352 if let Some(param) = self.params.get_mut(address) {
353 param.try_update(value, writer, revision, lock, unlock, ttl)
354 } else {
355 if let Some(max) = self.config.max_params {
357 if self.params.len() >= max {
358 match self.config.eviction {
359 EvictionStrategy::RejectNew => {
360 return Err(UpdateError::AtCapacity);
361 }
362 EvictionStrategy::Lru => {
363 self.evict_lru();
364 }
365 EvictionStrategy::OldestFirst => {
366 self.evict_oldest();
367 }
368 }
369 }
370 }
371
372 let mut param = ParamState::new(value, writer.to_string());
374 if lock {
375 param.lock_holder = Some(writer.to_string());
376 }
377 param.ttl = ttl;
378 let rev = param.revision;
379 self.params.insert(address.to_string(), param);
380 Ok(rev)
381 }
382 }
383
384 fn evict_lru(&mut self) {
386 if let Some(oldest_key) = self
387 .params
388 .iter()
389 .min_by_key(|(_, v)| v.last_accessed)
390 .map(|(k, _)| k.clone())
391 {
392 self.params.remove(&oldest_key);
393 }
394 }
395
396 fn evict_oldest(&mut self) {
398 if let Some(oldest_key) = self
399 .params
400 .iter()
401 .min_by_key(|(_, v)| v.timestamp)
402 .map(|(k, _)| k.clone())
403 {
404 self.params.remove(&oldest_key);
405 }
406 }
407
408 pub fn cleanup_stale(&mut self, ttl: Duration) -> usize {
411 let now = current_timestamp();
412 let global_ttl_micros = ttl.as_micros() as u64;
413
414 let before = self.params.len();
415 self.params.retain(|_, v| {
416 match v.ttl {
417 Some(Ttl::Never) => true,
418 Some(Ttl::Sliding(secs)) => {
419 let cutoff = now.saturating_sub(secs as u64 * 1_000_000);
420 v.last_accessed >= cutoff
421 }
422 Some(Ttl::Absolute(secs)) => {
423 let expires_at = v.timestamp.saturating_add(secs as u64 * 1_000_000);
424 now < expires_at
425 }
426 None => {
427 let cutoff = now.saturating_sub(global_ttl_micros);
428 v.last_accessed >= cutoff
429 }
430 }
431 });
432 before - self.params.len()
433 }
434
435 pub fn cleanup_stale_with_config(&mut self) -> usize {
438 if let Some(ttl) = self.config.param_ttl {
439 self.cleanup_stale(ttl)
440 } else {
441 0
442 }
443 }
444
445 pub fn get_matching(&self, pattern: &str) -> Vec<(&str, &ParamState)> {
447 use crate::address::glob_match;
448
449 self.params
450 .iter()
451 .filter(|(addr, _)| glob_match(pattern, addr))
452 .map(|(addr, state)| (addr.as_str(), state))
453 .collect()
454 }
455
456 pub fn snapshot(&self) -> Vec<(&str, &ParamState)> {
458 self.params.iter().map(|(k, v)| (k.as_str(), v)).collect()
459 }
460
461 pub fn len(&self) -> usize {
463 self.params.len()
464 }
465
466 pub fn is_empty(&self) -> bool {
468 self.params.is_empty()
469 }
470
471 pub fn remove(&mut self, address: &str) -> Option<ParamState> {
473 self.params.remove(address)
474 }
475
476 pub fn clear(&mut self) {
478 self.params.clear();
479 }
480}
481
/// Microseconds since the Unix epoch according to the system clock.
///
/// Yields 0 when the clock reports a time before the epoch.
fn current_timestamp() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_micros() as u64,
        Err(_) => 0,
    }
}
489
#[cfg(test)]
mod tests {
    use super::*;

    /// Writes `value` to `address` as session `s1` with no revision check,
    /// no lock flags and no per-parameter TTL.
    fn put(store: &mut StateStore, address: &str, value: Value) -> Result<u64, UpdateError> {
        store.set(address, value, "s1", None, false, false, None)
    }

    /// Plain `try_update`: no expected revision, no lock flags, no TTL.
    fn write(state: &mut ParamState, value: Value, writer: &str) -> Result<u64, UpdateError> {
        state.try_update(value, writer, None, false, false, None)
    }

    /// A parameter holding `0.5`, created by `session1`.
    fn initial_state() -> ParamState {
        ParamState::new(Value::Float(0.5), "session1".to_string())
    }

    /// Store capped at two parameters, without store-wide TTL.
    fn store_of_two(eviction: EvictionStrategy) -> StateStore {
        StateStore::with_config(StateStoreConfig {
            max_params: Some(2),
            param_ttl: None,
            eviction,
        })
    }

    /// Lets the microsecond clock advance between operations.
    fn pause_ms(millis: u64) {
        std::thread::sleep(Duration::from_millis(millis));
    }

    #[test]
    fn test_basic_update() {
        let mut state = initial_state();

        let outcome = write(&mut state, Value::Float(0.75), "session2");

        assert!(outcome.is_ok());
        assert_eq!(state.revision, 2);
        assert_eq!(state.value, Value::Float(0.75));
        assert_eq!(state.writer, "session2");
    }

    #[test]
    fn test_revision_conflict() {
        let mut state = initial_state();

        // The state is at revision 1, so expecting 999 must fail.
        let outcome =
            state.try_update(Value::Float(0.75), "session2", Some(999), false, false, None);

        assert!(matches!(outcome, Err(UpdateError::RevisionConflict { .. })));
    }

    #[test]
    fn test_locking() {
        let mut state = initial_state();

        // session1 writes and takes the lock.
        let locked = state.try_update(Value::Float(0.6), "session1", None, true, false, None);
        assert!(locked.is_ok());
        assert_eq!(state.lock_holder, Some("session1".to_string()));

        // A different session is refused while the lock is held.
        let intruder = write(&mut state, Value::Float(0.7), "session2");
        assert!(matches!(intruder, Err(UpdateError::LockHeld { .. })));

        // The holder itself can keep writing.
        let holder = write(&mut state, Value::Float(0.8), "session1");
        assert!(holder.is_ok());
    }

    #[test]
    fn test_max_strategy() {
        let mut state = initial_state().with_strategy(ConflictStrategy::Max);

        // A larger value is accepted.
        let higher = write(&mut state, Value::Float(0.8), "session2");
        assert!(higher.is_ok());
        assert_eq!(state.value, Value::Float(0.8));

        // A smaller value is rejected and leaves the stored value untouched.
        let lower = write(&mut state, Value::Float(0.3), "session3");
        assert!(matches!(lower, Err(UpdateError::ConflictRejected)));
        assert_eq!(state.value, Value::Float(0.8));
    }

    #[test]
    fn test_state_store() {
        let mut store = StateStore::new();

        put(&mut store, "/test/a", Value::Float(1.0)).unwrap();
        put(&mut store, "/test/b", Value::Float(2.0)).unwrap();
        put(&mut store, "/other/c", Value::Float(3.0)).unwrap();

        assert_eq!(store.len(), 3);

        let matching = store.get_matching("/test/*");
        assert_eq!(matching.len(), 2);
    }

    #[test]
    fn test_state_store_capacity_reject() {
        let mut store = store_of_two(EvictionStrategy::RejectNew);

        put(&mut store, "/test/a", Value::Float(1.0)).unwrap();
        put(&mut store, "/test/b", Value::Float(2.0)).unwrap();

        // A third address does not fit.
        let overflow = put(&mut store, "/test/c", Value::Float(3.0));
        assert!(matches!(overflow, Err(UpdateError::AtCapacity)));
        assert_eq!(store.len(), 2);

        // Updating an existing address still works at capacity.
        put(&mut store, "/test/a", Value::Float(1.5)).unwrap();
        assert_eq!(store.get_value("/test/a"), Some(&Value::Float(1.5)));
    }

    #[test]
    fn test_state_store_capacity_lru_eviction() {
        let mut store = store_of_two(EvictionStrategy::Lru);

        put(&mut store, "/test/a", Value::Float(1.0)).unwrap();
        pause_ms(1);
        put(&mut store, "/test/b", Value::Float(2.0)).unwrap();

        // Touch "a" so that "b" becomes the least recently used entry.
        pause_ms(1);
        store.get_mut("/test/a");

        put(&mut store, "/test/c", Value::Float(3.0)).unwrap();

        assert_eq!(store.len(), 2);
        assert!(store.get("/test/a").is_some());
        assert!(store.get("/test/b").is_none());
        assert!(store.get("/test/c").is_some());
    }

    #[test]
    fn test_state_store_capacity_oldest_eviction() {
        let mut store = store_of_two(EvictionStrategy::OldestFirst);

        put(&mut store, "/test/a", Value::Float(1.0)).unwrap();
        pause_ms(1);
        put(&mut store, "/test/b", Value::Float(2.0)).unwrap();

        // "a" was written first, so it is the one to go.
        put(&mut store, "/test/c", Value::Float(3.0)).unwrap();

        assert_eq!(store.len(), 2);
        assert!(store.get("/test/a").is_none());
        assert!(store.get("/test/b").is_some());
        assert!(store.get("/test/c").is_some());
    }

    #[test]
    fn test_state_store_cleanup_stale() {
        let mut store = StateStore::new();

        put(&mut store, "/test/a", Value::Float(1.0)).unwrap();
        put(&mut store, "/test/b", Value::Float(2.0)).unwrap();

        // Let both entries age past the TTL, then refresh only "a".
        pause_ms(10);
        store.get_mut("/test/a");

        let removed = store.cleanup_stale(Duration::from_millis(5));
        assert_eq!(removed, 1);
        assert!(store.get("/test/a").is_some());
        assert!(store.get("/test/b").is_none());
    }

    #[test]
    fn test_state_store_cleanup_stale_with_config() {
        let mut store = StateStore::with_config(StateStoreConfig {
            max_params: None,
            param_ttl: Some(Duration::from_millis(5)),
            eviction: EvictionStrategy::Lru,
        });

        put(&mut store, "/test/a", Value::Float(1.0)).unwrap();

        // Nothing has expired yet.
        let removed = store.cleanup_stale_with_config();
        assert_eq!(removed, 0);

        // After waiting past the TTL the entry is removed.
        pause_ms(10);
        let removed = store.cleanup_stale_with_config();
        assert_eq!(removed, 1);
        assert!(store.is_empty());
    }

    #[test]
    fn test_last_accessed_tracking() {
        let mut state = initial_state();
        let accessed_at_creation = state.last_accessed;

        pause_ms(1);
        state.touch();

        assert!(state.last_accessed > accessed_at_creation);
    }
}