drasi_lib/state_store/mod.rs
1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! State Store Provider Module
16//!
17//! This module provides the `StateStoreProvider` trait that allows plugins
18//! (Sources, BootstrapProviders, and Reactions) to store and retrieve
19//! runtime state that can persist across runs of DrasiLib.
20//!
21//! # Architecture
22//!
23//! The state store plugin system follows pure dependency inversion:
24//! - **Lib** defines the `StateStoreProvider` trait and provides a default
25//! in-memory implementation (`MemoryStateStoreProvider`)
26//! - **External plugins** (in `components/state_stores/`) implement this trait
27//! for persistent storage
28//! - **Applications** optionally inject plugins into DrasiLib; if none provided,
29//! the in-memory default is used
30//!
31//! # Partitioning
32//!
33//! The state store supports partitioning via `StoreId`. Each plugin provides
34//! a unique `StoreId` (typically the plugin's ID) when interacting with the
35//! store. This ensures that different plugins don't interfere with each other's
36//! state.
37//!
38//! # Usage
39//!
40//! ## Without a plugin (uses in-memory default)
41//! ```ignore
42//! let drasi = DrasiLib::builder()
43//! .build()?;
44//! ```
45//!
46//! ## With an external plugin
47//! ```ignore
48//! use drasi_state_store_json::JsonStateStoreProvider;
49//!
50//! let state_store = JsonStateStoreProvider::new("/data/state");
51//! let drasi = DrasiLib::builder()
52//! .with_state_store_provider(Arc::new(state_store))
53//! .build()?;
54//! ```
55
56use async_trait::async_trait;
57use std::collections::HashMap;
58use std::sync::Arc;
59use thiserror::Error;
60use tokio::sync::RwLock;
61
62/// Errors that can occur when interacting with a state store.
63///
64/// # Note on Missing Keys
65///
66/// Missing keys are typically represented as `Ok(None)` from `get()`, not as
67/// `KeyNotFound` errors. The `KeyNotFound` variant is available for custom
68/// implementations that need to distinguish between "key doesn't exist" and
69/// other error conditions, but the standard trait methods prefer returning
70/// `Option` values.
71#[derive(Error, Debug)]
72pub enum StateStoreError {
73 /// The requested key was not found in the store.
74 ///
75 /// Note: Standard implementations return `Ok(None)` for missing keys instead
76 /// of this error. This variant is provided for custom implementations that
77 /// need explicit error-based handling of missing keys.
78 #[error("Key not found: {0}")]
79 KeyNotFound(String),
80
81 /// Failed to serialize or deserialize data.
82 ///
83 /// This error typically occurs when stored data cannot be parsed or when
84 /// data being stored cannot be serialized to the expected format.
85 #[error("Serialization error: {0}")]
86 SerializationError(String),
87
88 /// Failed to read or write to the underlying storage.
89 ///
90 /// This error indicates I/O failures, database errors, or other storage
91 /// backend issues.
92 #[error("Storage error: {0}")]
93 StorageError(String),
94
95 /// Generic error for other failures.
96 ///
97 /// Use this for errors that don't fit the other categories.
98 #[error("State store error: {0}")]
99 Other(String),
100}
101
102/// Result type for state store operations
103pub type StateStoreResult<T> = Result<T, StateStoreError>;
104
105/// Trait defining the interface for state store providers.
106///
107/// State store providers allow plugins (Sources, BootstrapProviders, and Reactions)
108/// to persist runtime state that survives restarts of DrasiLib.
109///
110/// # Thread Safety
111///
112/// Implementations must be thread-safe and support concurrent access from
113/// multiple plugins.
114///
115/// # Partitioning
116///
117/// The state store uses `store_id` to partition data between different plugins.
118/// Each plugin should use a unique `store_id` (typically the plugin's ID) to
119/// avoid conflicts with other plugins.
120///
121/// # Example Implementation
122///
123/// ```ignore
124/// use drasi_lib::StateStoreProvider;
125/// use async_trait::async_trait;
126///
127/// pub struct MyStateStore {
128/// // implementation fields
129/// }
130///
131/// #[async_trait]
132/// impl StateStoreProvider for MyStateStore {
133/// async fn get(&self, store_id: &str, key: &str) -> StateStoreResult<Option<Vec<u8>>> {
134/// // implementation
135/// }
136///
137/// async fn set(&self, store_id: &str, key: &str, value: Vec<u8>) -> StateStoreResult<()> {
138/// // implementation
139/// }
140///
141/// async fn delete(&self, store_id: &str, key: &str) -> StateStoreResult<bool> {
142/// // implementation
143/// }
144///
145/// // ... implement other methods
146/// }
147/// ```
148#[async_trait]
149pub trait StateStoreProvider: Send + Sync {
150 /// Get a single value by key from a store partition.
151 ///
152 /// # Arguments
153 /// * `store_id` - The partition identifier (typically the plugin ID)
154 /// * `key` - The key to retrieve
155 ///
156 /// # Returns
157 /// * `Ok(Some(value))` - The value was found
158 /// * `Ok(None)` - The key doesn't exist in the store
159 /// * `Err(e)` - An error occurred
160 async fn get(&self, store_id: &str, key: &str) -> StateStoreResult<Option<Vec<u8>>>;
161
162 /// Set a single value by key in a store partition.
163 ///
164 /// The state must be persisted before this method returns.
165 ///
166 /// # Arguments
167 /// * `store_id` - The partition identifier (typically the plugin ID)
168 /// * `key` - The key to set
169 /// * `value` - The value to store
170 ///
171 /// # Returns
172 /// * `Ok(())` - The value was successfully stored
173 /// * `Err(e)` - An error occurred
174 async fn set(&self, store_id: &str, key: &str, value: Vec<u8>) -> StateStoreResult<()>;
175
176 /// Delete a single key from a store partition.
177 ///
178 /// # Arguments
179 /// * `store_id` - The partition identifier (typically the plugin ID)
180 /// * `key` - The key to delete
181 ///
182 /// # Returns
183 /// * `Ok(true)` - The key existed and was deleted
184 /// * `Ok(false)` - The key didn't exist
185 /// * `Err(e)` - An error occurred
186 async fn delete(&self, store_id: &str, key: &str) -> StateStoreResult<bool>;
187
188 /// Check if a key exists in a store partition without retrieving its value.
189 ///
190 /// This is more efficient than `get()` when you only need to check existence,
191 /// especially for large values.
192 ///
193 /// # Arguments
194 /// * `store_id` - The partition identifier (typically the plugin ID)
195 /// * `key` - The key to check
196 ///
197 /// # Returns
198 /// * `Ok(true)` - The key exists
199 /// * `Ok(false)` - The key doesn't exist
200 /// * `Err(e)` - An error occurred
201 async fn contains_key(&self, store_id: &str, key: &str) -> StateStoreResult<bool>;
202
203 /// Get multiple values by keys from a store partition.
204 ///
205 /// # Arguments
206 /// * `store_id` - The partition identifier (typically the plugin ID)
207 /// * `keys` - The keys to retrieve
208 ///
209 /// # Returns
210 /// A HashMap mapping each found key to its value. Keys that don't exist
211 /// are simply not included in the result.
212 async fn get_many(
213 &self,
214 store_id: &str,
215 keys: &[&str],
216 ) -> StateStoreResult<HashMap<String, Vec<u8>>>;
217
218 /// Set multiple key-value pairs in a store partition.
219 ///
220 /// All values must be persisted before this method returns.
221 ///
222 /// # Arguments
223 /// * `store_id` - The partition identifier (typically the plugin ID)
224 /// * `entries` - The key-value pairs to store
225 ///
226 /// # Returns
227 /// * `Ok(())` - All values were successfully stored
228 /// * `Err(e)` - An error occurred (some values may have been stored)
229 async fn set_many(&self, store_id: &str, entries: &[(&str, &[u8])]) -> StateStoreResult<()>;
230
231 /// Delete multiple keys from a store partition.
232 ///
233 /// # Arguments
234 /// * `store_id` - The partition identifier (typically the plugin ID)
235 /// * `keys` - The keys to delete
236 ///
237 /// # Returns
238 /// * `Ok(count)` - The number of keys that were deleted
239 /// * `Err(e)` - An error occurred (some keys may have been deleted)
240 async fn delete_many(&self, store_id: &str, keys: &[&str]) -> StateStoreResult<usize>;
241
242 /// Delete all data for a store partition.
243 ///
244 /// # Arguments
245 /// * `store_id` - The partition identifier to clear
246 ///
247 /// # Returns
248 /// * `Ok(count)` - The number of keys that were deleted
249 /// * `Err(e)` - An error occurred
250 async fn clear_store(&self, store_id: &str) -> StateStoreResult<usize>;
251
252 /// List all keys in a store partition.
253 ///
254 /// # Arguments
255 /// * `store_id` - The partition identifier
256 ///
257 /// # Returns
258 /// A vector of all keys in the store partition
259 async fn list_keys(&self, store_id: &str) -> StateStoreResult<Vec<String>>;
260
261 /// Check if a store partition exists and has any data.
262 ///
263 /// # Arguments
264 /// * `store_id` - The partition identifier to check
265 ///
266 /// # Returns
267 /// * `Ok(true)` - The store partition exists and has at least one key
268 /// * `Ok(false)` - The store partition doesn't exist or is empty
269 /// * `Err(e)` - An error occurred while checking
270 async fn store_exists(&self, store_id: &str) -> StateStoreResult<bool>;
271
272 /// Get the number of keys in a store partition.
273 ///
274 /// # Arguments
275 /// * `store_id` - The partition identifier
276 ///
277 /// # Returns
278 /// * `Ok(count)` - The number of keys in the store
279 /// * `Err(e)` - An error occurred
280 async fn key_count(&self, store_id: &str) -> StateStoreResult<usize>;
281
282 /// Force pending writes to persistent storage.
283 ///
284 /// For in-memory stores, this is a no-op. For persistent stores, this
285 /// ensures all data is durably written to disk.
286 ///
287 /// # Returns
288 /// * `Ok(())` - Sync completed successfully
289 /// * `Err(e)` - An error occurred during sync
290 async fn sync(&self) -> StateStoreResult<()> {
291 Ok(())
292 }
293
294 /// Whether this state store provides durable (persistent) storage.
295 ///
296 /// Returns `true` for stores backed by disk or remote storage (e.g., redb, Redis).
297 /// Returns `false` for in-memory stores (data lost on restart).
298 ///
299 /// The host uses this to validate that durable reactions (`is_durable() == true`)
300 /// are paired with a durable state store.
301 fn is_durable(&self) -> bool {
302 false
303 }
304}
305
306/// In-memory implementation of StateStoreProvider.
307///
308/// This is the default state store provider used when no external provider
309/// is configured. Data is stored in memory and does not persist across restarts.
310///
311/// # Thread Safety
312///
313/// This implementation is thread-safe and supports concurrent access from
314/// multiple plugins.
315///
316/// # Usage
317///
318/// ```ignore
319/// use drasi_lib::MemoryStateStoreProvider;
320///
321/// let provider = MemoryStateStoreProvider::new();
322///
323/// // Store some data
324/// provider.set("my-plugin", "key1", b"value1".to_vec()).await?;
325///
326/// // Retrieve data
327/// if let Some(value) = provider.get("my-plugin", "key1").await? {
328/// println!("Value: {:?}", value);
329/// }
330/// ```
331pub struct MemoryStateStoreProvider {
332 /// Data storage: store_id -> (key -> value)
333 stores: Arc<RwLock<HashMap<String, HashMap<String, Vec<u8>>>>>,
334}
335
336impl Default for MemoryStateStoreProvider {
337 fn default() -> Self {
338 Self::new()
339 }
340}
341
342impl MemoryStateStoreProvider {
343 /// Create a new in-memory state store provider.
344 pub fn new() -> Self {
345 Self {
346 stores: Arc::new(RwLock::new(HashMap::new())),
347 }
348 }
349}
350
351#[async_trait]
352impl StateStoreProvider for MemoryStateStoreProvider {
353 async fn get(&self, store_id: &str, key: &str) -> StateStoreResult<Option<Vec<u8>>> {
354 let stores = self.stores.read().await;
355 Ok(stores
356 .get(store_id)
357 .and_then(|store| store.get(key).cloned()))
358 }
359
360 async fn set(&self, store_id: &str, key: &str, value: Vec<u8>) -> StateStoreResult<()> {
361 let mut stores = self.stores.write().await;
362 stores
363 .entry(store_id.to_string())
364 .or_default()
365 .insert(key.to_string(), value);
366 Ok(())
367 }
368
369 async fn delete(&self, store_id: &str, key: &str) -> StateStoreResult<bool> {
370 let mut stores = self.stores.write().await;
371 if let Some(store) = stores.get_mut(store_id) {
372 let existed = store.remove(key).is_some();
373 // Clean up empty stores
374 if store.is_empty() {
375 stores.remove(store_id);
376 }
377 Ok(existed)
378 } else {
379 Ok(false)
380 }
381 }
382
383 async fn contains_key(&self, store_id: &str, key: &str) -> StateStoreResult<bool> {
384 let stores = self.stores.read().await;
385 Ok(stores
386 .get(store_id)
387 .is_some_and(|store| store.contains_key(key)))
388 }
389
390 async fn get_many(
391 &self,
392 store_id: &str,
393 keys: &[&str],
394 ) -> StateStoreResult<HashMap<String, Vec<u8>>> {
395 let stores = self.stores.read().await;
396 let mut result = HashMap::new();
397
398 if let Some(store) = stores.get(store_id) {
399 for key in keys {
400 if let Some(value) = store.get(*key) {
401 result.insert((*key).to_string(), value.clone());
402 }
403 }
404 }
405
406 Ok(result)
407 }
408
409 async fn set_many(&self, store_id: &str, entries: &[(&str, &[u8])]) -> StateStoreResult<()> {
410 let mut stores = self.stores.write().await;
411 let store = stores.entry(store_id.to_string()).or_default();
412
413 for (key, value) in entries {
414 store.insert((*key).to_string(), value.to_vec());
415 }
416
417 Ok(())
418 }
419
420 async fn delete_many(&self, store_id: &str, keys: &[&str]) -> StateStoreResult<usize> {
421 let mut stores = self.stores.write().await;
422 let mut count = 0;
423
424 if let Some(store) = stores.get_mut(store_id) {
425 for key in keys {
426 if store.remove(*key).is_some() {
427 count += 1;
428 }
429 }
430 // Clean up empty stores
431 if store.is_empty() {
432 stores.remove(store_id);
433 }
434 }
435
436 Ok(count)
437 }
438
439 async fn clear_store(&self, store_id: &str) -> StateStoreResult<usize> {
440 let mut stores = self.stores.write().await;
441 if let Some(store) = stores.remove(store_id) {
442 Ok(store.len())
443 } else {
444 Ok(0)
445 }
446 }
447
448 async fn list_keys(&self, store_id: &str) -> StateStoreResult<Vec<String>> {
449 let stores = self.stores.read().await;
450 Ok(stores
451 .get(store_id)
452 .map(|store| store.keys().cloned().collect())
453 .unwrap_or_default())
454 }
455
456 async fn store_exists(&self, store_id: &str) -> StateStoreResult<bool> {
457 let stores = self.stores.read().await;
458 Ok(stores.get(store_id).is_some_and(|store| !store.is_empty()))
459 }
460
461 async fn key_count(&self, store_id: &str) -> StateStoreResult<usize> {
462 let stores = self.stores.read().await;
463 Ok(stores.get(store_id).map(|store| store.len()).unwrap_or(0))
464 }
465}
466
#[cfg(test)]
mod tests {
    use super::*;

