1use crate::{ConflictStrategy, Value};
6use std::collections::HashMap;
7use std::time::{Duration, SystemTime, UNIX_EPOCH};
8
/// Policy applied when a new parameter has to be stored while the store
/// already holds its configured maximum number of parameters.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum EvictionStrategy {
    /// Drop the parameter with the smallest `last_accessed` time. This is
    /// the default.
    #[default]
    Lru,
    /// Drop the parameter with the smallest write `timestamp`.
    OldestFirst,
    /// Keep everything already stored and refuse the new parameter.
    RejectNew,
}
20
21#[derive(Debug, Clone)]
23pub struct StateStoreConfig {
24 pub max_params: Option<usize>,
26 pub param_ttl: Option<Duration>,
28 pub eviction: EvictionStrategy,
30}
31
32impl Default for StateStoreConfig {
33 fn default() -> Self {
34 Self {
35 max_params: Some(10_000),
36 param_ttl: Some(Duration::from_secs(3600)), eviction: EvictionStrategy::Lru,
38 }
39 }
40}
41
42impl StateStoreConfig {
43 pub fn unlimited() -> Self {
45 Self {
46 max_params: None,
47 param_ttl: None,
48 eviction: EvictionStrategy::Lru,
49 }
50 }
51
52 pub fn with_limits(max_params: usize, ttl_secs: u64) -> Self {
54 Self {
55 max_params: Some(max_params),
56 param_ttl: Some(Duration::from_secs(ttl_secs)),
57 eviction: EvictionStrategy::Lru,
58 }
59 }
60}
61
62#[derive(Debug, Clone)]
64pub struct ParamState {
65 pub value: Value,
67 pub revision: u64,
69 pub writer: String,
71 pub timestamp: u64,
73 pub last_accessed: u64,
75 pub strategy: ConflictStrategy,
77 pub lock_holder: Option<String>,
79 pub meta: Option<ParamMeta>,
81}
82
83#[derive(Debug, Clone)]
85pub struct ParamMeta {
86 pub unit: Option<String>,
87 pub range: Option<(f64, f64)>,
88 pub default: Option<Value>,
89}
90
91impl ParamState {
92 pub fn new(value: Value, writer: String) -> Self {
94 let now = current_timestamp();
95 Self {
96 value,
97 revision: 1,
98 writer,
99 timestamp: now,
100 last_accessed: now,
101 strategy: ConflictStrategy::Lww,
102 lock_holder: None,
103 meta: None,
104 }
105 }
106
107 pub fn touch(&mut self) {
109 self.last_accessed = current_timestamp();
110 }
111
112 pub fn with_strategy(mut self, strategy: ConflictStrategy) -> Self {
114 self.strategy = strategy;
115 self
116 }
117
118 pub fn with_meta(mut self, meta: ParamMeta) -> Self {
120 self.meta = Some(meta);
121 self
122 }
123
124 pub fn try_update(
129 &mut self,
130 new_value: Value,
131 writer: &str,
132 expected_revision: Option<u64>,
133 request_lock: bool,
134 release_lock: bool,
135 ) -> Result<u64, UpdateError> {
136 let timestamp = current_timestamp();
137
138 if let Some(expected) = expected_revision {
140 if expected != self.revision {
141 return Err(UpdateError::RevisionConflict {
142 expected,
143 actual: self.revision,
144 });
145 }
146 }
147
148 if let Some(ref holder) = self.lock_holder {
150 if holder != writer && !release_lock {
151 return Err(UpdateError::LockHeld {
152 holder: holder.clone(),
153 });
154 }
155 }
156
157 if release_lock {
159 if self.lock_holder.as_deref() == Some(writer) {
160 self.lock_holder = None;
161 }
162 }
163
164 let should_update = match self.strategy {
166 ConflictStrategy::Lww => timestamp >= self.timestamp,
167 ConflictStrategy::Max => {
168 match (&new_value, &self.value) {
169 (Value::Float(new), Value::Float(old)) => new > old,
170 (Value::Int(new), Value::Int(old)) => new > old,
171 _ => true, }
173 }
174 ConflictStrategy::Min => match (&new_value, &self.value) {
175 (Value::Float(new), Value::Float(old)) => new < old,
176 (Value::Int(new), Value::Int(old)) => new < old,
177 _ => true,
178 },
179 ConflictStrategy::Lock => {
180 self.lock_holder.is_none() || self.lock_holder.as_deref() == Some(writer)
181 }
182 ConflictStrategy::Merge => true, };
184
185 if !should_update {
186 return Err(UpdateError::ConflictRejected);
187 }
188
189 if request_lock {
191 if self.lock_holder.is_some() && self.lock_holder.as_deref() != Some(writer) {
192 return Err(UpdateError::LockHeld {
193 holder: self.lock_holder.clone().unwrap(),
194 });
195 }
196 self.lock_holder = Some(writer.to_string());
197 }
198
199 self.value = new_value;
201 self.revision += 1;
202 self.writer = writer.to_string();
203 self.timestamp = timestamp;
204 self.last_accessed = timestamp;
205
206 Ok(self.revision)
207 }
208
209 pub fn validate_range(&self, value: &Value) -> bool {
211 if let Some(meta) = &self.meta {
212 if let Some((min, max)) = meta.range {
213 if let Some(v) = value.as_f64() {
214 return v >= min && v <= max;
215 }
216 }
217 }
218 true
219 }
220}
221
/// Reasons a parameter write can be refused.
#[derive(Clone, Debug)]
pub enum UpdateError {
    /// The caller's expected revision differs from the stored one.
    RevisionConflict { expected: u64, actual: u64 },
    /// Another writer currently holds the parameter's lock.
    LockHeld { holder: String },
    /// The parameter's conflict strategy refused the new value.
    ConflictRejected,
    /// The value lies outside the allowed range.
    OutOfRange,
    /// The store is full and configured to reject new parameters.
    AtCapacity,
}

