rs2_stream/state/
storage.rs1use super::StateStorage;
2use async_trait::async_trait;
3use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6use tokio::sync::RwLock;
7use crate::resource_manager::get_global_resource_manager;
8
9pub struct InMemoryState {
11 data: Arc<RwLock<HashMap<String, (Vec<u8>, Instant)>>>,
12 ttl: Duration,
13 max_size: Option<usize>,
14}
15
16impl InMemoryState {
17 pub fn new(ttl: Duration) -> Self {
18 Self {
19 data: Arc::new(RwLock::new(HashMap::new())),
20 ttl,
21 max_size: None,
22 }
23 }
24
25 pub fn with_max_size(mut self, max_size: usize) -> Self {
26 self.max_size = Some(max_size);
27 self
28 }
29
30 async fn cleanup_expired(&self) {
31 let mut data = self.data.write().await;
32 let now = Instant::now();
33 data.retain(|_, (_, timestamp)| now.duration_since(*timestamp) < self.ttl);
34 }
35
36 async fn enforce_max_size(&self) {
37 if let Some(max_size) = self.max_size {
38 let mut data = self.data.write().await;
39 if data.len() > max_size {
40 let mut entries: Vec<_> = data.iter().collect();
42 entries.sort_by_key(|(_, (_, timestamp))| *timestamp);
43
44 let to_remove = entries.len() - max_size;
45 let keys_to_remove: Vec<String> = entries
46 .iter()
47 .take(to_remove)
48 .map(|(key, _)| (*key).clone())
49 .collect();
50
51 for key in keys_to_remove {
52 data.remove(&key);
53 }
54 }
55 }
56 }
57}
58
59#[async_trait]
60impl StateStorage for InMemoryState {
61 async fn get(&self, key: &str) -> Option<Vec<u8>> {
62 let data = self.data.read().await;
63 if let Some((value_bytes, timestamp)) = data.get(key) {
64 if Instant::now().duration_since(*timestamp) < self.ttl {
65 Some(value_bytes.clone())
66 } else {
67 None }
69 } else {
70 None
71 }
72 }
73
74 async fn set(
75 &self,
76 key: &str,
77 value: &[u8],
78 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
79 let mut data = self.data.write().await;
80 let is_new_key = !data.contains_key(key);
81 let old_size = data.get(key).map(|(v, _)| v.len()).unwrap_or(0);
82 data.insert(key.to_string(), (value.to_vec(), Instant::now()));
83 let new_size = value.len();
84 drop(data);
85 let resource_manager = get_global_resource_manager();
86 if is_new_key {
87 resource_manager.track_key_creation().await.ok();
88 }
89 if new_size > old_size {
90 resource_manager.track_memory_allocation((new_size - old_size) as u64).await.ok();
91 } else if old_size > new_size {
92 resource_manager.track_memory_deallocation((old_size - new_size) as u64).await;
93 }
94 let mut data = self.data.write().await;
96 if let Some(max_size) = self.max_size {
97 if data.len() > max_size {
98 let mut entries: Vec<_> = data.iter().collect();
100 entries.sort_by_key(|(_, (_, timestamp))| *timestamp);
101 let to_remove = entries.len() - max_size;
102 let keys_to_remove: Vec<String> = entries
103 .iter()
104 .take(to_remove)
105 .map(|(key, _)| (*key).clone())
106 .collect();
107 for key in keys_to_remove {
108 if let Some((v, _)) = data.remove(&key) {
109 resource_manager.track_memory_deallocation(v.len() as u64).await;
110 resource_manager.track_key_removal().await;
111 }
112 }
113 }
114 }
115 Ok(())
116 }
117
118 async fn delete(&self, key: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
119 let mut data = self.data.write().await;
120 if let Some((v, _)) = data.remove(key) {
121 let resource_manager = get_global_resource_manager();
122 resource_manager.track_memory_deallocation(v.len() as u64).await;
123 resource_manager.track_key_removal().await;
124 }
125 Ok(())
126 }
127
128 async fn exists(&self, key: &str) -> bool {
129 let data = self.data.read().await;
130 if let Some((_, timestamp)) = data.get(key) {
131 Instant::now().duration_since(*timestamp) < self.ttl
132 } else {
133 false
134 }
135 }
136
137 async fn clear(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
138 let mut data = self.data.write().await;
139 let resource_manager = get_global_resource_manager();
140 for (v, _) in data.values() {
141 resource_manager.track_memory_deallocation(v.len() as u64).await;
142 resource_manager.track_key_removal().await;
143 }
144 data.clear();
145 Ok(())
146 }
147}