transientdb/
transient.rs

1use crate::{DataResult, DataStore, Equivalent};
2use serde_json::Value;
3use std::io::Result;
4use std::sync::Mutex;
5
6/// A thread-safe wrapper around a DataStore implementation that provides temporary data storage
7/// with batch processing capabilities.
8///
9/// TransientDB uses interior mutability through a Mutex to allow concurrent access to the
10/// underlying data store. It's designed for scenarios where data needs to be temporarily
11/// stored and processed in batches, such as queuing events or logs.
12pub struct TransientDB<T> {
13	#[cfg(not(target_arch = "wasm32"))]
14	store: Mutex<Box<dyn DataStore<Output = T> + Send>>,
15
16	#[cfg(target_arch = "wasm32")]
17	store: Mutex<Box<dyn DataStore<Output = T>>>,
18}
19
// SAFETY: These impls are only compiled for WASM32 builds *without* the
// `atomics` target feature. Such builds have no threads and no shared memory,
// so a `TransientDB` can never actually be sent to, or accessed from, another
// thread: Send and Sync are vacuously satisfied.
//
// This allows types like WebStore (which contains Rc<IdbDatabase>) to be used
// with TransientDB on WASM targets without requiring complex trait gymnastics
// that would propagate through the entire codebase.
//
// NOTE: Threaded WASM (e.g. `wasm32-wasip1-threads`, or any wasm32 build with
// `-C target-feature=+atomics`) still reports `target_arch = "wasm32"`, so the
// `atomics` check below is what keeps these impls out of builds where they
// would be unsound. On such builds `TransientDB` is simply neither `Send` nor
// `Sync`, and any cross-thread use becomes a compile error instead of UB.
#[cfg(all(target_arch = "wasm32", not(target_feature = "atomics")))]
unsafe impl<T> Send for TransientDB<T> {}