impl std::fmt::Display for UpdateError {
    /// Writes a short human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Variants without payload map to a fixed message; the other two
        // interpolate their fields and return early.
        let fixed_message = match self {
            Self::RevisionConflict { expected, actual } => {
                return write!(f, "Revision conflict: expected {}, got {}", expected, actual);
            }
            Self::LockHeld { holder } => {
                return write!(f, "Parameter locked by {}", holder);
            }
            Self::ConflictRejected => "Update rejected by conflict strategy",
            Self::OutOfRange => "Value out of allowed range",
            Self::AtCapacity => "State store at capacity",
        };
        f.write_str(fixed_message)
    }
}

impl std::error::Error for UpdateError {}
259
/// Error value signalling that the state store has no room left.
#[derive(Clone, Debug)]
pub struct CapacityError;

impl std::fmt::Display for CapacityError {
    /// Writes the fixed "at capacity" message.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("State store at capacity")
    }
}

impl std::error::Error for CapacityError {}
271
272#[derive(Debug)]
274pub struct StateStore {
275 params: HashMap<String, ParamState>,
276 config: StateStoreConfig,
277}
278
279impl Default for StateStore {
280 fn default() -> Self {
281 Self {
282 params: HashMap::new(),
283 config: StateStoreConfig::unlimited(), }
285 }
286}
287
288impl StateStore {
289 pub fn new() -> Self {
291 Self::default()
292 }
293
294 pub fn with_config(config: StateStoreConfig) -> Self {
296 Self {
297 params: HashMap::new(),
298 config,
299 }
300 }
301
302 pub fn config(&self) -> &StateStoreConfig {
304 &self.config
305 }
306
307 pub fn get(&self, address: &str) -> Option<&ParamState> {
309 self.params.get(address)
310 }
311
312 pub fn get_mut(&mut self, address: &str) -> Option<&mut ParamState> {
314 let param = self.params.get_mut(address)?;
315 param.touch();
316 Some(param)
317 }
318
319 pub fn get_value(&self, address: &str) -> Option<&Value> {
321 self.params.get(address).map(|p| &p.value)
322 }
323
324 pub fn get_value_mut(&mut self, address: &str) -> Option<&Value> {
326 let param = self.params.get_mut(address)?;
327 param.touch();
328 Some(¶m.value)
329 }
330
331 pub fn set(
333 &mut self,
334 address: &str,
335 value: Value,
336 writer: &str,
337 revision: Option<u64>,
338 lock: bool,
339 unlock: bool,
340 ) -> Result<u64, UpdateError> {
341 if let Some(param) = self.params.get_mut(address) {
342 param.try_update(value, writer, revision, lock, unlock)
343 } else {
344 if let Some(max) = self.config.max_params {
346 if self.params.len() >= max {
347 match self.config.eviction {
348 EvictionStrategy::RejectNew => {
349 return Err(UpdateError::AtCapacity);
350 }
351 EvictionStrategy::Lru => {
352 self.evict_lru();
353 }
354 EvictionStrategy::OldestFirst => {
355 self.evict_oldest();
356 }
357 }
358 }
359 }
360
361 let mut param = ParamState::new(value, writer.to_string());
363 if lock {
364 param.lock_holder = Some(writer.to_string());
365 }
366 let rev = param.revision;
367 self.params.insert(address.to_string(), param);
368 Ok(rev)
369 }
370 }
371
372 fn evict_lru(&mut self) {
374 if let Some(oldest_key) = self
375 .params
376 .iter()
377 .min_by_key(|(_, v)| v.last_accessed)
378 .map(|(k, _)| k.clone())
379 {
380 self.params.remove(&oldest_key);
381 }
382 }
383
384 fn evict_oldest(&mut self) {
386 if let Some(oldest_key) = self
387 .params
388 .iter()
389 .min_by_key(|(_, v)| v.timestamp)
390 .map(|(k, _)| k.clone())
391 {
392 self.params.remove(&oldest_key);
393 }
394 }
395
396 pub fn cleanup_stale(&mut self, ttl: Duration) -> usize {
399 let now = current_timestamp();
400 let ttl_micros = ttl.as_micros() as u64;
401 let cutoff = now.saturating_sub(ttl_micros);
402
403 let before = self.params.len();
404 self.params.retain(|_, v| v.last_accessed >= cutoff);
405 before - self.params.len()
406 }
407
408 pub fn cleanup_stale_with_config(&mut self) -> usize {
411 if let Some(ttl) = self.config.param_ttl {
412 self.cleanup_stale(ttl)
413 } else {
414 0
415 }
416 }
417
418 pub fn get_matching(&self, pattern: &str) -> Vec<(&str, &ParamState)> {
420 use crate::address::glob_match;
421
422 self.params
423 .iter()
424 .filter(|(addr, _)| glob_match(pattern, addr))
425 .map(|(addr, state)| (addr.as_str(), state))
426 .collect()
427 }
428
429 pub fn snapshot(&self) -> Vec<(&str, &ParamState)> {
431 self.params.iter().map(|(k, v)| (k.as_str(), v)).collect()
432 }
433
434 pub fn len(&self) -> usize {
436 self.params.len()
437 }
438
439 pub fn is_empty(&self) -> bool {
441 self.params.is_empty()
442 }
443
444 pub fn remove(&mut self, address: &str) -> Option<ParamState> {
446 self.params.remove(address)
447 }
448
449 pub fn clear(&mut self) {
451 self.params.clear();
452 }
453}
454
/// Returns the current wall-clock time as microseconds since the Unix epoch,
/// or 0 if the system clock reports a time before the epoch.
fn current_timestamp() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_micros() as u64,
        Err(_) => 0,
    }
}
462
#[cfg(test)]
mod tests {
    use super::*;