    /// Write `value` under `key` in `store_id`, panicking if the store
    /// reports an error.
    async fn put(provider: &MemoryStateStoreProvider, store_id: &str, key: &str, value: &[u8]) {
        provider.set(store_id, key, value.to_vec()).await.unwrap();
    }

    /// Read `key` from `store_id`, panicking if the store reports an error.
    async fn fetch(
        provider: &MemoryStateStoreProvider,
        store_id: &str,
        key: &str,
    ) -> Option<Vec<u8>> {
        provider.get(store_id, key).await.unwrap()
    }

    #[tokio::test]
    async fn test_memory_state_store_get_set() {
        let store = MemoryStateStoreProvider::new();
        put(&store, "store1", "key1", b"value1").await;

        // A stored key reads back unchanged.
        assert_eq!(
            fetch(&store, "store1", "key1").await,
            Some(b"value1".to_vec())
        );
        // An unknown key in a known partition reads as None.
        assert_eq!(fetch(&store, "store1", "nonexistent").await, None);
        // Any key in an unknown partition reads as None.
        assert_eq!(fetch(&store, "nonexistent", "key1").await, None);
    }

    #[tokio::test]
    async fn test_memory_state_store_delete() {
        let store = MemoryStateStoreProvider::new();
        put(&store, "store1", "key1", b"value1").await;

        // Deleting a present key reports true and the key is gone afterwards.
        assert!(store.delete("store1", "key1").await.unwrap());
        assert_eq!(fetch(&store, "store1", "key1").await, None);

        // Deleting an absent key reports false.
        assert!(!store.delete("store1", "nonexistent").await.unwrap());
    }

