Skip to main content

drasi_lib/state_store/
mod.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! State Store Provider Module
16//!
17//! This module provides the `StateStoreProvider` trait that allows plugins
18//! (Sources, BootstrapProviders, and Reactions) to store and retrieve
19//! runtime state that can persist across runs of DrasiLib.
20//!
21//! # Architecture
22//!
23//! The state store plugin system follows pure dependency inversion:
24//! - **Lib** defines the `StateStoreProvider` trait and provides a default
25//!   in-memory implementation (`MemoryStateStoreProvider`)
26//! - **External plugins** (in `components/state_stores/`) implement this trait
27//!   for persistent storage
28//! - **Applications** optionally inject plugins into DrasiLib; if none provided,
29//!   the in-memory default is used
30//!
31//! # Partitioning
32//!
33//! The state store supports partitioning via `StoreId`. Each plugin provides
34//! a unique `StoreId` (typically the plugin's ID) when interacting with the
35//! store. This ensures that different plugins don't interfere with each other's
36//! state.
37//!
38//! # Usage
39//!
40//! ## Without a plugin (uses in-memory default)
41//! ```ignore
42//! let drasi = DrasiLib::builder()
43//!     .build()?;
44//! ```
45//!
46//! ## With an external plugin
47//! ```ignore
48//! use drasi_state_store_json::JsonStateStoreProvider;
49//!
50//! let state_store = JsonStateStoreProvider::new("/data/state");
51//! let drasi = DrasiLib::builder()
52//!     .with_state_store_provider(Arc::new(state_store))
53//!     .build()?;
54//! ```
55
56use async_trait::async_trait;
57use std::collections::HashMap;
58use std::sync::Arc;
59use thiserror::Error;
60use tokio::sync::RwLock;
61
62/// Errors that can occur when interacting with a state store.
63///
64/// # Note on Missing Keys
65///
66/// Missing keys are typically represented as `Ok(None)` from `get()`, not as
67/// `KeyNotFound` errors. The `KeyNotFound` variant is available for custom
68/// implementations that need to distinguish between "key doesn't exist" and
69/// other error conditions, but the standard trait methods prefer returning
70/// `Option` values.
71#[derive(Error, Debug)]
72pub enum StateStoreError {
73    /// The requested key was not found in the store.
74    ///
75    /// Note: Standard implementations return `Ok(None)` for missing keys instead
76    /// of this error. This variant is provided for custom implementations that
77    /// need explicit error-based handling of missing keys.
78    #[error("Key not found: {0}")]
79    KeyNotFound(String),
80
81    /// Failed to serialize or deserialize data.
82    ///
83    /// This error typically occurs when stored data cannot be parsed or when
84    /// data being stored cannot be serialized to the expected format.
85    #[error("Serialization error: {0}")]
86    SerializationError(String),
87
88    /// Failed to read or write to the underlying storage.
89    ///
90    /// This error indicates I/O failures, database errors, or other storage
91    /// backend issues.
92    #[error("Storage error: {0}")]
93    StorageError(String),
94
95    /// Generic error for other failures.
96    ///
97    /// Use this for errors that don't fit the other categories.
98    #[error("State store error: {0}")]
99    Other(String),
100}
101
102/// Result type for state store operations
103pub type StateStoreResult<T> = Result<T, StateStoreError>;
104
105/// Trait defining the interface for state store providers.
106///
107/// State store providers allow plugins (Sources, BootstrapProviders, and Reactions)
108/// to persist runtime state that survives restarts of DrasiLib.
109///
110/// # Thread Safety
111///
112/// Implementations must be thread-safe and support concurrent access from
113/// multiple plugins.
114///
115/// # Partitioning
116///
117/// The state store uses `store_id` to partition data between different plugins.
118/// Each plugin should use a unique `store_id` (typically the plugin's ID) to
119/// avoid conflicts with other plugins.
120///
121/// # Example Implementation
122///
123/// ```ignore
124/// use drasi_lib::StateStoreProvider;
125/// use async_trait::async_trait;
126///
127/// pub struct MyStateStore {
128///     // implementation fields
129/// }
130///
131/// #[async_trait]
132/// impl StateStoreProvider for MyStateStore {
133///     async fn get(&self, store_id: &str, key: &str) -> StateStoreResult<Option<Vec<u8>>> {
134///         // implementation
135///     }
136///
137///     async fn set(&self, store_id: &str, key: &str, value: Vec<u8>) -> StateStoreResult<()> {
138///         // implementation
139///     }
140///
141///     async fn delete(&self, store_id: &str, key: &str) -> StateStoreResult<bool> {
142///         // implementation
143///     }
144///
145///     // ... implement other methods
146/// }
147/// ```
148#[async_trait]
149pub trait StateStoreProvider: Send + Sync {
150    /// Get a single value by key from a store partition.
151    ///
152    /// # Arguments
153    /// * `store_id` - The partition identifier (typically the plugin ID)
154    /// * `key` - The key to retrieve
155    ///
156    /// # Returns
157    /// * `Ok(Some(value))` - The value was found
158    /// * `Ok(None)` - The key doesn't exist in the store
159    /// * `Err(e)` - An error occurred
160    async fn get(&self, store_id: &str, key: &str) -> StateStoreResult<Option<Vec<u8>>>;
161
162    /// Set a single value by key in a store partition.
163    ///
164    /// The state must be persisted before this method returns.
165    ///
166    /// # Arguments
167    /// * `store_id` - The partition identifier (typically the plugin ID)
168    /// * `key` - The key to set
169    /// * `value` - The value to store
170    ///
171    /// # Returns
172    /// * `Ok(())` - The value was successfully stored
173    /// * `Err(e)` - An error occurred
174    async fn set(&self, store_id: &str, key: &str, value: Vec<u8>) -> StateStoreResult<()>;
175
176    /// Delete a single key from a store partition.
177    ///
178    /// # Arguments
179    /// * `store_id` - The partition identifier (typically the plugin ID)
180    /// * `key` - The key to delete
181    ///
182    /// # Returns
183    /// * `Ok(true)` - The key existed and was deleted
184    /// * `Ok(false)` - The key didn't exist
185    /// * `Err(e)` - An error occurred
186    async fn delete(&self, store_id: &str, key: &str) -> StateStoreResult<bool>;
187
188    /// Check if a key exists in a store partition without retrieving its value.
189    ///
190    /// This is more efficient than `get()` when you only need to check existence,
191    /// especially for large values.
192    ///
193    /// # Arguments
194    /// * `store_id` - The partition identifier (typically the plugin ID)
195    /// * `key` - The key to check
196    ///
197    /// # Returns
198    /// * `Ok(true)` - The key exists
199    /// * `Ok(false)` - The key doesn't exist
200    /// * `Err(e)` - An error occurred
201    async fn contains_key(&self, store_id: &str, key: &str) -> StateStoreResult<bool>;
202
203    /// Get multiple values by keys from a store partition.
204    ///
205    /// # Arguments
206    /// * `store_id` - The partition identifier (typically the plugin ID)
207    /// * `keys` - The keys to retrieve
208    ///
209    /// # Returns
210    /// A HashMap mapping each found key to its value. Keys that don't exist
211    /// are simply not included in the result.
212    async fn get_many(
213        &self,
214        store_id: &str,
215        keys: &[&str],
216    ) -> StateStoreResult<HashMap<String, Vec<u8>>>;
217
218    /// Set multiple key-value pairs in a store partition.
219    ///
220    /// All values must be persisted before this method returns.
221    ///
222    /// # Arguments
223    /// * `store_id` - The partition identifier (typically the plugin ID)
224    /// * `entries` - The key-value pairs to store
225    ///
226    /// # Returns
227    /// * `Ok(())` - All values were successfully stored
228    /// * `Err(e)` - An error occurred (some values may have been stored)
229    async fn set_many(&self, store_id: &str, entries: &[(&str, &[u8])]) -> StateStoreResult<()>;
230
231    /// Delete multiple keys from a store partition.
232    ///
233    /// # Arguments
234    /// * `store_id` - The partition identifier (typically the plugin ID)
235    /// * `keys` - The keys to delete
236    ///
237    /// # Returns
238    /// * `Ok(count)` - The number of keys that were deleted
239    /// * `Err(e)` - An error occurred (some keys may have been deleted)
240    async fn delete_many(&self, store_id: &str, keys: &[&str]) -> StateStoreResult<usize>;
241
242    /// Delete all data for a store partition.
243    ///
244    /// # Arguments
245    /// * `store_id` - The partition identifier to clear
246    ///
247    /// # Returns
248    /// * `Ok(count)` - The number of keys that were deleted
249    /// * `Err(e)` - An error occurred
250    async fn clear_store(&self, store_id: &str) -> StateStoreResult<usize>;
251
252    /// List all keys in a store partition.
253    ///
254    /// # Arguments
255    /// * `store_id` - The partition identifier
256    ///
257    /// # Returns
258    /// A vector of all keys in the store partition
259    async fn list_keys(&self, store_id: &str) -> StateStoreResult<Vec<String>>;
260
261    /// Check if a store partition exists and has any data.
262    ///
263    /// # Arguments
264    /// * `store_id` - The partition identifier to check
265    ///
266    /// # Returns
267    /// * `Ok(true)` - The store partition exists and has at least one key
268    /// * `Ok(false)` - The store partition doesn't exist or is empty
269    /// * `Err(e)` - An error occurred while checking
270    async fn store_exists(&self, store_id: &str) -> StateStoreResult<bool>;
271
272    /// Get the number of keys in a store partition.
273    ///
274    /// # Arguments
275    /// * `store_id` - The partition identifier
276    ///
277    /// # Returns
278    /// * `Ok(count)` - The number of keys in the store
279    /// * `Err(e)` - An error occurred
280    async fn key_count(&self, store_id: &str) -> StateStoreResult<usize>;
281
282    /// Force pending writes to persistent storage.
283    ///
284    /// For in-memory stores, this is a no-op. For persistent stores, this
285    /// ensures all data is durably written to disk.
286    ///
287    /// # Returns
288    /// * `Ok(())` - Sync completed successfully
289    /// * `Err(e)` - An error occurred during sync
290    async fn sync(&self) -> StateStoreResult<()> {
291        Ok(())
292    }
293
294    /// Whether this state store provides durable (persistent) storage.
295    ///
296    /// Returns `true` for stores backed by disk or remote storage (e.g., redb, Redis).
297    /// Returns `false` for in-memory stores (data lost on restart).
298    ///
299    /// The host uses this to validate that durable reactions (`is_durable() == true`)
300    /// are paired with a durable state store.
301    fn is_durable(&self) -> bool {
302        false
303    }
304}
305
306/// In-memory implementation of StateStoreProvider.
307///
308/// This is the default state store provider used when no external provider
309/// is configured. Data is stored in memory and does not persist across restarts.
310///
311/// # Thread Safety
312///
313/// This implementation is thread-safe and supports concurrent access from
314/// multiple plugins.
315///
316/// # Usage
317///
318/// ```ignore
319/// use drasi_lib::MemoryStateStoreProvider;
320///
321/// let provider = MemoryStateStoreProvider::new();
322///
323/// // Store some data
324/// provider.set("my-plugin", "key1", b"value1".to_vec()).await?;
325///
326/// // Retrieve data
327/// if let Some(value) = provider.get("my-plugin", "key1").await? {
328///     println!("Value: {:?}", value);
329/// }
330/// ```
331pub struct MemoryStateStoreProvider {
332    /// Data storage: store_id -> (key -> value)
333    stores: Arc<RwLock<HashMap<String, HashMap<String, Vec<u8>>>>>,
334}
335
336impl Default for MemoryStateStoreProvider {
337    fn default() -> Self {
338        Self::new()
339    }
340}
341
342impl MemoryStateStoreProvider {
343    /// Create a new in-memory state store provider.
344    pub fn new() -> Self {
345        Self {
346            stores: Arc::new(RwLock::new(HashMap::new())),
347        }
348    }
349}
350
351#[async_trait]
352impl StateStoreProvider for MemoryStateStoreProvider {
353    async fn get(&self, store_id: &str, key: &str) -> StateStoreResult<Option<Vec<u8>>> {
354        let stores = self.stores.read().await;
355        Ok(stores
356            .get(store_id)
357            .and_then(|store| store.get(key).cloned()))
358    }
359
360    async fn set(&self, store_id: &str, key: &str, value: Vec<u8>) -> StateStoreResult<()> {
361        let mut stores = self.stores.write().await;
362        stores
363            .entry(store_id.to_string())
364            .or_default()
365            .insert(key.to_string(), value);
366        Ok(())
367    }
368
369    async fn delete(&self, store_id: &str, key: &str) -> StateStoreResult<bool> {
370        let mut stores = self.stores.write().await;
371        if let Some(store) = stores.get_mut(store_id) {
372            let existed = store.remove(key).is_some();
373            // Clean up empty stores
374            if store.is_empty() {
375                stores.remove(store_id);
376            }
377            Ok(existed)
378        } else {
379            Ok(false)
380        }
381    }
382
383    async fn contains_key(&self, store_id: &str, key: &str) -> StateStoreResult<bool> {
384        let stores = self.stores.read().await;
385        Ok(stores
386            .get(store_id)
387            .is_some_and(|store| store.contains_key(key)))
388    }
389
390    async fn get_many(
391        &self,
392        store_id: &str,
393        keys: &[&str],
394    ) -> StateStoreResult<HashMap<String, Vec<u8>>> {
395        let stores = self.stores.read().await;
396        let mut result = HashMap::new();
397
398        if let Some(store) = stores.get(store_id) {
399            for key in keys {
400                if let Some(value) = store.get(*key) {
401                    result.insert((*key).to_string(), value.clone());
402                }
403            }
404        }
405
406        Ok(result)
407    }
408
409    async fn set_many(&self, store_id: &str, entries: &[(&str, &[u8])]) -> StateStoreResult<()> {
410        let mut stores = self.stores.write().await;
411        let store = stores.entry(store_id.to_string()).or_default();
412
413        for (key, value) in entries {
414            store.insert((*key).to_string(), value.to_vec());
415        }
416
417        Ok(())
418    }
419
420    async fn delete_many(&self, store_id: &str, keys: &[&str]) -> StateStoreResult<usize> {
421        let mut stores = self.stores.write().await;
422        let mut count = 0;
423
424        if let Some(store) = stores.get_mut(store_id) {
425            for key in keys {
426                if store.remove(*key).is_some() {
427                    count += 1;
428                }
429            }
430            // Clean up empty stores
431            if store.is_empty() {
432                stores.remove(store_id);
433            }
434        }
435
436        Ok(count)
437    }
438
439    async fn clear_store(&self, store_id: &str) -> StateStoreResult<usize> {
440        let mut stores = self.stores.write().await;
441        if let Some(store) = stores.remove(store_id) {
442            Ok(store.len())
443        } else {
444            Ok(0)
445        }
446    }
447
448    async fn list_keys(&self, store_id: &str) -> StateStoreResult<Vec<String>> {
449        let stores = self.stores.read().await;
450        Ok(stores
451            .get(store_id)
452            .map(|store| store.keys().cloned().collect())
453            .unwrap_or_default())
454    }
455
456    async fn store_exists(&self, store_id: &str) -> StateStoreResult<bool> {
457        let stores = self.stores.read().await;
458        Ok(stores.get(store_id).is_some_and(|store| !store.is_empty()))
459    }
460
461    async fn key_count(&self, store_id: &str) -> StateStoreResult<usize> {
462        let stores = self.stores.read().await;
463        Ok(stores.get(store_id).map(|store| store.len()).unwrap_or(0))
464    }
465}
466
#[cfg(test)]
mod tests {
    use super::*;