    /// A float parameter with value 0.5 written by "session1".
    fn fresh_param() -> ParamState {
        ParamState::new(Value::Float(0.5), "session1".to_string())
    }

    /// A store capped at `max_params` entries, without TTL.
    fn bounded_store(max_params: usize, eviction: EvictionStrategy) -> StateStore {
        StateStore::with_config(StateStoreConfig {
            max_params: Some(max_params),
            param_ttl: None,
            eviction,
        })
    }

    /// Writes a float for writer "s1" with no revision check or lock flags.
    fn try_put(store: &mut StateStore, address: &str, value: f64) -> Result<u64, UpdateError> {
        store.set(address, Value::Float(value), "s1", None, false, false)
    }

    /// Like `try_put`, but panics if the write is refused.
    fn put(store: &mut StateStore, address: &str, value: f64) {
        try_put(store, address, value).unwrap();
    }

    /// Sleeps so that consecutive microsecond timestamps are distinct.
    fn pause(millis: u64) {
        std::thread::sleep(std::time::Duration::from_millis(millis));
    }

    #[test]
    fn test_basic_update() {
        let mut param = fresh_param();

        let outcome = param.try_update(Value::Float(0.75), "session2", None, false, false);

        assert!(outcome.is_ok());
        assert_eq!(param.revision, 2);
        assert_eq!(param.value, Value::Float(0.75));
        assert_eq!(param.writer, "session2");
    }

    #[test]
    fn test_revision_conflict() {
        let mut param = fresh_param();

        // The parameter is at revision 1, so expecting 999 must fail.
        let outcome = param.try_update(Value::Float(0.75), "session2", Some(999), false, false);

        assert!(matches!(outcome, Err(UpdateError::RevisionConflict { .. })));
    }

    #[test]
    fn test_locking() {
        let mut param = fresh_param();

        // session1 writes and takes the lock.
        let locked = param.try_update(Value::Float(0.6), "session1", None, true, false);
        assert!(locked.is_ok());
        assert_eq!(param.lock_holder, Some("session1".to_string()));

        // Another session is refused while the lock is held.
        let intruder = param.try_update(Value::Float(0.7), "session2", None, false, false);
        assert!(matches!(intruder, Err(UpdateError::LockHeld { .. })));

        // The holder can still write.
        let holder = param.try_update(Value::Float(0.8), "session1", None, false, false);
        assert!(holder.is_ok());
    }

