rs2_stream/state/
config.rs1use super::storage::InMemoryState;
2use super::traits::StateStorage;
3use super::traits::StateStorageType;
4use std::sync::Arc;
5use std::time::Duration;
6
7#[derive(Clone)]
9pub struct StateConfig {
10 pub storage_type: StateStorageType,
11 pub ttl: Duration,
12 pub cleanup_interval: Duration,
13 pub max_size: Option<usize>,
14 pub custom_storage: Option<Arc<dyn StateStorage + Send + Sync>>,
15}
16
17impl std::fmt::Debug for StateConfig {
18 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
19 f.debug_struct("StateConfig")
20 .field("storage_type", &self.storage_type)
21 .field("ttl", &self.ttl)
22 .field("cleanup_interval", &self.cleanup_interval)
23 .field("max_size", &self.max_size)
24 .field(
25 "custom_storage",
26 &if self.custom_storage.is_some() {
27 "Some(CustomStorage)"
28 } else {
29 "None"
30 },
31 )
32 .finish()
33 }
34}
35
36impl Default for StateConfig {
37 fn default() -> Self {
38 Self {
39 storage_type: StateStorageType::InMemory,
40 ttl: Duration::from_secs(24 * 60 * 60), cleanup_interval: Duration::from_secs(5 * 60), max_size: None,
43 custom_storage: None,
44 }
45 }
46}
47
48impl StateConfig {
49 pub fn new() -> Self {
51 Self::default()
52 }
53
54 pub fn storage_type(mut self, storage_type: StateStorageType) -> Self {
56 self.storage_type = storage_type;
57 self
58 }
59
60 pub fn ttl(mut self, ttl: Duration) -> Self {
62 self.ttl = ttl;
63 self
64 }
65
66 pub fn cleanup_interval(mut self, interval: Duration) -> Self {
68 self.cleanup_interval = interval;
69 self
70 }
71
72 pub fn max_size(mut self, max_size: usize) -> Self {
74 self.max_size = Some(max_size);
75 self
76 }
77
78 pub fn with_custom_storage(mut self, storage: Arc<dyn StateStorage + Send + Sync>) -> Self {
80 self.custom_storage = Some(storage);
81 self.storage_type = StateStorageType::Custom;
82 self
83 }
84
85 pub fn create_storage(&self) -> Box<dyn StateStorage + Send + Sync> {
87 match self.storage_type {
88 StateStorageType::InMemory => {
89 let mut storage = InMemoryState::new(self.ttl);
90 if let Some(max_size) = self.max_size {
91 storage = storage.with_max_size(max_size);
92 }
93 Box::new(storage)
94 }
95 StateStorageType::Custom => {
96 if let Some(ref custom_storage) = self.custom_storage {
97 let cloned_storage = custom_storage.clone();
99 Box::new(ArcStorageWrapper(cloned_storage))
101 } else {
102 let mut storage = InMemoryState::new(self.ttl);
104 if let Some(max_size) = self.max_size {
105 storage = storage.with_max_size(max_size);
106 }
107 Box::new(storage)
108 }
109 }
110 }
111 }
112
113 pub fn create_storage_arc(&self) -> Arc<dyn StateStorage + Send + Sync> {
115 match self.storage_type {
116 StateStorageType::InMemory => {
117 let mut storage = InMemoryState::new(self.ttl);
118 if let Some(max_size) = self.max_size {
119 storage = storage.with_max_size(max_size);
120 }
121 Arc::new(storage)
122 }
123 StateStorageType::Custom => {
124 if let Some(ref custom_storage) = self.custom_storage {
125 custom_storage.clone()
127 } else {
128 let mut storage = InMemoryState::new(self.ttl);
130 if let Some(max_size) = self.max_size {
131 storage = storage.with_max_size(max_size);
132 }
133 Arc::new(storage)
134 }
135 }
136 }
137 }
138
139 pub fn validate(&self) -> Result<(), String> {
141 if self.ttl.is_zero() {
142 return Err("TTL cannot be zero".to_string());
143 }
144
145 if self.cleanup_interval.is_zero() {
146 return Err("Cleanup interval cannot be zero".to_string());
147 }
148
149 if self.cleanup_interval > self.ttl {
150 return Err("Cleanup interval should be less than or equal to TTL".to_string());
151 }
152
153 if let Some(max_size) = self.max_size {
154 if max_size == 0 {
155 return Err("Max size cannot be zero".to_string());
156 }
157 }
158
159 Ok(())
160 }
161}
162
163struct ArcStorageWrapper(Arc<dyn StateStorage + Send + Sync>);
165
166#[async_trait::async_trait]
167impl StateStorage for ArcStorageWrapper {
168 async fn get(&self, key: &str) -> Option<Vec<u8>> {
169 self.0.get(key).await
170 }
171
172 async fn set(
173 &self,
174 key: &str,
175 value: &[u8],
176 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
177 self.0.set(key, value).await
178 }
179
180 async fn delete(&self, key: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
181 self.0.delete(key).await
182 }
183
184 async fn exists(&self, key: &str) -> bool {
185 self.0.exists(key).await
186 }
187
188 async fn clear(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
189 self.0.clear().await
190 }
191}
192
193pub struct StateConfigBuilder {
195 config: StateConfig,
196}
197
198impl StateConfigBuilder {
199 pub fn new() -> Self {
200 Self {
201 config: StateConfig::default(),
202 }
203 }
204
205 pub fn storage_type(mut self, storage_type: StateStorageType) -> Self {
206 self.config.storage_type = storage_type;
207 self
208 }
209
210 pub fn ttl(mut self, ttl: Duration) -> Self {
211 self.config.ttl = ttl;
212 self
213 }
214
215 pub fn cleanup_interval(mut self, interval: Duration) -> Self {
216 self.config.cleanup_interval = interval;
217 self
218 }
219
220 pub fn max_size(mut self, max_size: usize) -> Self {
221 self.config.max_size = Some(max_size);
222 self
223 }
224
225 pub fn custom_storage(mut self, storage: Arc<dyn StateStorage + Send + Sync>) -> Self {
226 self.config.custom_storage = Some(storage);
227 self.config.storage_type = StateStorageType::Custom;
228 self
229 }
230
231 pub fn build(self) -> Result<StateConfig, String> {
232 self.config.validate()?;
233 Ok(self.config)
234 }
235}
236
237impl Default for StateConfigBuilder {
238 fn default() -> Self {
239 Self::new()
240 }
241}
242
243pub struct StateConfigs;
245
246impl StateConfigs {
247 pub fn high_performance() -> StateConfig {
249 StateConfigBuilder::new()
250 .storage_type(StateStorageType::InMemory)
251 .ttl(Duration::from_secs(60 * 60)) .cleanup_interval(Duration::from_secs(60)) .max_size(10000)
254 .build()
255 .unwrap()
256 }
257
258 pub fn session() -> StateConfig {
260 StateConfigBuilder::new()
261 .storage_type(StateStorageType::InMemory)
262 .ttl(Duration::from_secs(30 * 60)) .cleanup_interval(Duration::from_secs(5 * 60)) .max_size(1000)
265 .build()
266 .unwrap()
267 }
268
269 pub fn short_lived() -> StateConfig {
271 StateConfigBuilder::new()
272 .storage_type(StateStorageType::InMemory)
273 .ttl(Duration::from_secs(5 * 60)) .cleanup_interval(Duration::from_secs(30)) .max_size(100)
276 .build()
277 .unwrap()
278 }
279
280 pub fn long_lived() -> StateConfig {
282 StateConfigBuilder::new()
283 .storage_type(StateStorageType::InMemory)
284 .ttl(Duration::from_secs(7 * 24 * 60 * 60)) .cleanup_interval(Duration::from_secs(60 * 60)) .max_size(100000)
287 .build()
288 .unwrap()
289 }
290}