transientdb/transient.rs
1use crate::{DataResult, DataStore, Equivalent};
2use serde_json::Value;
3use std::io::Result;
4use std::sync::Mutex;
5
6/// A thread-safe wrapper around a DataStore implementation that provides temporary data storage
7/// with batch processing capabilities.
8///
9/// TransientDB uses interior mutability through a Mutex to allow concurrent access to the
10/// underlying data store. It's designed for scenarios where data needs to be temporarily
11/// stored and processed in batches, such as queuing events or logs.
12pub struct TransientDB<T> {
13 #[cfg(not(target_arch = "wasm32"))]
14 store: Mutex<Box<dyn DataStore<Output = T> + Send>>,
15
16 #[cfg(target_arch = "wasm32")]
17 store: Mutex<Box<dyn DataStore<Output = T>>>,
18}
19
// SAFETY: These impls are only compiled for wasm32 builds *without* the `atomics`
// target feature. Such builds are single-threaded: there is no other thread a
// `TransientDB` could be sent to or shared with, so `Send` and `Sync` are
// vacuously satisfied.
//
// This allows types like WebStore (which contains Rc<IdbDatabase>) to be used
// with TransientDB on WASM targets without requiring complex trait gymnastics
// that would propagate through the entire codebase.
//
// NOTE: Threaded WASM (wasm32 + atomics + shared memory) still reports
// `target_arch = "wasm32"`, so checking the architecture alone is not enough.
// When `atomics` is enabled these impls are deliberately omitted; `TransientDB`
// is then `Send`/`Sync` only if the compiler can derive it from the boxed store,
// which turns a potential data race into a compile error.
#[cfg(all(target_arch = "wasm32", not(target_feature = "atomics")))]
unsafe impl<T> Send for TransientDB<T> {}