    #[test]
    fn test_max_strategy() {
        let mut param = fresh_param().with_strategy(ConflictStrategy::Max);

        // A larger value replaces the current one.
        let raised = param.try_update(Value::Float(0.8), "session2", None, false, false);
        assert!(raised.is_ok());
        assert_eq!(param.value, Value::Float(0.8));

        // A smaller value is rejected and leaves the value untouched.
        let lowered = param.try_update(Value::Float(0.3), "session3", None, false, false);
        assert!(matches!(lowered, Err(UpdateError::ConflictRejected)));
        assert_eq!(param.value, Value::Float(0.8));
    }

    #[test]
    fn test_state_store() {
        let mut store = StateStore::new();

        put(&mut store, "/test/a", 1.0);
        put(&mut store, "/test/b", 2.0);
        put(&mut store, "/other/c", 3.0);

        assert_eq!(store.len(), 3);

        let under_test = store.get_matching("/test/*");
        assert_eq!(under_test.len(), 2);
    }

    #[test]
    fn test_state_store_capacity_reject() {
        let mut store = bounded_store(2, EvictionStrategy::RejectNew);

        put(&mut store, "/test/a", 1.0);
        put(&mut store, "/test/b", 2.0);

        // A third, new address is refused and the store is unchanged.
        let overflow = try_put(&mut store, "/test/c", 3.0);
        assert!(matches!(overflow, Err(UpdateError::AtCapacity)));
        assert_eq!(store.len(), 2);

        // Updating an existing address still works at capacity.
        put(&mut store, "/test/a", 1.5);
        assert_eq!(store.get_value("/test/a"), Some(&Value::Float(1.5)));
    }

    #[test]
    fn test_state_store_capacity_lru_eviction() {
        let mut store = bounded_store(2, EvictionStrategy::Lru);

        put(&mut store, "/test/a", 1.0);
        pause(1);
        put(&mut store, "/test/b", 2.0);

        // Touch "a" so that "b" becomes the least recently used entry.
        pause(1);
        store.get_mut("/test/a");

        put(&mut store, "/test/c", 3.0);

        assert_eq!(store.len(), 2);
        assert!(store.get("/test/a").is_some());
        assert!(store.get("/test/b").is_none());
        assert!(store.get("/test/c").is_some());
    }

    #[test]
    fn test_state_store_capacity_oldest_eviction() {
        let mut store = bounded_store(2, EvictionStrategy::OldestFirst);

        put(&mut store, "/test/a", 1.0);
        pause(1);
        put(&mut store, "/test/b", 2.0);

        // "a" has the oldest write timestamp, so it is the one evicted.
        put(&mut store, "/test/c", 3.0);

        assert_eq!(store.len(), 2);
        assert!(store.get("/test/a").is_none());
        assert!(store.get("/test/b").is_some());
        assert!(store.get("/test/c").is_some());
    }

    #[test]
    fn test_state_store_cleanup_stale() {
        let mut store = StateStore::new();

        put(&mut store, "/test/a", 1.0);
        put(&mut store, "/test/b", 2.0);

        // After 10 ms both entries are older than the 5 ms TTL; touching "a"
        // refreshes it so only "b" is stale.
        pause(10);
        store.get_mut("/test/a");

        let purged = store.cleanup_stale(Duration::from_millis(5));
        assert_eq!(purged, 1);
        assert!(store.get("/test/a").is_some());
        assert!(store.get("/test/b").is_none());
    }

    #[test]
    fn test_state_store_cleanup_stale_with_config() {
        let mut store = StateStore::with_config(StateStoreConfig {
            max_params: None,
            param_ttl: Some(Duration::from_millis(5)),
            eviction: EvictionStrategy::Lru,
        });

        put(&mut store, "/test/a", 1.0);

        // Freshly written: nothing is stale yet.
        let purged_early = store.cleanup_stale_with_config();
        assert_eq!(purged_early, 0);

        pause(10);
        let purged_late = store.cleanup_stale_with_config();
        assert_eq!(purged_late, 1);
        assert!(store.is_empty());
    }

    #[test]
    fn test_last_accessed_tracking() {
        let mut param = fresh_param();
        let accessed_at_creation = param.last_accessed;

        pause(1);
        param.touch();

        assert!(param.last_accessed > accessed_at_creation);
    }
}