    /// Stores `value` under `key` in partition `store_id`, panicking on error.
    async fn put(provider: &MemoryStateStoreProvider, store_id: &str, key: &str, value: &[u8]) {
        provider.set(store_id, key, value.to_vec()).await.unwrap();
    }

    /// Reads `key` from partition `store_id`, panicking on error.
    async fn fetch(
        provider: &MemoryStateStoreProvider,
        store_id: &str,
        key: &str,
    ) -> Option<Vec<u8>> {
        provider.get(store_id, key).await.unwrap()
    }

    #[tokio::test]
    async fn test_memory_state_store_get_set() {
        let provider = MemoryStateStoreProvider::new();

        put(&provider, "store1", "key1", b"value1").await;

        // A stored value round-trips.
        assert_eq!(
            fetch(&provider, "store1", "key1").await,
            Some(b"value1".to_vec())
        );

        // Unknown key in a known store.
        assert_eq!(fetch(&provider, "store1", "nonexistent").await, None);

        // Known key name in an unknown store.
        assert_eq!(fetch(&provider, "nonexistent", "key1").await, None);
    }

    #[tokio::test]
    async fn test_memory_state_store_delete() {
        let provider = MemoryStateStoreProvider::new();

        put(&provider, "store1", "key1", b"value1").await;

        // Deleting an existing key reports true and removes the value.
        assert!(provider.delete("store1", "key1").await.unwrap());
        assert_eq!(fetch(&provider, "store1", "key1").await, None);

        // Deleting a missing key reports false.
        assert!(!provider.delete("store1", "nonexistent").await.unwrap());
    }

