Skip to main content

drasi_lib/state_store/
mod.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! State Store Provider Module
16//!
17//! This module provides the `StateStoreProvider` trait that allows plugins
18//! (Sources, BootstrapProviders, and Reactions) to store and retrieve
19//! runtime state that can persist across runs of DrasiLib.
20//!
21//! # Architecture
22//!
23//! The state store plugin system follows pure dependency inversion:
24//! - **Lib** defines the `StateStoreProvider` trait and provides a default
25//!   in-memory implementation (`MemoryStateStoreProvider`)
26//! - **External plugins** (in `components/state_stores/`) implement this trait
27//!   for persistent storage
28//! - **Applications** optionally inject plugins into DrasiLib; if none provided,
29//!   the in-memory default is used
30//!
31//! # Partitioning
32//!
//! The state store supports partitioning via a `store_id` string that is passed
//! to every partition-scoped method. Each plugin supplies a unique `store_id`
//! (typically the plugin's ID) when interacting with the store, which keeps
//! different plugins from interfering with each other's state.
37//!
38//! # Usage
39//!
40//! ## Without a plugin (uses in-memory default)
41//! ```ignore
42//! let drasi = DrasiLib::builder()
43//!     .build()?;
44//! ```
45//!
46//! ## With an external plugin
47//! ```ignore
48//! use drasi_state_store_json::JsonStateStoreProvider;
49//!
50//! let state_store = JsonStateStoreProvider::new("/data/state");
51//! let drasi = DrasiLib::builder()
52//!     .with_state_store_provider(Arc::new(state_store))
53//!     .build()?;
54//! ```
55
56use async_trait::async_trait;
57use std::collections::HashMap;
58use std::sync::Arc;
59use thiserror::Error;
60use tokio::sync::RwLock;
61
62/// Errors that can occur when interacting with a state store.
63///
64/// # Note on Missing Keys
65///
66/// Missing keys are typically represented as `Ok(None)` from `get()`, not as
67/// `KeyNotFound` errors. The `KeyNotFound` variant is available for custom
68/// implementations that need to distinguish between "key doesn't exist" and
69/// other error conditions, but the standard trait methods prefer returning
70/// `Option` values.
71#[derive(Error, Debug)]
72pub enum StateStoreError {
73    /// The requested key was not found in the store.
74    ///
75    /// Note: Standard implementations return `Ok(None)` for missing keys instead
76    /// of this error. This variant is provided for custom implementations that
77    /// need explicit error-based handling of missing keys.
78    #[error("Key not found: {0}")]
79    KeyNotFound(String),
80
81    /// Failed to serialize or deserialize data.
82    ///
83    /// This error typically occurs when stored data cannot be parsed or when
84    /// data being stored cannot be serialized to the expected format.
85    #[error("Serialization error: {0}")]
86    SerializationError(String),
87
88    /// Failed to read or write to the underlying storage.
89    ///
90    /// This error indicates I/O failures, database errors, or other storage
91    /// backend issues.
92    #[error("Storage error: {0}")]
93    StorageError(String),
94
95    /// Generic error for other failures.
96    ///
97    /// Use this for errors that don't fit the other categories.
98    #[error("State store error: {0}")]
99    Other(String),
100}
101
102/// Result type for state store operations
103pub type StateStoreResult<T> = Result<T, StateStoreError>;
104
105/// Trait defining the interface for state store providers.
106///
107/// State store providers allow plugins (Sources, BootstrapProviders, and Reactions)
108/// to persist runtime state that survives restarts of DrasiLib.
109///
110/// # Thread Safety
111///
112/// Implementations must be thread-safe and support concurrent access from
113/// multiple plugins.
114///
115/// # Partitioning
116///
117/// The state store uses `store_id` to partition data between different plugins.
118/// Each plugin should use a unique `store_id` (typically the plugin's ID) to
119/// avoid conflicts with other plugins.
120///
121/// # Example Implementation
122///
123/// ```ignore
124/// use drasi_lib::StateStoreProvider;
125/// use async_trait::async_trait;
126///
127/// pub struct MyStateStore {
128///     // implementation fields
129/// }
130///
131/// #[async_trait]
132/// impl StateStoreProvider for MyStateStore {
133///     async fn get(&self, store_id: &str, key: &str) -> StateStoreResult<Option<Vec<u8>>> {
134///         // implementation
135///     }
136///
137///     async fn set(&self, store_id: &str, key: &str, value: Vec<u8>) -> StateStoreResult<()> {
138///         // implementation
139///     }
140///
141///     async fn delete(&self, store_id: &str, key: &str) -> StateStoreResult<bool> {
142///         // implementation
143///     }
144///
145///     // ... implement other methods
146/// }
147/// ```
148#[async_trait]
149pub trait StateStoreProvider: Send + Sync {
150    /// Get a single value by key from a store partition.
151    ///
152    /// # Arguments
153    /// * `store_id` - The partition identifier (typically the plugin ID)
154    /// * `key` - The key to retrieve
155    ///
156    /// # Returns
157    /// * `Ok(Some(value))` - The value was found
158    /// * `Ok(None)` - The key doesn't exist in the store
159    /// * `Err(e)` - An error occurred
160    async fn get(&self, store_id: &str, key: &str) -> StateStoreResult<Option<Vec<u8>>>;
161
162    /// Set a single value by key in a store partition.
163    ///
164    /// The state must be persisted before this method returns.
165    ///
166    /// # Arguments
167    /// * `store_id` - The partition identifier (typically the plugin ID)
168    /// * `key` - The key to set
169    /// * `value` - The value to store
170    ///
171    /// # Returns
172    /// * `Ok(())` - The value was successfully stored
173    /// * `Err(e)` - An error occurred
174    async fn set(&self, store_id: &str, key: &str, value: Vec<u8>) -> StateStoreResult<()>;
175
176    /// Delete a single key from a store partition.
177    ///
178    /// # Arguments
179    /// * `store_id` - The partition identifier (typically the plugin ID)
180    /// * `key` - The key to delete
181    ///
182    /// # Returns
183    /// * `Ok(true)` - The key existed and was deleted
184    /// * `Ok(false)` - The key didn't exist
185    /// * `Err(e)` - An error occurred
186    async fn delete(&self, store_id: &str, key: &str) -> StateStoreResult<bool>;
187
188    /// Check if a key exists in a store partition without retrieving its value.
189    ///
190    /// This is more efficient than `get()` when you only need to check existence,
191    /// especially for large values.
192    ///
193    /// # Arguments
194    /// * `store_id` - The partition identifier (typically the plugin ID)
195    /// * `key` - The key to check
196    ///
197    /// # Returns
198    /// * `Ok(true)` - The key exists
199    /// * `Ok(false)` - The key doesn't exist
200    /// * `Err(e)` - An error occurred
201    async fn contains_key(&self, store_id: &str, key: &str) -> StateStoreResult<bool>;
202
203    /// Get multiple values by keys from a store partition.
204    ///
205    /// # Arguments
206    /// * `store_id` - The partition identifier (typically the plugin ID)
207    /// * `keys` - The keys to retrieve
208    ///
209    /// # Returns
210    /// A HashMap mapping each found key to its value. Keys that don't exist
211    /// are simply not included in the result.
212    async fn get_many(
213        &self,
214        store_id: &str,
215        keys: &[&str],
216    ) -> StateStoreResult<HashMap<String, Vec<u8>>>;
217
218    /// Set multiple key-value pairs in a store partition.
219    ///
220    /// All values must be persisted before this method returns.
221    ///
222    /// # Arguments
223    /// * `store_id` - The partition identifier (typically the plugin ID)
224    /// * `entries` - The key-value pairs to store
225    ///
226    /// # Returns
227    /// * `Ok(())` - All values were successfully stored
228    /// * `Err(e)` - An error occurred (some values may have been stored)
229    async fn set_many(&self, store_id: &str, entries: &[(&str, &[u8])]) -> StateStoreResult<()>;
230
231    /// Delete multiple keys from a store partition.
232    ///
233    /// # Arguments
234    /// * `store_id` - The partition identifier (typically the plugin ID)
235    /// * `keys` - The keys to delete
236    ///
237    /// # Returns
238    /// * `Ok(count)` - The number of keys that were deleted
239    /// * `Err(e)` - An error occurred (some keys may have been deleted)
240    async fn delete_many(&self, store_id: &str, keys: &[&str]) -> StateStoreResult<usize>;
241
242    /// Delete all data for a store partition.
243    ///
244    /// # Arguments
245    /// * `store_id` - The partition identifier to clear
246    ///
247    /// # Returns
248    /// * `Ok(count)` - The number of keys that were deleted
249    /// * `Err(e)` - An error occurred
250    async fn clear_store(&self, store_id: &str) -> StateStoreResult<usize>;
251
252    /// List all keys in a store partition.
253    ///
254    /// # Arguments
255    /// * `store_id` - The partition identifier
256    ///
257    /// # Returns
258    /// A vector of all keys in the store partition
259    async fn list_keys(&self, store_id: &str) -> StateStoreResult<Vec<String>>;
260
261    /// Check if a store partition exists and has any data.
262    ///
263    /// # Arguments
264    /// * `store_id` - The partition identifier to check
265    ///
266    /// # Returns
267    /// * `Ok(true)` - The store partition exists and has at least one key
268    /// * `Ok(false)` - The store partition doesn't exist or is empty
269    /// * `Err(e)` - An error occurred while checking
270    async fn store_exists(&self, store_id: &str) -> StateStoreResult<bool>;
271
272    /// Get the number of keys in a store partition.
273    ///
274    /// # Arguments
275    /// * `store_id` - The partition identifier
276    ///
277    /// # Returns
278    /// * `Ok(count)` - The number of keys in the store
279    /// * `Err(e)` - An error occurred
280    async fn key_count(&self, store_id: &str) -> StateStoreResult<usize>;
281
282    /// Force pending writes to persistent storage.
283    ///
284    /// For in-memory stores, this is a no-op. For persistent stores, this
285    /// ensures all data is durably written to disk.
286    ///
287    /// # Returns
288    /// * `Ok(())` - Sync completed successfully
289    /// * `Err(e)` - An error occurred during sync
290    async fn sync(&self) -> StateStoreResult<()> {
291        Ok(())
292    }
293}
294
295/// In-memory implementation of StateStoreProvider.
296///
297/// This is the default state store provider used when no external provider
298/// is configured. Data is stored in memory and does not persist across restarts.
299///
300/// # Thread Safety
301///
302/// This implementation is thread-safe and supports concurrent access from
303/// multiple plugins.
304///
305/// # Usage
306///
307/// ```ignore
308/// use drasi_lib::MemoryStateStoreProvider;
309///
310/// let provider = MemoryStateStoreProvider::new();
311///
312/// // Store some data
313/// provider.set("my-plugin", "key1", b"value1".to_vec()).await?;
314///
315/// // Retrieve data
316/// if let Some(value) = provider.get("my-plugin", "key1").await? {
317///     println!("Value: {:?}", value);
318/// }
319/// ```
320pub struct MemoryStateStoreProvider {
321    /// Data storage: store_id -> (key -> value)
322    stores: Arc<RwLock<HashMap<String, HashMap<String, Vec<u8>>>>>,
323}
324
325impl Default for MemoryStateStoreProvider {
326    fn default() -> Self {
327        Self::new()
328    }
329}
330
331impl MemoryStateStoreProvider {
332    /// Create a new in-memory state store provider.
333    pub fn new() -> Self {
334        Self {
335            stores: Arc::new(RwLock::new(HashMap::new())),
336        }
337    }
338}
339
340#[async_trait]
341impl StateStoreProvider for MemoryStateStoreProvider {
342    async fn get(&self, store_id: &str, key: &str) -> StateStoreResult<Option<Vec<u8>>> {
343        let stores = self.stores.read().await;
344        Ok(stores
345            .get(store_id)
346            .and_then(|store| store.get(key).cloned()))
347    }
348
349    async fn set(&self, store_id: &str, key: &str, value: Vec<u8>) -> StateStoreResult<()> {
350        let mut stores = self.stores.write().await;
351        stores
352            .entry(store_id.to_string())
353            .or_default()
354            .insert(key.to_string(), value);
355        Ok(())
356    }
357
358    async fn delete(&self, store_id: &str, key: &str) -> StateStoreResult<bool> {
359        let mut stores = self.stores.write().await;
360        if let Some(store) = stores.get_mut(store_id) {
361            let existed = store.remove(key).is_some();
362            // Clean up empty stores
363            if store.is_empty() {
364                stores.remove(store_id);
365            }
366            Ok(existed)
367        } else {
368            Ok(false)
369        }
370    }
371
372    async fn contains_key(&self, store_id: &str, key: &str) -> StateStoreResult<bool> {
373        let stores = self.stores.read().await;
374        Ok(stores
375            .get(store_id)
376            .is_some_and(|store| store.contains_key(key)))
377    }
378
379    async fn get_many(
380        &self,
381        store_id: &str,
382        keys: &[&str],
383    ) -> StateStoreResult<HashMap<String, Vec<u8>>> {
384        let stores = self.stores.read().await;
385        let mut result = HashMap::new();
386
387        if let Some(store) = stores.get(store_id) {
388            for key in keys {
389                if let Some(value) = store.get(*key) {
390                    result.insert((*key).to_string(), value.clone());
391                }
392            }
393        }
394
395        Ok(result)
396    }
397
398    async fn set_many(&self, store_id: &str, entries: &[(&str, &[u8])]) -> StateStoreResult<()> {
399        let mut stores = self.stores.write().await;
400        let store = stores.entry(store_id.to_string()).or_default();
401
402        for (key, value) in entries {
403            store.insert((*key).to_string(), value.to_vec());
404        }
405
406        Ok(())
407    }
408
409    async fn delete_many(&self, store_id: &str, keys: &[&str]) -> StateStoreResult<usize> {
410        let mut stores = self.stores.write().await;
411        let mut count = 0;
412
413        if let Some(store) = stores.get_mut(store_id) {
414            for key in keys {
415                if store.remove(*key).is_some() {
416                    count += 1;
417                }
418            }
419            // Clean up empty stores
420            if store.is_empty() {
421                stores.remove(store_id);
422            }
423        }
424
425        Ok(count)
426    }
427
428    async fn clear_store(&self, store_id: &str) -> StateStoreResult<usize> {
429        let mut stores = self.stores.write().await;
430        if let Some(store) = stores.remove(store_id) {
431            Ok(store.len())
432        } else {
433            Ok(0)
434        }
435    }
436
437    async fn list_keys(&self, store_id: &str) -> StateStoreResult<Vec<String>> {
438        let stores = self.stores.read().await;
439        Ok(stores
440            .get(store_id)
441            .map(|store| store.keys().cloned().collect())
442            .unwrap_or_default())
443    }
444
445    async fn store_exists(&self, store_id: &str) -> StateStoreResult<bool> {
446        let stores = self.stores.read().await;
447        Ok(stores.get(store_id).is_some_and(|store| !store.is_empty()))
448    }
449
450    async fn key_count(&self, store_id: &str) -> StateStoreResult<usize> {
451        let stores = self.stores.read().await;
452        Ok(stores.get(store_id).map(|store| store.len()).unwrap_or(0))
453    }
454}
455
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_memory_state_store_get_set() {
        let provider = MemoryStateStoreProvider::new();

        // Set a value
        provider
            .set("store1", "key1", b"value1".to_vec())
            .await
            .unwrap();

        // Get the value
        let result = provider.get("store1", "key1").await.unwrap();
        assert_eq!(result, Some(b"value1".to_vec()));

        // Get non-existent key
        let result = provider.get("store1", "nonexistent").await.unwrap();
        assert_eq!(result, None);

        // Get from non-existent store
        let result = provider.get("nonexistent", "key1").await.unwrap();
        assert_eq!(result, None);
    }

    #[tokio::test]
    async fn test_memory_state_store_set_overwrites() {
        let provider = MemoryStateStoreProvider::new();

        provider
            .set("store1", "key1", b"value1".to_vec())
            .await
            .unwrap();
        provider
            .set("store1", "key1", b"value2".to_vec())
            .await
            .unwrap();

        // The second write replaces the first rather than adding a key
        let result = provider.get("store1", "key1").await.unwrap();
        assert_eq!(result, Some(b"value2".to_vec()));
        assert_eq!(provider.key_count("store1").await.unwrap(), 1);
    }

    #[tokio::test]
    async fn test_memory_state_store_delete() {
        let provider = MemoryStateStoreProvider::new();

        // Set and delete
        provider
            .set("store1", "key1", b"value1".to_vec())
            .await
            .unwrap();
        let deleted = provider.delete("store1", "key1").await.unwrap();
        assert!(deleted);

        // Verify deletion
        let result = provider.get("store1", "key1").await.unwrap();
        assert_eq!(result, None);

        // Delete non-existent key
        let deleted = provider.delete("store1", "nonexistent").await.unwrap();
        assert!(!deleted);
    }

    #[tokio::test]
    async fn test_memory_state_store_contains_key() {
        let provider = MemoryStateStoreProvider::new();

        // Key doesn't exist
        assert!(!provider.contains_key("store1", "key1").await.unwrap());

        // Set a value
        provider
            .set("store1", "key1", b"value1".to_vec())
            .await
            .unwrap();

        // Key now exists
        assert!(provider.contains_key("store1", "key1").await.unwrap());

        // Different key doesn't exist
        assert!(!provider.contains_key("store1", "key2").await.unwrap());
    }

    #[tokio::test]
    async fn test_memory_state_store_get_many() {
        let provider = MemoryStateStoreProvider::new();

        // Set multiple values
        provider
            .set("store1", "key1", b"value1".to_vec())
            .await
            .unwrap();
        provider
            .set("store1", "key2", b"value2".to_vec())
            .await
            .unwrap();

        // Get multiple (including non-existent key)
        let result = provider
            .get_many("store1", &["key1", "key2", "nonexistent"])
            .await
            .unwrap();
        assert_eq!(result.len(), 2);
        assert_eq!(result.get("key1"), Some(&b"value1".to_vec()));
        assert_eq!(result.get("key2"), Some(&b"value2".to_vec()));
    }

    #[tokio::test]
    async fn test_memory_state_store_set_many() {
        let provider = MemoryStateStoreProvider::new();

        // Set multiple values
        provider
            .set_many("store1", &[("key1", b"value1"), ("key2", b"value2")])
            .await
            .unwrap();

        // Verify
        let result = provider.get("store1", "key1").await.unwrap();
        assert_eq!(result, Some(b"value1".to_vec()));
        let result = provider.get("store1", "key2").await.unwrap();
        assert_eq!(result, Some(b"value2".to_vec()));
    }

    #[tokio::test]
    async fn test_memory_state_store_set_many_empty() {
        let provider = MemoryStateStoreProvider::new();

        // An empty batch succeeds and leaves no visible partition behind
        provider.set_many("store1", &[]).await.unwrap();
        assert!(!provider.store_exists("store1").await.unwrap());
        assert_eq!(provider.key_count("store1").await.unwrap(), 0);
        assert!(provider.list_keys("store1").await.unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_memory_state_store_delete_many() {
        let provider = MemoryStateStoreProvider::new();

        // Set multiple values
        provider
            .set_many(
                "store1",
                &[("key1", b"value1"), ("key2", b"value2"), ("key3", b"value3")],
            )
            .await
            .unwrap();

        // Delete some
        let count = provider
            .delete_many("store1", &["key1", "key2", "nonexistent"])
            .await
            .unwrap();
        assert_eq!(count, 2);

        // Verify
        let result = provider.get("store1", "key1").await.unwrap();
        assert_eq!(result, None);
        let result = provider.get("store1", "key3").await.unwrap();
        assert_eq!(result, Some(b"value3".to_vec()));
    }

    #[tokio::test]
    async fn test_memory_state_store_delete_many_removes_empty_store() {
        let provider = MemoryStateStoreProvider::new();

        provider
            .set_many("store1", &[("key1", b"value1"), ("key2", b"value2")])
            .await
            .unwrap();

        let count = provider
            .delete_many("store1", &["key1", "key2"])
            .await
            .unwrap();
        assert_eq!(count, 2);

        // Deleting every key leaves no trace of the partition
        assert!(!provider.store_exists("store1").await.unwrap());
        assert_eq!(provider.key_count("store1").await.unwrap(), 0);
        assert!(provider.list_keys("store1").await.unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_memory_state_store_missing_store_operations() {
        let provider = MemoryStateStoreProvider::new();

        // Operations on a partition that was never written are harmless no-ops
        assert!(provider
            .get_many("nonexistent", &["key1"])
            .await
            .unwrap()
            .is_empty());
        assert_eq!(
            provider.delete_many("nonexistent", &["key1"]).await.unwrap(),
            0
        );
        assert_eq!(provider.clear_store("nonexistent").await.unwrap(), 0);
        assert!(!provider.delete("nonexistent", "key1").await.unwrap());
    }

    #[tokio::test]
    async fn test_memory_state_store_clear_store() {
        let provider = MemoryStateStoreProvider::new();

        // Set values in multiple stores
        provider
            .set("store1", "key1", b"value1".to_vec())
            .await
            .unwrap();
        provider
            .set("store1", "key2", b"value2".to_vec())
            .await
            .unwrap();
        provider
            .set("store2", "key1", b"value1".to_vec())
            .await
            .unwrap();

        // Clear store1
        let count = provider.clear_store("store1").await.unwrap();
        assert_eq!(count, 2);

        // Verify store1 is cleared
        let result = provider.get("store1", "key1").await.unwrap();
        assert_eq!(result, None);

        // Verify store2 is intact
        let result = provider.get("store2", "key1").await.unwrap();
        assert_eq!(result, Some(b"value1".to_vec()));
    }

    #[tokio::test]
    async fn test_memory_state_store_list_keys() {
        let provider = MemoryStateStoreProvider::new();

        // Set values
        provider
            .set("store1", "key1", b"value1".to_vec())
            .await
            .unwrap();
        provider
            .set("store1", "key2", b"value2".to_vec())
            .await
            .unwrap();

        // List keys (order is unspecified, so sort before comparing)
        let mut keys = provider.list_keys("store1").await.unwrap();
        keys.sort();
        assert_eq!(keys, vec!["key1", "key2"]);

        // List keys from non-existent store
        let keys = provider.list_keys("nonexistent").await.unwrap();
        assert!(keys.is_empty());
    }

    #[tokio::test]
    async fn test_memory_state_store_store_exists() {
        let provider = MemoryStateStoreProvider::new();

        // Non-existent store
        assert!(!provider.store_exists("store1").await.unwrap());

        // Set a value
        provider
            .set("store1", "key1", b"value1".to_vec())
            .await
            .unwrap();
        assert!(provider.store_exists("store1").await.unwrap());

        // Delete the value (store should be cleaned up)
        provider.delete("store1", "key1").await.unwrap();
        assert!(!provider.store_exists("store1").await.unwrap());
    }

    #[tokio::test]
    async fn test_memory_state_store_key_count() {
        let provider = MemoryStateStoreProvider::new();

        // Empty store has 0 keys
        assert_eq!(provider.key_count("store1").await.unwrap(), 0);

        // Set a value
        provider
            .set("store1", "key1", b"value1".to_vec())
            .await
            .unwrap();
        assert_eq!(provider.key_count("store1").await.unwrap(), 1);

        // Set another value
        provider
            .set("store1", "key2", b"value2".to_vec())
            .await
            .unwrap();
        assert_eq!(provider.key_count("store1").await.unwrap(), 2);
    }

    #[tokio::test]
    async fn test_memory_state_store_partitioning() {
        let provider = MemoryStateStoreProvider::new();

        // Set same key in different stores
        provider
            .set("store1", "key", b"value1".to_vec())
            .await
            .unwrap();
        provider
            .set("store2", "key", b"value2".to_vec())
            .await
            .unwrap();

        // Verify they don't interfere
        let result1 = provider.get("store1", "key").await.unwrap();
        let result2 = provider.get("store2", "key").await.unwrap();
        assert_eq!(result1, Some(b"value1".to_vec()));
        assert_eq!(result2, Some(b"value2".to_vec()));
    }

    #[tokio::test]
    async fn test_memory_state_store_sync() {
        let provider = MemoryStateStoreProvider::new();

        // sync is a no-op for memory provider but should succeed
        provider.sync().await.unwrap();

        // Set some data and sync again
        provider
            .set("store1", "key1", b"value1".to_vec())
            .await
            .unwrap();
        provider.sync().await.unwrap();
    }
}