drasi_lib/state_store/mod.rs
1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! State Store Provider Module
16//!
17//! This module provides the `StateStoreProvider` trait that allows plugins
18//! (Sources, BootstrapProviders, and Reactions) to store and retrieve
19//! runtime state that can persist across runs of DrasiLib.
20//!
21//! # Architecture
22//!
23//! The state store plugin system follows pure dependency inversion:
24//! - **Lib** defines the `StateStoreProvider` trait and provides a default
25//! in-memory implementation (`MemoryStateStoreProvider`)
26//! - **External plugins** (in `components/state_stores/`) implement this trait
27//! for persistent storage
28//! - **Applications** optionally inject plugins into DrasiLib; if none provided,
29//! the in-memory default is used
30//!
31//! # Partitioning
32//!
//! The state store is partitioned by a `store_id` string that is passed to
//! every operation. Each plugin uses a unique `store_id` (typically the
//! plugin's ID) when interacting with the store, so plugins that use different
//! IDs don't interfere with each other's state.
37//!
38//! # Usage
39//!
40//! ## Without a plugin (uses in-memory default)
41//! ```ignore
42//! let drasi = DrasiLib::builder()
43//! .build()?;
44//! ```
45//!
46//! ## With an external plugin
47//! ```ignore
//! use drasi_state_store_json::JsonStateStoreProvider;
//! use std::sync::Arc;
49//!
50//! let state_store = JsonStateStoreProvider::new("/data/state");
51//! let drasi = DrasiLib::builder()
52//! .with_state_store_provider(Arc::new(state_store))
53//! .build()?;
54//! ```
55
56use async_trait::async_trait;
57use std::collections::HashMap;
58use std::sync::Arc;
59use thiserror::Error;
60use tokio::sync::RwLock;
61
62/// Errors that can occur when interacting with a state store.
63///
64/// # Note on Missing Keys
65///
66/// Missing keys are typically represented as `Ok(None)` from `get()`, not as
67/// `KeyNotFound` errors. The `KeyNotFound` variant is available for custom
68/// implementations that need to distinguish between "key doesn't exist" and
69/// other error conditions, but the standard trait methods prefer returning
70/// `Option` values.
71#[derive(Error, Debug)]
72pub enum StateStoreError {
73 /// The requested key was not found in the store.
74 ///
75 /// Note: Standard implementations return `Ok(None)` for missing keys instead
76 /// of this error. This variant is provided for custom implementations that
77 /// need explicit error-based handling of missing keys.
78 #[error("Key not found: {0}")]
79 KeyNotFound(String),
80
81 /// Failed to serialize or deserialize data.
82 ///
83 /// This error typically occurs when stored data cannot be parsed or when
84 /// data being stored cannot be serialized to the expected format.
85 #[error("Serialization error: {0}")]
86 SerializationError(String),
87
88 /// Failed to read or write to the underlying storage.
89 ///
90 /// This error indicates I/O failures, database errors, or other storage
91 /// backend issues.
92 #[error("Storage error: {0}")]
93 StorageError(String),
94
95 /// Generic error for other failures.
96 ///
97 /// Use this for errors that don't fit the other categories.
98 #[error("State store error: {0}")]
99 Other(String),
100}
101
102/// Result type for state store operations
103pub type StateStoreResult<T> = Result<T, StateStoreError>;
104
105/// Trait defining the interface for state store providers.
106///
107/// State store providers allow plugins (Sources, BootstrapProviders, and Reactions)
108/// to persist runtime state that survives restarts of DrasiLib.
109///
110/// # Thread Safety
111///
112/// Implementations must be thread-safe and support concurrent access from
113/// multiple plugins.
114///
115/// # Partitioning
116///
117/// The state store uses `store_id` to partition data between different plugins.
118/// Each plugin should use a unique `store_id` (typically the plugin's ID) to
119/// avoid conflicts with other plugins.
120///
121/// # Example Implementation
122///
123/// ```ignore
124/// use drasi_lib::StateStoreProvider;
125/// use async_trait::async_trait;
126///
127/// pub struct MyStateStore {
128/// // implementation fields
129/// }
130///
131/// #[async_trait]
132/// impl StateStoreProvider for MyStateStore {
133/// async fn get(&self, store_id: &str, key: &str) -> StateStoreResult<Option<Vec<u8>>> {
134/// // implementation
135/// }
136///
137/// async fn set(&self, store_id: &str, key: &str, value: Vec<u8>) -> StateStoreResult<()> {
138/// // implementation
139/// }
140///
141/// async fn delete(&self, store_id: &str, key: &str) -> StateStoreResult<bool> {
142/// // implementation
143/// }
144///
145/// // ... implement other methods
146/// }
147/// ```
148#[async_trait]
149pub trait StateStoreProvider: Send + Sync {
150 /// Get a single value by key from a store partition.
151 ///
152 /// # Arguments
153 /// * `store_id` - The partition identifier (typically the plugin ID)
154 /// * `key` - The key to retrieve
155 ///
156 /// # Returns
157 /// * `Ok(Some(value))` - The value was found
158 /// * `Ok(None)` - The key doesn't exist in the store
159 /// * `Err(e)` - An error occurred
160 async fn get(&self, store_id: &str, key: &str) -> StateStoreResult<Option<Vec<u8>>>;
161
162 /// Set a single value by key in a store partition.
163 ///
164 /// The state must be persisted before this method returns.
165 ///
166 /// # Arguments
167 /// * `store_id` - The partition identifier (typically the plugin ID)
168 /// * `key` - The key to set
169 /// * `value` - The value to store
170 ///
171 /// # Returns
172 /// * `Ok(())` - The value was successfully stored
173 /// * `Err(e)` - An error occurred
174 async fn set(&self, store_id: &str, key: &str, value: Vec<u8>) -> StateStoreResult<()>;
175
176 /// Delete a single key from a store partition.
177 ///
178 /// # Arguments
179 /// * `store_id` - The partition identifier (typically the plugin ID)
180 /// * `key` - The key to delete
181 ///
182 /// # Returns
183 /// * `Ok(true)` - The key existed and was deleted
184 /// * `Ok(false)` - The key didn't exist
185 /// * `Err(e)` - An error occurred
186 async fn delete(&self, store_id: &str, key: &str) -> StateStoreResult<bool>;
187
188 /// Check if a key exists in a store partition without retrieving its value.
189 ///
190 /// This is more efficient than `get()` when you only need to check existence,
191 /// especially for large values.
192 ///
193 /// # Arguments
194 /// * `store_id` - The partition identifier (typically the plugin ID)
195 /// * `key` - The key to check
196 ///
197 /// # Returns
198 /// * `Ok(true)` - The key exists
199 /// * `Ok(false)` - The key doesn't exist
200 /// * `Err(e)` - An error occurred
201 async fn contains_key(&self, store_id: &str, key: &str) -> StateStoreResult<bool>;
202
203 /// Get multiple values by keys from a store partition.
204 ///
205 /// # Arguments
206 /// * `store_id` - The partition identifier (typically the plugin ID)
207 /// * `keys` - The keys to retrieve
208 ///
209 /// # Returns
210 /// A HashMap mapping each found key to its value. Keys that don't exist
211 /// are simply not included in the result.
212 async fn get_many(
213 &self,
214 store_id: &str,
215 keys: &[&str],
216 ) -> StateStoreResult<HashMap<String, Vec<u8>>>;
217
218 /// Set multiple key-value pairs in a store partition.
219 ///
220 /// All values must be persisted before this method returns.
221 ///
222 /// # Arguments
223 /// * `store_id` - The partition identifier (typically the plugin ID)
224 /// * `entries` - The key-value pairs to store
225 ///
226 /// # Returns
227 /// * `Ok(())` - All values were successfully stored
228 /// * `Err(e)` - An error occurred (some values may have been stored)
229 async fn set_many(&self, store_id: &str, entries: &[(&str, &[u8])]) -> StateStoreResult<()>;
230
231 /// Delete multiple keys from a store partition.
232 ///
233 /// # Arguments
234 /// * `store_id` - The partition identifier (typically the plugin ID)
235 /// * `keys` - The keys to delete
236 ///
237 /// # Returns
238 /// * `Ok(count)` - The number of keys that were deleted
239 /// * `Err(e)` - An error occurred (some keys may have been deleted)
240 async fn delete_many(&self, store_id: &str, keys: &[&str]) -> StateStoreResult<usize>;
241
242 /// Delete all data for a store partition.
243 ///
244 /// # Arguments
245 /// * `store_id` - The partition identifier to clear
246 ///
247 /// # Returns
248 /// * `Ok(count)` - The number of keys that were deleted
249 /// * `Err(e)` - An error occurred
250 async fn clear_store(&self, store_id: &str) -> StateStoreResult<usize>;
251
252 /// List all keys in a store partition.
253 ///
254 /// # Arguments
255 /// * `store_id` - The partition identifier
256 ///
257 /// # Returns
258 /// A vector of all keys in the store partition
259 async fn list_keys(&self, store_id: &str) -> StateStoreResult<Vec<String>>;
260
261 /// Check if a store partition exists and has any data.
262 ///
263 /// # Arguments
264 /// * `store_id` - The partition identifier to check
265 ///
266 /// # Returns
267 /// * `Ok(true)` - The store partition exists and has at least one key
268 /// * `Ok(false)` - The store partition doesn't exist or is empty
269 /// * `Err(e)` - An error occurred while checking
270 async fn store_exists(&self, store_id: &str) -> StateStoreResult<bool>;
271
272 /// Get the number of keys in a store partition.
273 ///
274 /// # Arguments
275 /// * `store_id` - The partition identifier
276 ///
277 /// # Returns
278 /// * `Ok(count)` - The number of keys in the store
279 /// * `Err(e)` - An error occurred
280 async fn key_count(&self, store_id: &str) -> StateStoreResult<usize>;
281
282 /// Force pending writes to persistent storage.
283 ///
284 /// For in-memory stores, this is a no-op. For persistent stores, this
285 /// ensures all data is durably written to disk.
286 ///
287 /// # Returns
288 /// * `Ok(())` - Sync completed successfully
289 /// * `Err(e)` - An error occurred during sync
290 async fn sync(&self) -> StateStoreResult<()> {
291 Ok(())
292 }
293}
294
295/// In-memory implementation of StateStoreProvider.
296///
297/// This is the default state store provider used when no external provider
298/// is configured. Data is stored in memory and does not persist across restarts.
299///
300/// # Thread Safety
301///
302/// This implementation is thread-safe and supports concurrent access from
303/// multiple plugins.
304///
305/// # Usage
306///
307/// ```ignore
308/// use drasi_lib::MemoryStateStoreProvider;
309///
310/// let provider = MemoryStateStoreProvider::new();
311///
312/// // Store some data
313/// provider.set("my-plugin", "key1", b"value1".to_vec()).await?;
314///
315/// // Retrieve data
316/// if let Some(value) = provider.get("my-plugin", "key1").await? {
317/// println!("Value: {:?}", value);
318/// }
319/// ```
320pub struct MemoryStateStoreProvider {
321 /// Data storage: store_id -> (key -> value)
322 stores: Arc<RwLock<HashMap<String, HashMap<String, Vec<u8>>>>>,
323}
324
325impl Default for MemoryStateStoreProvider {
326 fn default() -> Self {
327 Self::new()
328 }
329}
330
331impl MemoryStateStoreProvider {
332 /// Create a new in-memory state store provider.
333 pub fn new() -> Self {
334 Self {
335 stores: Arc::new(RwLock::new(HashMap::new())),
336 }
337 }
338}
339
340#[async_trait]
341impl StateStoreProvider for MemoryStateStoreProvider {
342 async fn get(&self, store_id: &str, key: &str) -> StateStoreResult<Option<Vec<u8>>> {
343 let stores = self.stores.read().await;
344 Ok(stores
345 .get(store_id)
346 .and_then(|store| store.get(key).cloned()))
347 }
348
349 async fn set(&self, store_id: &str, key: &str, value: Vec<u8>) -> StateStoreResult<()> {
350 let mut stores = self.stores.write().await;
351 stores
352 .entry(store_id.to_string())
353 .or_default()
354 .insert(key.to_string(), value);
355 Ok(())
356 }
357
358 async fn delete(&self, store_id: &str, key: &str) -> StateStoreResult<bool> {
359 let mut stores = self.stores.write().await;
360 if let Some(store) = stores.get_mut(store_id) {
361 let existed = store.remove(key).is_some();
362 // Clean up empty stores
363 if store.is_empty() {
364 stores.remove(store_id);
365 }
366 Ok(existed)
367 } else {
368 Ok(false)
369 }
370 }
371
372 async fn contains_key(&self, store_id: &str, key: &str) -> StateStoreResult<bool> {
373 let stores = self.stores.read().await;
374 Ok(stores
375 .get(store_id)
376 .is_some_and(|store| store.contains_key(key)))
377 }
378
379 async fn get_many(
380 &self,
381 store_id: &str,
382 keys: &[&str],
383 ) -> StateStoreResult<HashMap<String, Vec<u8>>> {
384 let stores = self.stores.read().await;
385 let mut result = HashMap::new();
386
387 if let Some(store) = stores.get(store_id) {
388 for key in keys {
389 if let Some(value) = store.get(*key) {
390 result.insert((*key).to_string(), value.clone());
391 }
392 }
393 }
394
395 Ok(result)
396 }
397
398 async fn set_many(&self, store_id: &str, entries: &[(&str, &[u8])]) -> StateStoreResult<()> {
399 let mut stores = self.stores.write().await;
400 let store = stores.entry(store_id.to_string()).or_default();
401
402 for (key, value) in entries {
403 store.insert((*key).to_string(), value.to_vec());
404 }
405
406 Ok(())
407 }
408
409 async fn delete_many(&self, store_id: &str, keys: &[&str]) -> StateStoreResult<usize> {
410 let mut stores = self.stores.write().await;
411 let mut count = 0;
412
413 if let Some(store) = stores.get_mut(store_id) {
414 for key in keys {
415 if store.remove(*key).is_some() {
416 count += 1;
417 }
418 }
419 // Clean up empty stores
420 if store.is_empty() {
421 stores.remove(store_id);
422 }
423 }
424
425 Ok(count)
426 }
427
428 async fn clear_store(&self, store_id: &str) -> StateStoreResult<usize> {
429 let mut stores = self.stores.write().await;
430 if let Some(store) = stores.remove(store_id) {
431 Ok(store.len())
432 } else {
433 Ok(0)
434 }
435 }
436
437 async fn list_keys(&self, store_id: &str) -> StateStoreResult<Vec<String>> {
438 let stores = self.stores.read().await;
439 Ok(stores
440 .get(store_id)
441 .map(|store| store.keys().cloned().collect())
442 .unwrap_or_default())
443 }
444
445 async fn store_exists(&self, store_id: &str) -> StateStoreResult<bool> {
446 let stores = self.stores.read().await;
447 Ok(stores.get(store_id).is_some_and(|store| !store.is_empty()))
448 }
449
450 async fn key_count(&self, store_id: &str) -> StateStoreResult<usize> {
451 let stores = self.stores.read().await;
452 Ok(stores.get(store_id).map(|store| store.len()).unwrap_or(0))
453 }
454}
455
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_memory_state_store_get_set() {
        let provider = MemoryStateStoreProvider::new();

        // Set a value
        provider
            .set("store1", "key1", b"value1".to_vec())
            .await
            .unwrap();

        // Get the value
        let result = provider.get("store1", "key1").await.unwrap();
        assert_eq!(result, Some(b"value1".to_vec()));

        // Get non-existent key
        let result = provider.get("store1", "nonexistent").await.unwrap();
        assert_eq!(result, None);

        // Get from non-existent store
        let result = provider.get("nonexistent", "key1").await.unwrap();
        assert_eq!(result, None);
    }

    #[tokio::test]
    async fn test_memory_state_store_delete() {
        let provider = MemoryStateStoreProvider::new();

        // Set and delete
        provider
            .set("store1", "key1", b"value1".to_vec())
            .await
            .unwrap();
        let deleted = provider.delete("store1", "key1").await.unwrap();
        assert!(deleted);

        // Verify deletion
        let result = provider.get("store1", "key1").await.unwrap();
        assert_eq!(result, None);

        // Delete non-existent key
        let deleted = provider.delete("store1", "nonexistent").await.unwrap();
        assert!(!deleted);
    }

    #[tokio::test]
    async fn test_memory_state_store_contains_key() {
        let provider = MemoryStateStoreProvider::new();

        // Key doesn't exist
        assert!(!provider.contains_key("store1", "key1").await.unwrap());

        // Set a value
        provider
            .set("store1", "key1", b"value1".to_vec())
            .await
            .unwrap();

        // Key now exists
        assert!(provider.contains_key("store1", "key1").await.unwrap());

        // Different key doesn't exist
        assert!(!provider.contains_key("store1", "key2").await.unwrap());
    }

    #[tokio::test]
    async fn test_memory_state_store_get_many() {
        let provider = MemoryStateStoreProvider::new();

        // Set multiple values
        provider
            .set("store1", "key1", b"value1".to_vec())
            .await
            .unwrap();
        provider
            .set("store1", "key2", b"value2".to_vec())
            .await
            .unwrap();

        // Get multiple (including non-existent key)
        let result = provider
            .get_many("store1", &["key1", "key2", "nonexistent"])
            .await
            .unwrap();
        assert_eq!(result.len(), 2);
        assert_eq!(result.get("key1"), Some(&b"value1".to_vec()));
        assert_eq!(result.get("key2"), Some(&b"value2".to_vec()));
    }

    #[tokio::test]
    async fn test_memory_state_store_set_many() {
        let provider = MemoryStateStoreProvider::new();

        // Set multiple values
        provider
            .set_many("store1", &[("key1", b"value1"), ("key2", b"value2")])
            .await
            .unwrap();

        // Verify
        let result = provider.get("store1", "key1").await.unwrap();
        assert_eq!(result, Some(b"value1".to_vec()));
        let result = provider.get("store1", "key2").await.unwrap();
        assert_eq!(result, Some(b"value2".to_vec()));
    }

    #[tokio::test]
    async fn test_memory_state_store_delete_many() {
        let provider = MemoryStateStoreProvider::new();

        // Set multiple values
        provider
            .set_many(
                "store1",
                &[("key1", b"value1"), ("key2", b"value2"), ("key3", b"value3")],
            )
            .await
            .unwrap();

        // Delete some
        let count = provider
            .delete_many("store1", &["key1", "key2", "nonexistent"])
            .await
            .unwrap();
        assert_eq!(count, 2);

        // Verify
        let result = provider.get("store1", "key1").await.unwrap();
        assert_eq!(result, None);
        let result = provider.get("store1", "key3").await.unwrap();
        assert_eq!(result, Some(b"value3".to_vec()));
    }

    #[tokio::test]
    async fn test_memory_state_store_clear_store() {
        let provider = MemoryStateStoreProvider::new();

        // Set values in multiple stores
        provider
            .set("store1", "key1", b"value1".to_vec())
            .await
            .unwrap();
        provider
            .set("store1", "key2", b"value2".to_vec())
            .await
            .unwrap();
        provider
            .set("store2", "key1", b"value1".to_vec())
            .await
            .unwrap();

        // Clear store1
        let count = provider.clear_store("store1").await.unwrap();
        assert_eq!(count, 2);

        // Verify store1 is cleared
        let result = provider.get("store1", "key1").await.unwrap();
        assert_eq!(result, None);

        // Verify store2 is intact
        let result = provider.get("store2", "key1").await.unwrap();
        assert_eq!(result, Some(b"value1".to_vec()));
    }

    #[tokio::test]
    async fn test_memory_state_store_list_keys() {
        let provider = MemoryStateStoreProvider::new();

        // Set values
        provider
            .set("store1", "key1", b"value1".to_vec())
            .await
            .unwrap();
        provider
            .set("store1", "key2", b"value2".to_vec())
            .await
            .unwrap();

        // List keys (ordering is unspecified, so sort before comparing)
        let mut keys = provider.list_keys("store1").await.unwrap();
        keys.sort();
        assert_eq!(keys, vec!["key1", "key2"]);

        // List keys from non-existent store
        let keys = provider.list_keys("nonexistent").await.unwrap();
        assert!(keys.is_empty());
    }

    #[tokio::test]
    async fn test_memory_state_store_store_exists() {
        let provider = MemoryStateStoreProvider::new();

        // Non-existent store
        assert!(!provider.store_exists("store1").await.unwrap());

        // Set a value
        provider
            .set("store1", "key1", b"value1".to_vec())
            .await
            .unwrap();
        assert!(provider.store_exists("store1").await.unwrap());

        // Delete the value (store should be cleaned up)
        provider.delete("store1", "key1").await.unwrap();
        assert!(!provider.store_exists("store1").await.unwrap());
    }

    #[tokio::test]
    async fn test_memory_state_store_key_count() {
        let provider = MemoryStateStoreProvider::new();

        // Empty store has 0 keys
        assert_eq!(provider.key_count("store1").await.unwrap(), 0);

        // Set a value
        provider
            .set("store1", "key1", b"value1".to_vec())
            .await
            .unwrap();
        assert_eq!(provider.key_count("store1").await.unwrap(), 1);

        // Set another value
        provider
            .set("store1", "key2", b"value2".to_vec())
            .await
            .unwrap();
        assert_eq!(provider.key_count("store1").await.unwrap(), 2);
    }

    #[tokio::test]
    async fn test_memory_state_store_partitioning() {
        let provider = MemoryStateStoreProvider::new();

        // Set same key in different stores
        provider
            .set("store1", "key", b"value1".to_vec())
            .await
            .unwrap();
        provider
            .set("store2", "key", b"value2".to_vec())
            .await
            .unwrap();

        // Verify they don't interfere
        let result1 = provider.get("store1", "key").await.unwrap();
        let result2 = provider.get("store2", "key").await.unwrap();
        assert_eq!(result1, Some(b"value1".to_vec()));
        assert_eq!(result2, Some(b"value2".to_vec()));
    }

    #[tokio::test]
    async fn test_memory_state_store_sync() {
        let provider = MemoryStateStoreProvider::new();

        // sync is a no-op for memory provider but should succeed
        provider.sync().await.unwrap();

        // Set some data and sync again
        provider
            .set("store1", "key1", b"value1".to_vec())
            .await
            .unwrap();
        provider.sync().await.unwrap();
    }

    #[tokio::test]
    async fn test_memory_state_store_default_is_empty() {
        let provider = MemoryStateStoreProvider::default();

        assert!(!provider.store_exists("store1").await.unwrap());
        assert_eq!(provider.key_count("store1").await.unwrap(), 0);
        assert!(provider.list_keys("store1").await.unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_memory_state_store_set_overwrites() {
        let provider = MemoryStateStoreProvider::new();

        provider
            .set("store1", "key1", b"old".to_vec())
            .await
            .unwrap();
        provider
            .set("store1", "key1", b"new".to_vec())
            .await
            .unwrap();

        // The second write replaces the first without adding a key
        let result = provider.get("store1", "key1").await.unwrap();
        assert_eq!(result, Some(b"new".to_vec()));
        assert_eq!(provider.key_count("store1").await.unwrap(), 1);
    }

    #[tokio::test]
    async fn test_memory_state_store_missing_partition_operations() {
        let provider = MemoryStateStoreProvider::new();

        // Every operation on a partition that was never written is a harmless no-op
        let result = provider.get_many("nonexistent", &["key1"]).await.unwrap();
        assert!(result.is_empty());
        let count = provider
            .delete_many("nonexistent", &["key1"])
            .await
            .unwrap();
        assert_eq!(count, 0);
        assert_eq!(provider.clear_store("nonexistent").await.unwrap(), 0);
        assert!(!provider.delete("nonexistent", "key1").await.unwrap());
        assert!(!provider.contains_key("nonexistent", "key1").await.unwrap());
    }

    #[tokio::test]
    async fn test_memory_state_store_set_many_empty_batch() {
        let provider = MemoryStateStoreProvider::new();

        // An empty batch succeeds and leaves no visible partition behind
        provider.set_many("store1", &[]).await.unwrap();
        assert!(!provider.store_exists("store1").await.unwrap());
        assert_eq!(provider.key_count("store1").await.unwrap(), 0);
        assert!(provider.list_keys("store1").await.unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_memory_state_store_empty_after_delete_many_and_clear() {
        let provider = MemoryStateStoreProvider::new();

        // Removing every key via delete_many empties the partition
        provider
            .set_many("store1", &[("key1", b"value1"), ("key2", b"value2")])
            .await
            .unwrap();
        let count = provider
            .delete_many("store1", &["key1", "key2"])
            .await
            .unwrap();
        assert_eq!(count, 2);
        assert!(!provider.store_exists("store1").await.unwrap());

        // clear_store leaves nothing behind either
        provider
            .set("store1", "key1", b"value1".to_vec())
            .await
            .unwrap();
        assert_eq!(provider.clear_store("store1").await.unwrap(), 1);
        assert!(!provider.store_exists("store1").await.unwrap());
        assert_eq!(provider.key_count("store1").await.unwrap(), 0);
        assert!(provider.list_keys("store1").await.unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_memory_state_store_concurrent_writes() {
        let provider = Arc::new(MemoryStateStoreProvider::new());

        // Several tasks write distinct keys into the same partition
        let handles: Vec<_> = (0..10u8)
            .map(|i| {
                let provider = Arc::clone(&provider);
                tokio::spawn(async move {
                    provider
                        .set("store1", &format!("key{i}"), vec![i])
                        .await
                        .unwrap();
                })
            })
            .collect();
        for handle in handles {
            handle.await.unwrap();
        }

        assert_eq!(provider.key_count("store1").await.unwrap(), 10);
        let result = provider.get("store1", "key7").await.unwrap();
        assert_eq!(result, Some(vec![7]));
    }
}