    #[tokio::test]
    async fn test_memory_state_store_contains_key() {
        let provider = MemoryStateStoreProvider::new();

        // Nothing stored yet.
        assert!(!provider.contains_key("store1", "key1").await.unwrap());

        put(&provider, "store1", "key1", b"value1").await;

        // The stored key is visible; a sibling key is not.
        assert!(provider.contains_key("store1", "key1").await.unwrap());
        assert!(!provider.contains_key("store1", "key2").await.unwrap());
    }

    #[tokio::test]
    async fn test_memory_state_store_get_many() {
        let provider = MemoryStateStoreProvider::new();

        put(&provider, "store1", "key1", b"value1").await;
        put(&provider, "store1", "key2", b"value2").await;

        // The missing key is silently skipped.
        let found = provider
            .get_many("store1", &["key1", "key2", "nonexistent"])
            .await
            .unwrap();
        assert_eq!(found.len(), 2);
        assert_eq!(found.get("key1"), Some(&b"value1".to_vec()));
        assert_eq!(found.get("key2"), Some(&b"value2".to_vec()));
    }

    #[tokio::test]
    async fn test_memory_state_store_set_many() {
        let provider = MemoryStateStoreProvider::new();

        provider
            .set_many("store1", &[("key1", b"value1"), ("key2", b"value2")])
            .await
            .unwrap();

        // Both entries are readable individually.
        assert_eq!(
            fetch(&provider, "store1", "key1").await,
            Some(b"value1".to_vec())
        );
        assert_eq!(
            fetch(&provider, "store1", "key2").await,
            Some(b"value2".to_vec())
        );
    }