    #[tokio::test]
    async fn test_memory_state_store_contains_key() {
        let store = MemoryStateStoreProvider::new();

        // Nothing has been stored yet.
        assert!(!store.contains_key("store1", "key1").await.unwrap());

        put(&store, "store1", "key1", b"value1").await;

        // The stored key is reported, a different key is not.
        assert!(store.contains_key("store1", "key1").await.unwrap());
        assert!(!store.contains_key("store1", "key2").await.unwrap());
    }

    #[tokio::test]
    async fn test_memory_state_store_get_many() {
        let store = MemoryStateStoreProvider::new();
        put(&store, "store1", "key1", b"value1").await;
        put(&store, "store1", "key2", b"value2").await;

        // Ask for both stored keys plus one that does not exist.
        let found = store
            .get_many("store1", &["key1", "key2", "nonexistent"])
            .await
            .unwrap();

        assert_eq!(found.len(), 2);
        assert_eq!(found.get("key1"), Some(&b"value1".to_vec()));
        assert_eq!(found.get("key2"), Some(&b"value2".to_vec()));
    }

    #[tokio::test]
    async fn test_memory_state_store_set_many() {
        let store = MemoryStateStoreProvider::new();

        store
            .set_many("store1", &[("key1", b"value1"), ("key2", b"value2")])
            .await
            .unwrap();

        // Both entries are readable individually.
        assert_eq!(
            fetch(&store, "store1", "key1").await,
            Some(b"value1".to_vec())
        );
        assert_eq!(
            fetch(&store, "store1", "key2").await,
            Some(b"value2".to_vec())
        );
    }

