sentinel_dbms/store/operations.rs
1use std::sync::Arc;
2
3use tokio::fs as tokio_fs;
4use tracing::{debug, error, trace};
5use sentinel_wal::WalManager;
6
7use crate::{
8 events::StoreEvent,
9 Collection,
10 CollectionMetadata,
11 Result,
12 COLLECTION_METADATA_FILE,
13 DATA_DIR,
14 WAL_DIR,
15 WAL_FILE,
16};
17use super::{stor::Store, validation::validate_collection_name};
18
19/// Retrieves or creates a collection with the specified name and custom WAL configuration
20/// overrides.
21///
22/// This is an internal function used by the Store impl. Use Store::collection_with_config instead.
23pub async fn collection_with_config(
24 store: &Store,
25 name: &str,
26 wal_overrides: Option<sentinel_wal::CollectionWalConfigOverrides>,
27) -> Result<Collection> {
28 trace!("Accessing collection: {} with custom WAL config", name);
29 validate_collection_name(name)?;
30 let path = store.root_path.join(DATA_DIR).join(name);
31 tokio_fs::create_dir_all(&path).await.map_err(|e| {
32 error!("Failed to create collection directory {:?}: {}", path, e);
33 e
34 })?;
35 debug!("Collection directory ensured: {:?}", path);
36
37 // Load or create collection metadata
38 let metadata_path = path.join(COLLECTION_METADATA_FILE);
39 let is_new_collection = !tokio_fs::try_exists(&metadata_path).await.unwrap_or(false);
40 let metadata = if is_new_collection {
41 debug!("Creating new collection metadata for {}", name);
42 let mut metadata = CollectionMetadata::new(name.to_owned());
43 // For new collections, if overrides are provided, create a config with overrides applied to
44 // defaults
45 if let Some(overrides) = wal_overrides.as_ref() {
46 let base_config = store
47 .wal_config
48 .collection_configs
49 .get(name)
50 .cloned()
51 .unwrap_or_else(|| store.wal_config.default_collection_config.clone());
52 let merged_config = base_config.apply_overrides(overrides);
53 metadata.wal_config = Some(merged_config);
54 }
55 let content = serde_json::to_string_pretty(&metadata)?;
56 tokio_fs::write(&metadata_path, content).await?;
57 metadata
58 }
59 else {
60 debug!("Loading existing collection metadata for {}", name);
61 let content = tokio_fs::read_to_string(&metadata_path).await?;
62 let mut metadata: CollectionMetadata = serde_json::from_str(&content)?;
63 // For existing collections, conditionally update metadata if persist_overrides is true
64 if let Some(overrides) = wal_overrides.as_ref() &&
65 overrides.persist_overrides
66 {
67 let base_config = metadata.wal_config.unwrap_or_else(|| {
68 store
69 .wal_config
70 .collection_configs
71 .get(name)
72 .cloned()
73 .unwrap_or_else(|| store.wal_config.default_collection_config.clone())
74 });
75 let merged_config = base_config.apply_overrides(overrides);
76 metadata.wal_config = Some(merged_config);
77 let content = serde_json::to_string_pretty(&metadata)?;
78 tokio_fs::write(&metadata_path, content).await?;
79 }
80 metadata
81 };
82
83 // If this is a new collection, emit event (metadata will be saved by event handler)
84 if is_new_collection {
85 // Emit collection created event
86 let event = StoreEvent::CollectionCreated {
87 name: name.to_owned(),
88 };
89 let _ = store.event_sender.send(event).ok();
90 }
91
92 // Get collection WAL config: use metadata's config, or provided config, or fall back to
93 // store-derived
94 let stored_wal_config = metadata.wal_config.clone().unwrap_or_else(|| {
95 store
96 .wal_config
97 .collection_configs
98 .get(name)
99 .cloned()
100 .unwrap_or_else(|| store.wal_config.default_collection_config.clone())
101 });
102
103 let mut collection_wal_config = stored_wal_config.clone();
104
105 // Apply overrides if provided
106 if let Some(overrides) = wal_overrides {
107 collection_wal_config = collection_wal_config.apply_overrides(&overrides);
108 }
109
110 // Create WAL manager with collection config
111 let wal_path = path.join(WAL_DIR).join(WAL_FILE);
112 let wal_manager = Some(Arc::new(
113 WalManager::new(wal_path, collection_wal_config.clone().into()).await?,
114 ));
115
116 trace!("Collection '{}' accessed successfully", name);
117 let now = chrono::Utc::now();
118
119 // Update store metadata
120 *store.last_accessed_at.write().unwrap() = now;
121
122 let mut collection = Collection {
123 path,
124 signing_key: store.signing_key.clone(),
125 wal_manager,
126 wal_config: collection_wal_config,
127 stored_wal_config,
128 created_at: now,
129 updated_at: std::sync::RwLock::new(now),
130 last_checkpoint_at: std::sync::RwLock::new(None),
131 total_documents: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(metadata.document_count)),
132 total_size_bytes: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(metadata.total_size_bytes)),
133 event_sender: Some(store.event_sender.clone()),
134 event_task: None,
135 recovery_mode: std::sync::atomic::AtomicBool::new(false),
136 };
137 collection.start_event_processor();
138 Ok(collection)
139}
140
141#[allow(
142 clippy::multiple_inherent_impl,
143 reason = "multiple impl blocks for Store are intentional for organization"
144)]
145impl Store {
146 /// Retrieves or creates a collection with the specified name.
147 ///
148 /// This method provides access to a named collection within the store. If the
149 /// collection directory doesn't exist, it will be created automatically under
150 /// the `data/` subdirectory of the store's root path.
151 ///
152 /// # Parameters
153 ///
154 /// * `name` - The name of the collection. This will be used as the directory name under
155 /// `data/`. The name should be filesystem-safe (avoid special characters that are invalid in
156 /// directory names on your target platform).
157 ///
158 /// # Returns
159 ///
160 /// * `Result<Collection>` - Returns a `Collection` instance on success, or a `SentinelError`
161 /// if:
162 /// - The collection directory cannot be created due to permission issues
163 /// - The name contains invalid characters for the filesystem
164 /// - I/O errors occur during directory creation
165 ///
166 /// # Examples
167 ///
168 /// ```no_run
169 /// use sentinel_dbms::Store;
170 /// use serde_json::json;
171 ///
172 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
173 /// let store = Store::new("/var/lib/sentinel", None).await?;
174 ///
175 /// // Access a users collection
176 /// let users = store.collection("users").await?;
177 ///
178 /// // Insert a document into the collection
179 /// users.insert("user-123", json!({
180 /// "name": "Alice",
181 /// "email": "alice@example.com"
182 /// })).await?;
183 ///
184 /// // Access multiple collections
185 /// let audit_logs = store.collection("audit_logs").await?;
186 /// let certificates = store.collection("certificates").await?;
187 /// # Ok(())
188 /// # }
189 /// ```
190 ///
191 /// # Collection Naming
192 ///
193 /// Collection names should follow these guidelines:
194 /// - Use lowercase letters, numbers, underscores, and hyphens
195 /// - Avoid spaces and special characters
196 /// - Keep names descriptive but concise (e.g., `users`, `audit_logs`, `api_keys`)
197 ///
198 /// # Notes
199 ///
200 /// - Calling this method multiple times with the same name returns separate `Collection`
201 /// instances pointing to the same directory
202 /// - The `data/` subdirectory is created automatically on first collection access
203 /// - Collections are not cached; each call creates a new `Collection` instance
204 /// - No validation is performed on the collection name beyond filesystem constraints
205 #[deprecated(
206 since = "2.0.2",
207 note = "Please use collection_with_config to specify WAL configuration"
208 )]
209 pub async fn collection(&self, name: &str) -> Result<Collection> { collection_with_config(self, name, None).await }
210
211 /// Retrieves or creates a collection with the specified name and custom WAL configuration
212 /// overrides.
213 ///
214 /// This method provides access to a named collection within the store with custom WAL settings
215 /// that override the stored or default configuration. If the collection directory doesn't
216 /// exist, it will be created automatically under the `data/` subdirectory of the store's
217 /// root path.
218 ///
219 /// # Parameters
220 ///
221 /// * `name` - The name of the collection. This will be used as the directory name under
222 /// `data/`. The name should be filesystem-safe.
223 /// * `wal_overrides` - Optional WAL configuration overrides for this collection
224 ///
225 /// # Returns
226 ///
227 /// * `Result<Collection>` - Returns a `Collection` instance on success, or a `SentinelError`
228 /// if:
229 /// - The collection directory cannot be created due to permission issues
230 /// - The name contains invalid characters for the filesystem
231 /// - I/O errors occur during directory creation
232 ///
233 /// # Examples
234 ///
235 /// ```no_run
236 /// use sentinel_dbms::Store;
237 /// use sentinel_wal::CollectionWalConfigOverrides;
238 /// use serde_json::json;
239 ///
240 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
241 /// let store = Store::new("/var/lib/sentinel", None).await?;
242 /// let wal_overrides = CollectionWalConfigOverrides {
243 /// write_mode: Some(sentinel_wal::WalFailureMode::Warn),
244 /// ..Default::default()
245 /// };
246 ///
247 /// // Access a users collection with WAL overrides
248 /// let users = store.collection_with_config("users", Some(wal_overrides)).await?;
249 ///
250 /// // Insert a document into the collection
251 /// users.insert("user-123", json!({
252 /// "name": "Alice",
253 /// "email": "alice@example.com"
254 /// })).await?;
255 /// # Ok(())
256 /// # }
257 /// ```
258 pub async fn collection_with_config(
259 &self,
260 name: &str,
261 wal_overrides: Option<sentinel_wal::CollectionWalConfigOverrides>,
262 ) -> Result<Collection> {
263 collection_with_config(self, name, wal_overrides).await
264 }
265
266 /// Deletes a collection and all its documents.
267 ///
268 /// This method removes the entire collection directory and all documents within it.
269 /// The operation is permanent and cannot be undone. If the collection doesn't exist,
270 /// the operation succeeds silently (idempotent).
271 ///
272 /// # Arguments
273 ///
274 /// * `name` - The name of the collection to delete
275 ///
276 /// # Returns
277 ///
278 /// Returns `Ok(())` on success, or a `SentinelError` if the operation fails.
279 ///
280 /// # Examples
281 ///
282 /// ```rust
283 /// use sentinel_dbms::Store;
284 ///
285 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
286 /// let store = Store::new("/path/to/data", None).await?;
287 ///
288 /// // Create a collection
289 /// let collection = store.collection("temp_collection").await?;
290 ///
291 /// // ... use collection ...
292 ///
293 /// // Delete the collection
294 /// store.delete_collection("temp_collection").await?;
295 /// # Ok(())
296 /// # }
297 /// ```
298 pub async fn delete_collection(&self, name: &str) -> Result<()> {
299 trace!("Deleting collection: {}", name);
300 validate_collection_name(name)?;
301 let path = self.root_path.join("data").join(name);
302
303 // Check if collection exists
304 if !path.exists() {
305 debug!("Collection '{}' does not exist, nothing to delete", name);
306 return Ok(());
307 }
308
309 // Load collection metadata to get document count and size before deletion
310 let metadata_path = path.join(COLLECTION_METADATA_FILE);
311 let collection_metadata = if tokio_fs::try_exists(&metadata_path).await.unwrap_or(false) {
312 let content = tokio_fs::read_to_string(&metadata_path).await?;
313 Some(serde_json::from_str::<CollectionMetadata>(&content)?)
314 }
315 else {
316 None
317 };
318
319 // Remove the entire directory
320 tokio_fs::remove_dir_all(&path).await.map_err(|e| {
321 error!("Failed to delete collection directory {:?}: {}", path, e);
322 e
323 })?;
324
325 debug!("Collection '{}' deleted successfully", name);
326
327 // Update store metadata
328 *self.last_accessed_at.write().unwrap() = chrono::Utc::now();
329 if let Some(metadata) = collection_metadata {
330 // Emit collection deleted event (metadata will be saved by event handler)
331 let event = StoreEvent::CollectionDeleted {
332 name: name.to_owned(),
333 document_count: metadata.document_count,
334 total_size_bytes: metadata.total_size_bytes,
335 };
336 drop(self.event_sender.send(event));
337 }
338
339 Ok(())
340 }
341
342 /// This method returns a list of all collection names that exist in the store.
343 /// The names are returned in no particular order.
344 ///
345 /// # Returns
346 ///
347 /// Returns a `Vec<String>` containing the names of all collections.
348 ///
349 /// # Examples
350 ///
351 /// ```rust
352 /// use sentinel_dbms::Store;
353 ///
354 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
355 /// let store = Store::new("/path/to/data", None).await?;
356 ///
357 /// // Create some collections
358 /// store.collection("users").await?;
359 /// store.collection("products").await?;
360 ///
361 /// // List all collections
362 /// let collections = store.list_collections().await?;
363 /// assert!(collections.contains(&"users".to_string()));
364 /// assert!(collections.contains(&"products".to_string()));
365 /// # Ok(())
366 /// # }
367 /// ```
368 pub async fn list_collections(&self) -> Result<Vec<String>> {
369 trace!("Listing collections");
370 let data_path = self.root_path.join("data");
371
372 // Ensure data directory exists
373 tokio_fs::create_dir_all(&data_path).await.map_err(|e| {
374 error!("Failed to create data directory {:?}: {}", data_path, e);
375 e
376 })?;
377
378 // Read directory entries
379 let mut entries = tokio_fs::read_dir(&data_path).await.map_err(|e| {
380 error!("Failed to read data directory {:?}: {}", data_path, e);
381 e
382 })?;
383
384 let mut collections = Vec::new();
385 while let Some(entry) = entries.next_entry().await.map_err(|e| {
386 error!("Failed to read directory entry: {}", e);
387 e
388 })? {
389 if entry
390 .file_type()
391 .await
392 .map_err(|e| {
393 error!("Failed to get file type for entry: {}", e);
394 e
395 })?
396 .is_dir() &&
397 let Some(name) = entry.file_name().to_str()
398 {
399 collections.push(name.to_owned());
400 }
401 }
402
403 debug!("Found {} collections", collections.len());
404 Ok(collections)
405 }
406
407 pub fn set_signing_key(&mut self, key: sentinel_crypto::SigningKey) { self.signing_key = Some(Arc::new(key)); }
408}