    #[tokio::test]
    async fn test_memory_state_store_delete_many() {
        let provider = MemoryStateStoreProvider::new();

        provider
            .set_many(
                "store1",
                &[("key1", b"value1"), ("key2", b"value2"), ("key3", b"value3")],
            )
            .await
            .unwrap();

        // Only keys that actually existed are counted.
        let removed = provider
            .delete_many("store1", &["key1", "key2", "nonexistent"])
            .await
            .unwrap();
        assert_eq!(removed, 2);

        // Deleted key is gone; untouched key survives.
        assert_eq!(fetch(&provider, "store1", "key1").await, None);
        assert_eq!(
            fetch(&provider, "store1", "key3").await,
            Some(b"value3".to_vec())
        );
    }

    #[tokio::test]
    async fn test_memory_state_store_clear_store() {
        let provider = MemoryStateStoreProvider::new();

        // Populate two partitions.
        put(&provider, "store1", "key1", b"value1").await;
        put(&provider, "store1", "key2", b"value2").await;
        put(&provider, "store2", "key1", b"value1").await;

        // Clearing store1 reports both of its keys.
        assert_eq!(provider.clear_store("store1").await.unwrap(), 2);

        // store1 is empty, store2 is untouched.
        assert_eq!(fetch(&provider, "store1", "key1").await, None);
        assert_eq!(
            fetch(&provider, "store2", "key1").await,
            Some(b"value1".to_vec())
        );
    }