    #[tokio::test]
    async fn test_memory_state_store_delete_many() {
        let store = MemoryStateStoreProvider::new();

        store
            .set_many(
                "store1",
                &[("key1", b"value1"), ("key2", b"value2"), ("key3", b"value3")],
            )
            .await
            .unwrap();

        // Two of the three requested keys exist, so two are removed.
        let removed = store
            .delete_many("store1", &["key1", "key2", "nonexistent"])
            .await
            .unwrap();
        assert_eq!(removed, 2);

        // A removed key is gone; the untouched key is still there.
        assert_eq!(fetch(&store, "store1", "key1").await, None);
        assert_eq!(
            fetch(&store, "store1", "key3").await,
            Some(b"value3".to_vec())
        );
    }

    #[tokio::test]
    async fn test_memory_state_store_clear_store() {
        let store = MemoryStateStoreProvider::new();
        put(&store, "store1", "key1", b"value1").await;
        put(&store, "store1", "key2", b"value2").await;
        put(&store, "store2", "key1", b"value1").await;

        // Clearing store1 reports both of its keys.
        assert_eq!(store.clear_store("store1").await.unwrap(), 2);

        // store1 is empty now, store2 is untouched.
        assert_eq!(fetch(&store, "store1", "key1").await, None);
        assert_eq!(
            fetch(&store, "store2", "key1").await,
            Some(b"value1".to_vec())
        );
    }