#[cfg(all(target_arch = "wasm32", not(target_feature = "atomics")))]
unsafe impl<T> Sync for TransientDB<T> {}
35
36impl<T> TransientDB<T> {
37	/// Creates a new TransientDB instance with the provided data store implementation.
38	///
39	/// # Arguments
40	/// * `store` - Any implementation of DataStore that is Send + 'static (on native)
41	///   or just DataStore + 'static (on WASM)
42	///
43	/// # Examples
44	/// ```
45	/// use transientdb::{TransientDB, MemoryConfig, MemoryStore};
46	///
47	/// let config = MemoryConfig {
48	///     write_key: "my-store".into(),
49	///     max_items: 1000,
50	///     max_fetch_size: 1024 * 1024, // 1MB
51	/// };
52	/// let store = MemoryStore::new(config);
53	/// let db = TransientDB::new(store);
54	/// ```
55	#[cfg(not(target_arch = "wasm32"))]
56	pub fn new(store: impl DataStore<Output = T> + Send + 'static) -> Self {
57		Self {
58			store: Mutex::new(Box::new(store)),
59		}
60	}
61
62	/// Creates a new TransientDB instance with the provided data store implementation.
63	#[cfg(target_arch = "wasm32")]
64	pub fn new(store: impl DataStore<Output = T> + 'static) -> Self {
65		Self {
66			store: Mutex::new(Box::new(store)),
67		}
68	}
69
70	/// Checks if the store contains any data that can be fetched.
71	///
72	/// # Examples
73	/// ```
74	/// use transientdb::{TransientDB, MemoryStore, MemoryConfig};
75	/// use serde_json::json;
76	///
77	/// let db = TransientDB::new(MemoryStore::new(MemoryConfig {
78	///     write_key: "test".into(),
79	///     max_items: 100,
80	///     max_fetch_size: 1024,
81	/// }));
82	///
83	/// assert!(!db.has_data());
84	/// db.append(json!({"test": "data"})).unwrap();
85	/// assert!(db.has_data());
86	/// ```
87	pub fn has_data(&self) -> bool {
88		self.store.lock().unwrap().has_data()
89	}
90
91	/// Removes all data from the store and resets it to initial state.
92	///
93	/// # Examples
94	/// ```
95	/// use transientdb::{TransientDB, MemoryStore, MemoryConfig};
96	/// use serde_json::json;
97	///
98	/// let db = TransientDB::new(MemoryStore::new(MemoryConfig {
99	///     write_key: "test".into(),
100	///     max_items: 100,
101	///     max_fetch_size: 1024,
102	/// }));
103	///
104	/// db.append(json!({"test": "data"})).unwrap();
105	/// assert!(db.has_data());
106	///
107	/// db.reset();
108	/// assert!(!db.has_data());
109	/// ```
110	pub fn reset(&self) {
111		self.store.lock().unwrap().reset();
112	}
113
114	/// Appends a new item to the store.
115	///
116	/// # Arguments
117	/// * `data` - JSON value to store
118	///
119	/// # Examples
120	/// ```
121	/// use transientdb::{TransientDB, MemoryStore, MemoryConfig};
122	/// use serde_json::json;
123	///
124	/// let db = TransientDB::new(MemoryStore::new(MemoryConfig {
125	///     write_key: "test".into(),
126	///     max_items: 100,
127	///     max_fetch_size: 1024,
128	/// }));
129	///
130	/// // Append a single value
131	/// db.append(json!({"event": "user_login", "user_id": 123})).unwrap();
132	///
133	/// // Append structured data
134	/// db.append(json!({
135	///     "event": "purchase",
136	///     "details": {
137	///         "item_id": "ABC123",
138	///         "amount": 99.99,
139	///         "currency": "USD"
140	///     }
141	/// })).unwrap();
142	/// ```
143	pub fn append(&self, data: Value) -> Result<()> {
144		self.store.lock().unwrap().append(data)
145	}
146
147	/// Fetches a batch of data from the store, respecting optional count and size limits.
148	///
149	/// # Arguments
150	/// * `count` - Optional maximum number of items to fetch
151	/// * `max_bytes` - Optional maximum total size in bytes to fetch
152	///
153	/// # Examples
154	/// ```
155	/// use transientdb::{TransientDB, MemoryStore, MemoryConfig};
156	/// use serde_json::json;
157	///
158	/// let db = TransientDB::new(MemoryStore::new(MemoryConfig {
159	///     write_key: "test".into(),
160	///     max_items: 100,
161	///     max_fetch_size: 1024,
162	/// }));
163	///
164	/// // Add some data
165	/// for i in 0..5 {
166	///     db.append(json!({"index": i})).unwrap();
167	/// }
168	///
169	/// // Fetch up to 3 items
170	/// if let Ok(Some(result)) = db.fetch(Some(3), None) {
171	///     // Process the data
172	///     if let Some(data) = result.data {
173	///         println!("Fetched data: {:?}", data);
174	///     }
175	///
176	///     // Clean up the fetched items
177	///     if let Some(removable) = result.removable {
178	///         db.remove(&removable).unwrap();
179	///     }
180	/// }
181	///
182	/// // Fetch items with size limit (1KB)
183	/// let result = db.fetch(None, Some(1024));
184	/// ```
185	pub fn fetch(
186		&self,
187		count: Option<usize>,
188		max_bytes: Option<usize>,
189	) -> Result<Option<DataResult<T>>> {
190		self.store.lock().unwrap().fetch(count, max_bytes)
191	}
192
193	/// Removes previously fetched data from the store.
194	///
195	/// # Arguments
196	/// * `data` - Slice of removable items from a previous fetch operation
197	///
198	/// # Examples
199	/// ```
200	/// use transientdb::{TransientDB, MemoryStore, MemoryConfig};
201	/// use serde_json::json;
202	///
203	/// let db = TransientDB::new(MemoryStore::new(MemoryConfig {
204	///     write_key: "test".into(),
205	///     max_items: 100,
206	///     max_fetch_size: 1024,
207	/// }));
208	///
209	/// // Add and fetch data
210	/// db.append(json!({"test": "data"})).unwrap();
211	///
212	/// if let Ok(Some(result)) = db.fetch(None, None) {
213	///     // Process the data...
214	///
215	///     // Then remove the processed items
216	///     if let Some(removable) = result.removable {
217	///         db.remove(&removable).unwrap();
218	///     }
219	/// }
220	/// ```
221	pub fn remove(&self, data: &[Box<dyn Equivalent>]) -> Result<()> {
222		self.store.lock().unwrap().remove(data)
223	}
224}