    #[tokio::test]
    async fn test_memory_state_store_list_keys() {
        let provider = MemoryStateStoreProvider::new();

        put(&provider, "store1", "key1", b"value1").await;
        put(&provider, "store1", "key2", b"value2").await;

        // Order is unspecified, so sort before comparing.
        let mut listed = provider.list_keys("store1").await.unwrap();
        listed.sort();
        assert_eq!(listed, vec!["key1", "key2"]);

        // An unknown store lists no keys.
        let listed = provider.list_keys("nonexistent").await.unwrap();
        assert!(listed.is_empty());
    }

    #[tokio::test]
    async fn test_memory_state_store_store_exists() {
        let provider = MemoryStateStoreProvider::new();

        // Unknown store.
        assert!(!provider.store_exists("store1").await.unwrap());

        // Store appears once it holds a key.
        put(&provider, "store1", "key1", b"value1").await;
        assert!(provider.store_exists("store1").await.unwrap());

        // Removing the last key cleans the store up again.
        provider.delete("store1", "key1").await.unwrap();
        assert!(!provider.store_exists("store1").await.unwrap());
    }

    #[tokio::test]
    async fn test_memory_state_store_key_count() {
        let provider = MemoryStateStoreProvider::new();

        // An unknown store counts as zero keys.
        assert_eq!(provider.key_count("store1").await.unwrap(), 0);

        put(&provider, "store1", "key1", b"value1").await;
        assert_eq!(provider.key_count("store1").await.unwrap(), 1);

        put(&provider, "store1", "key2", b"value2").await;
        assert_eq!(provider.key_count("store1").await.unwrap(), 2);
    }

    #[tokio::test]
    async fn test_memory_state_store_partitioning() {
        let provider = MemoryStateStoreProvider::new();

        // The same key name lives independently in each partition.
        put(&provider, "store1", "key", b"value1").await;
        put(&provider, "store2", "key", b"value2").await;

        let from_store1 = fetch(&provider, "store1", "key").await;
        let from_store2 = fetch(&provider, "store2", "key").await;
        assert_eq!(from_store1, Some(b"value1".to_vec()));
        assert_eq!(from_store2, Some(b"value2".to_vec()));
    }

    #[tokio::test]
    async fn test_memory_state_store_sync() {
        let provider = MemoryStateStoreProvider::new();

        // sync is a no-op for the memory provider but must still succeed,
        // both on an empty provider and after data has been written.
        provider.sync().await.unwrap();

        put(&provider, "store1", "key1", b"value1").await;
        provider.sync().await.unwrap();
    }
}