    #[tokio::test]
    async fn test_memory_state_store_list_keys() {
        let store = MemoryStateStoreProvider::new();
        put(&store, "store1", "key1", b"value1").await;
        put(&store, "store1", "key2", b"value2").await;

        // Listing order is unspecified, so sort before comparing.
        let mut listed = store.list_keys("store1").await.unwrap();
        listed.sort();
        assert_eq!(listed, vec!["key1", "key2"]);

        // An unknown partition lists no keys.
        let listed = store.list_keys("nonexistent").await.unwrap();
        assert!(listed.is_empty());
    }

    #[tokio::test]
    async fn test_memory_state_store_store_exists() {
        let store = MemoryStateStoreProvider::new();

        // No partition before the first write.
        assert!(!store.store_exists("store1").await.unwrap());

        put(&store, "store1", "key1", b"value1").await;
        assert!(store.store_exists("store1").await.unwrap());

        // Removing the only key makes the partition disappear again.
        store.delete("store1", "key1").await.unwrap();
        assert!(!store.store_exists("store1").await.unwrap());
    }

    #[tokio::test]
    async fn test_memory_state_store_key_count() {
        let store = MemoryStateStoreProvider::new();

        // An unknown partition counts as zero keys.
        assert_eq!(store.key_count("store1").await.unwrap(), 0);

        put(&store, "store1", "key1", b"value1").await;
        assert_eq!(store.key_count("store1").await.unwrap(), 1);

        put(&store, "store1", "key2", b"value2").await;
        assert_eq!(store.key_count("store1").await.unwrap(), 2);
    }

    #[tokio::test]
    async fn test_memory_state_store_partitioning() {
        let store = MemoryStateStoreProvider::new();

        // The same key in two partitions holds two independent values.
        put(&store, "store1", "key", b"value1").await;
        put(&store, "store2", "key", b"value2").await;

        let from_first = fetch(&store, "store1", "key").await;
        let from_second = fetch(&store, "store2", "key").await;
        assert_eq!(from_first, Some(b"value1".to_vec()));
        assert_eq!(from_second, Some(b"value2".to_vec()));
    }

    #[tokio::test]
    async fn test_memory_state_store_sync() {
        let store = MemoryStateStoreProvider::new();

        // sync does nothing for the memory provider but must still succeed,
        // both on an empty store and after data has been written.
        store.sync().await.unwrap();

        put(&store, "store1", "key1", b"value1").await;
        store.sync().await.unwrap();
    }
}