// rs2_stream/state/config.rs

1use super::storage::InMemoryState;
2use super::traits::StateStorage;
3use super::traits::StateStorageType;
4use std::sync::Arc;
5use std::time::Duration;
6
7/// Configuration for state management
8#[derive(Clone)]
9pub struct StateConfig {
10    pub storage_type: StateStorageType,
11    pub ttl: Duration,
12    pub cleanup_interval: Duration,
13    pub max_size: Option<usize>,
14    pub custom_storage: Option<Arc<dyn StateStorage + Send + Sync>>,
15}
16
17impl std::fmt::Debug for StateConfig {
18    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
19        f.debug_struct("StateConfig")
20            .field("storage_type", &self.storage_type)
21            .field("ttl", &self.ttl)
22            .field("cleanup_interval", &self.cleanup_interval)
23            .field("max_size", &self.max_size)
24            .field(
25                "custom_storage",
26                &if self.custom_storage.is_some() {
27                    "Some(CustomStorage)"
28                } else {
29                    "None"
30                },
31            )
32            .finish()
33    }
34}
35
36impl Default for StateConfig {
37    fn default() -> Self {
38        Self {
39            storage_type: StateStorageType::InMemory,
40            ttl: Duration::from_secs(24 * 60 * 60), // 24 hours
41            cleanup_interval: Duration::from_secs(5 * 60), // 5 minutes
42            max_size: None,
43            custom_storage: None,
44        }
45    }
46}
47
48impl StateConfig {
49    /// Create a new state configuration
50    pub fn new() -> Self {
51        Self::default()
52    }
53
54    /// Set the storage type
55    pub fn storage_type(mut self, storage_type: StateStorageType) -> Self {
56        self.storage_type = storage_type;
57        self
58    }
59
60    /// Set the TTL (time to live) for state entries
61    pub fn ttl(mut self, ttl: Duration) -> Self {
62        self.ttl = ttl;
63        self
64    }
65
66    /// Set the cleanup interval for expired entries
67    pub fn cleanup_interval(mut self, interval: Duration) -> Self {
68        self.cleanup_interval = interval;
69        self
70    }
71
72    /// Set the maximum size for in-memory storage
73    pub fn max_size(mut self, max_size: usize) -> Self {
74        self.max_size = Some(max_size);
75        self
76    }
77
78    /// Set a custom storage backend
79    pub fn with_custom_storage(mut self, storage: Arc<dyn StateStorage + Send + Sync>) -> Self {
80        self.custom_storage = Some(storage);
81        self.storage_type = StateStorageType::Custom;
82        self
83    }
84
85    /// Create a storage instance from this configuration
86    pub fn create_storage(&self) -> Box<dyn StateStorage + Send + Sync> {
87        match self.storage_type {
88            StateStorageType::InMemory => {
89                let mut storage = InMemoryState::new(self.ttl);
90                if let Some(max_size) = self.max_size {
91                    storage = storage.with_max_size(max_size);
92                }
93                Box::new(storage)
94            }
95            StateStorageType::Custom => {
96                if let Some(ref custom_storage) = self.custom_storage {
97                    // Clone the Arc to get a new reference to the same storage
98                    let cloned_storage = custom_storage.clone();
99                    // Convert Arc to Box by dereferencing and boxing
100                    Box::new(ArcStorageWrapper(cloned_storage))
101                } else {
102                    // Fallback to in-memory if no custom storage is provided
103                    let mut storage = InMemoryState::new(self.ttl);
104                    if let Some(max_size) = self.max_size {
105                        storage = storage.with_max_size(max_size);
106                    }
107                    Box::new(storage)
108                }
109            }
110        }
111    }
112
113    /// Create a storage instance as Arc from this configuration
114    pub fn create_storage_arc(&self) -> Arc<dyn StateStorage + Send + Sync> {
115        match self.storage_type {
116            StateStorageType::InMemory => {
117                let mut storage = InMemoryState::new(self.ttl);
118                if let Some(max_size) = self.max_size {
119                    storage = storage.with_max_size(max_size);
120                }
121                Arc::new(storage)
122            }
123            StateStorageType::Custom => {
124                if let Some(ref custom_storage) = self.custom_storage {
125                    // Clone the Arc to get a new reference to the same storage
126                    custom_storage.clone()
127                } else {
128                    // Fallback to in-memory if no custom storage is provided
129                    let mut storage = InMemoryState::new(self.ttl);
130                    if let Some(max_size) = self.max_size {
131                        storage = storage.with_max_size(max_size);
132                    }
133                    Arc::new(storage)
134                }
135            }
136        }
137    }
138
139    /// Validate the configuration
140    pub fn validate(&self) -> Result<(), String> {
141        if self.ttl.is_zero() {
142            return Err("TTL cannot be zero".to_string());
143        }
144
145        if self.cleanup_interval.is_zero() {
146            return Err("Cleanup interval cannot be zero".to_string());
147        }
148
149        if self.cleanup_interval > self.ttl {
150            return Err("Cleanup interval should be less than or equal to TTL".to_string());
151        }
152
153        if let Some(max_size) = self.max_size {
154            if max_size == 0 {
155                return Err("Max size cannot be zero".to_string());
156            }
157        }
158
159        Ok(())
160    }
161}
162
163/// Wrapper to convert Arc<dyn StateStorage> to Box<dyn StateStorage>
164struct ArcStorageWrapper(Arc<dyn StateStorage + Send + Sync>);
165
166#[async_trait::async_trait]
167impl StateStorage for ArcStorageWrapper {
168    async fn get(&self, key: &str) -> Option<Vec<u8>> {
169        self.0.get(key).await
170    }
171
172    async fn set(
173        &self,
174        key: &str,
175        value: &[u8],
176    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
177        self.0.set(key, value).await
178    }
179
180    async fn delete(&self, key: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
181        self.0.delete(key).await
182    }
183
184    async fn exists(&self, key: &str) -> bool {
185        self.0.exists(key).await
186    }
187
188    async fn clear(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
189        self.0.clear().await
190    }
191}
192
193/// Builder for state configurations
194pub struct StateConfigBuilder {
195    config: StateConfig,
196}
197
198impl StateConfigBuilder {
199    pub fn new() -> Self {
200        Self {
201            config: StateConfig::default(),
202        }
203    }
204
205    pub fn storage_type(mut self, storage_type: StateStorageType) -> Self {
206        self.config.storage_type = storage_type;
207        self
208    }
209
210    pub fn ttl(mut self, ttl: Duration) -> Self {
211        self.config.ttl = ttl;
212        self
213    }
214
215    pub fn cleanup_interval(mut self, interval: Duration) -> Self {
216        self.config.cleanup_interval = interval;
217        self
218    }
219
220    pub fn max_size(mut self, max_size: usize) -> Self {
221        self.config.max_size = Some(max_size);
222        self
223    }
224
225    pub fn custom_storage(mut self, storage: Arc<dyn StateStorage + Send + Sync>) -> Self {
226        self.config.custom_storage = Some(storage);
227        self.config.storage_type = StateStorageType::Custom;
228        self
229    }
230
231    pub fn build(self) -> Result<StateConfig, String> {
232        self.config.validate()?;
233        Ok(self.config)
234    }
235}
236
237impl Default for StateConfigBuilder {
238    fn default() -> Self {
239        Self::new()
240    }
241}
242
243/// Predefined state configurations for common use cases
244pub struct StateConfigs;
245
246impl StateConfigs {
247    /// Configuration for high-performance, in-memory state
248    pub fn high_performance() -> StateConfig {
249        StateConfigBuilder::new()
250            .storage_type(StateStorageType::InMemory)
251            .ttl(Duration::from_secs(60 * 60)) // 1 hour
252            .cleanup_interval(Duration::from_secs(60)) // 1 minute
253            .max_size(10000)
254            .build()
255            .unwrap()
256    }
257
258    /// Configuration for session state
259    pub fn session() -> StateConfig {
260        StateConfigBuilder::new()
261            .storage_type(StateStorageType::InMemory)
262            .ttl(Duration::from_secs(30 * 60)) // 30 minutes
263            .cleanup_interval(Duration::from_secs(5 * 60)) // 5 minutes
264            .max_size(1000)
265            .build()
266            .unwrap()
267    }
268
269    /// Configuration for short-lived state
270    pub fn short_lived() -> StateConfig {
271        StateConfigBuilder::new()
272            .storage_type(StateStorageType::InMemory)
273            .ttl(Duration::from_secs(5 * 60)) // 5 minutes
274            .cleanup_interval(Duration::from_secs(30)) // 30 seconds
275            .max_size(100)
276            .build()
277            .unwrap()
278    }
279
280    /// Configuration for long-lived state
281    pub fn long_lived() -> StateConfig {
282        StateConfigBuilder::new()
283            .storage_type(StateStorageType::InMemory)
284            .ttl(Duration::from_secs(7 * 24 * 60 * 60)) // 7 days
285            .cleanup_interval(Duration::from_secs(60 * 60)) // 1 hour
286            .max_size(100000)
287            .build()
288            .unwrap()
289    }
290}