rs2_stream/state/
storage.rs

1use super::StateStorage;
2use async_trait::async_trait;
3use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6use tokio::sync::RwLock;
7use crate::resource_manager::get_global_resource_manager;
8
9/// In-memory state storage (fastest, but not persistent)
10pub struct InMemoryState {
11    data: Arc<RwLock<HashMap<String, (Vec<u8>, Instant)>>>,
12    ttl: Duration,
13    max_size: Option<usize>,
14}
15
16impl InMemoryState {
17    pub fn new(ttl: Duration) -> Self {
18        Self {
19            data: Arc::new(RwLock::new(HashMap::new())),
20            ttl,
21            max_size: None,
22        }
23    }
24
25    pub fn with_max_size(mut self, max_size: usize) -> Self {
26        self.max_size = Some(max_size);
27        self
28    }
29
30    async fn cleanup_expired(&self) {
31        let mut data = self.data.write().await;
32        let now = Instant::now();
33        data.retain(|_, (_, timestamp)| now.duration_since(*timestamp) < self.ttl);
34    }
35
36    async fn enforce_max_size(&self) {
37        if let Some(max_size) = self.max_size {
38            let mut data = self.data.write().await;
39            if data.len() > max_size {
40                // Remove oldest entries
41                let mut entries: Vec<_> = data.iter().collect();
42                entries.sort_by_key(|(_, (_, timestamp))| *timestamp);
43
44                let to_remove = entries.len() - max_size;
45                let keys_to_remove: Vec<String> = entries
46                    .iter()
47                    .take(to_remove)
48                    .map(|(key, _)| (*key).clone())
49                    .collect();
50
51                for key in keys_to_remove {
52                    data.remove(&key);
53                }
54            }
55        }
56    }
57}
58
59#[async_trait]
60impl StateStorage for InMemoryState {
61    async fn get(&self, key: &str) -> Option<Vec<u8>> {
62        let data = self.data.read().await;
63        if let Some((value_bytes, timestamp)) = data.get(key) {
64            if Instant::now().duration_since(*timestamp) < self.ttl {
65                Some(value_bytes.clone())
66            } else {
67                None // Expired
68            }
69        } else {
70            None
71        }
72    }
73
74    async fn set(
75        &self,
76        key: &str,
77        value: &[u8],
78    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
79        let mut data = self.data.write().await;
80        let is_new_key = !data.contains_key(key);
81        let old_size = data.get(key).map(|(v, _)| v.len()).unwrap_or(0);
82        data.insert(key.to_string(), (value.to_vec(), Instant::now()));
83        let new_size = value.len();
84        drop(data);
85        let resource_manager = get_global_resource_manager();
86        if is_new_key {
87            resource_manager.track_key_creation().await.ok();
88        }
89        if new_size > old_size {
90            resource_manager.track_memory_allocation((new_size - old_size) as u64).await.ok();
91        } else if old_size > new_size {
92            resource_manager.track_memory_deallocation((old_size - new_size) as u64).await;
93        }
94        // Enforce max size within the same lock
95        let mut data = self.data.write().await;
96        if let Some(max_size) = self.max_size {
97            if data.len() > max_size {
98                // Remove oldest entries
99                let mut entries: Vec<_> = data.iter().collect();
100                entries.sort_by_key(|(_, (_, timestamp))| *timestamp);
101                let to_remove = entries.len() - max_size;
102                let keys_to_remove: Vec<String> = entries
103                    .iter()
104                    .take(to_remove)
105                    .map(|(key, _)| (*key).clone())
106                    .collect();
107                for key in keys_to_remove {
108                    if let Some((v, _)) = data.remove(&key) {
109                        resource_manager.track_memory_deallocation(v.len() as u64).await;
110                        resource_manager.track_key_removal().await;
111                    }
112                }
113            }
114        }
115        Ok(())
116    }
117
118    async fn delete(&self, key: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
119        let mut data = self.data.write().await;
120        if let Some((v, _)) = data.remove(key) {
121            let resource_manager = get_global_resource_manager();
122            resource_manager.track_memory_deallocation(v.len() as u64).await;
123            resource_manager.track_key_removal().await;
124        }
125        Ok(())
126    }
127
128    async fn exists(&self, key: &str) -> bool {
129        let data = self.data.read().await;
130        if let Some((_, timestamp)) = data.get(key) {
131            Instant::now().duration_since(*timestamp) < self.ttl
132        } else {
133            false
134        }
135    }
136
137    async fn clear(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
138        let mut data = self.data.write().await;
139        let resource_manager = get_global_resource_manager();
140        for (v, _) in data.values() {
141            resource_manager.track_memory_deallocation(v.len() as u64).await;
142            resource_manager.track_key_removal().await;
143        }
144        data.clear();
145        Ok(())
146    }
147}