#[cfg(all(target_arch = "wasm32", not(target_feature = "atomics")))]
unsafe impl<T> Sync for TransientDB<T> {}
35
36impl<T> TransientDB<T> {
37 /// Creates a new TransientDB instance with the provided data store implementation.
38 ///
39 /// # Arguments
40 /// * `store` - Any implementation of DataStore that is Send + 'static (on native)
41 /// or just DataStore + 'static (on WASM)
42 ///
43 /// # Examples
44 /// ```
45 /// use transientdb::{TransientDB, MemoryConfig, MemoryStore};
46 ///
47 /// let config = MemoryConfig {
48 /// write_key: "my-store".into(),
49 /// max_items: 1000,
50 /// max_fetch_size: 1024 * 1024, // 1MB
51 /// };
52 /// let store = MemoryStore::new(config);
53 /// let db = TransientDB::new(store);
54 /// ```
55 #[cfg(not(target_arch = "wasm32"))]
56 pub fn new(store: impl DataStore<Output = T> + Send + 'static) -> Self {
57 Self {
58 store: Mutex::new(Box::new(store)),
59 }
60 }
61
62 /// Creates a new TransientDB instance with the provided data store implementation.
63 #[cfg(target_arch = "wasm32")]
64 pub fn new(store: impl DataStore<Output = T> + 'static) -> Self {
65 Self {
66 store: Mutex::new(Box::new(store)),
67 }
68 }
69
70 /// Checks if the store contains any data that can be fetched.
71 ///
72 /// # Examples
73 /// ```
74 /// use transientdb::{TransientDB, MemoryStore, MemoryConfig};
75 /// use serde_json::json;
76 ///
77 /// let db = TransientDB::new(MemoryStore::new(MemoryConfig {
78 /// write_key: "test".into(),
79 /// max_items: 100,
80 /// max_fetch_size: 1024,
81 /// }));
82 ///
83 /// assert!(!db.has_data());
84 /// db.append(json!({"test": "data"})).unwrap();
85 /// assert!(db.has_data());
86 /// ```
87 pub fn has_data(&self) -> bool {
88 self.store.lock().unwrap().has_data()
89 }
90
91 /// Removes all data from the store and resets it to initial state.
92 ///
93 /// # Examples
94 /// ```
95 /// use transientdb::{TransientDB, MemoryStore, MemoryConfig};
96 /// use serde_json::json;
97 ///
98 /// let db = TransientDB::new(MemoryStore::new(MemoryConfig {
99 /// write_key: "test".into(),
100 /// max_items: 100,
101 /// max_fetch_size: 1024,
102 /// }));
103 ///
104 /// db.append(json!({"test": "data"})).unwrap();
105 /// assert!(db.has_data());
106 ///
107 /// db.reset();
108 /// assert!(!db.has_data());
109 /// ```
110 pub fn reset(&self) {
111 self.store.lock().unwrap().reset();
112 }
113
114 /// Appends a new item to the store.
115 ///
116 /// # Arguments
117 /// * `data` - JSON value to store
118 ///
119 /// # Examples
120 /// ```
121 /// use transientdb::{TransientDB, MemoryStore, MemoryConfig};
122 /// use serde_json::json;
123 ///
124 /// let db = TransientDB::new(MemoryStore::new(MemoryConfig {
125 /// write_key: "test".into(),
126 /// max_items: 100,
127 /// max_fetch_size: 1024,
128 /// }));
129 ///
130 /// // Append a single value
131 /// db.append(json!({"event": "user_login", "user_id": 123})).unwrap();
132 ///
133 /// // Append structured data
134 /// db.append(json!({
135 /// "event": "purchase",
136 /// "details": {
137 /// "item_id": "ABC123",
138 /// "amount": 99.99,
139 /// "currency": "USD"
140 /// }
141 /// })).unwrap();
142 /// ```
143 pub fn append(&self, data: Value) -> Result<()> {
144 self.store.lock().unwrap().append(data)
145 }
146
147 /// Fetches a batch of data from the store, respecting optional count and size limits.
148 ///
149 /// # Arguments
150 /// * `count` - Optional maximum number of items to fetch
151 /// * `max_bytes` - Optional maximum total size in bytes to fetch
152 ///
153 /// # Examples
154 /// ```
155 /// use transientdb::{TransientDB, MemoryStore, MemoryConfig};
156 /// use serde_json::json;
157 ///
158 /// let db = TransientDB::new(MemoryStore::new(MemoryConfig {
159 /// write_key: "test".into(),
160 /// max_items: 100,
161 /// max_fetch_size: 1024,
162 /// }));
163 ///
164 /// // Add some data
165 /// for i in 0..5 {
166 /// db.append(json!({"index": i})).unwrap();
167 /// }
168 ///
169 /// // Fetch up to 3 items
170 /// if let Ok(Some(result)) = db.fetch(Some(3), None) {
171 /// // Process the data
172 /// if let Some(data) = result.data {
173 /// println!("Fetched data: {:?}", data);
174 /// }
175 ///
176 /// // Clean up the fetched items
177 /// if let Some(removable) = result.removable {
178 /// db.remove(&removable).unwrap();
179 /// }
180 /// }
181 ///
182 /// // Fetch items with size limit (1KB)
183 /// let result = db.fetch(None, Some(1024));
184 /// ```
185 pub fn fetch(
186 &self,
187 count: Option<usize>,
188 max_bytes: Option<usize>,
189 ) -> Result<Option<DataResult<T>>> {
190 self.store.lock().unwrap().fetch(count, max_bytes)
191 }
192
193 /// Removes previously fetched data from the store.
194 ///
195 /// # Arguments
196 /// * `data` - Slice of removable items from a previous fetch operation
197 ///
198 /// # Examples
199 /// ```
200 /// use transientdb::{TransientDB, MemoryStore, MemoryConfig};
201 /// use serde_json::json;
202 ///
203 /// let db = TransientDB::new(MemoryStore::new(MemoryConfig {
204 /// write_key: "test".into(),
205 /// max_items: 100,
206 /// max_fetch_size: 1024,
207 /// }));
208 ///
209 /// // Add and fetch data
210 /// db.append(json!({"test": "data"})).unwrap();
211 ///
212 /// if let Ok(Some(result)) = db.fetch(None, None) {
213 /// // Process the data...
214 ///
215 /// // Then remove the processed items
216 /// if let Some(removable) = result.removable {
217 /// db.remove(&removable).unwrap();
218 /// }
219 /// }
220 /// ```
221 pub fn remove(&self, data: &[Box<dyn Equivalent>]) -> Result<()> {
222 self.store.lock().unwrap().remove(data)
223 }
224}