Skip to main content

sentinel_dbms/store/
operations.rs

1use std::sync::Arc;
2
3use tokio::fs as tokio_fs;
4use tracing::{debug, error, trace};
5use sentinel_wal::WalManager;
6
7use crate::{
8    events::StoreEvent,
9    Collection,
10    CollectionMetadata,
11    Result,
12    COLLECTION_METADATA_FILE,
13    DATA_DIR,
14    WAL_DIR,
15    WAL_FILE,
16};
17use super::{stor::Store, validation::validate_collection_name};
18
19/// Retrieves or creates a collection with the specified name and custom WAL configuration
20/// overrides.
21///
22/// This is an internal function used by the Store impl. Use Store::collection_with_config instead.
23pub async fn collection_with_config(
24    store: &Store,
25    name: &str,
26    wal_overrides: Option<sentinel_wal::CollectionWalConfigOverrides>,
27) -> Result<Collection> {
28    trace!("Accessing collection: {} with custom WAL config", name);
29    validate_collection_name(name)?;
30    let path = store.root_path.join(DATA_DIR).join(name);
31    tokio_fs::create_dir_all(&path).await.map_err(|e| {
32        error!("Failed to create collection directory {:?}: {}", path, e);
33        e
34    })?;
35    debug!("Collection directory ensured: {:?}", path);
36
37    // Load or create collection metadata
38    let metadata_path = path.join(COLLECTION_METADATA_FILE);
39    let is_new_collection = !tokio_fs::try_exists(&metadata_path).await.unwrap_or(false);
40    let metadata = if is_new_collection {
41        debug!("Creating new collection metadata for {}", name);
42        let mut metadata = CollectionMetadata::new(name.to_owned());
43        // For new collections, if overrides are provided, create a config with overrides applied to
44        // defaults
45        if let Some(overrides) = wal_overrides.as_ref() {
46            let base_config = store
47                .wal_config
48                .collection_configs
49                .get(name)
50                .cloned()
51                .unwrap_or_else(|| store.wal_config.default_collection_config.clone());
52            let merged_config = base_config.apply_overrides(overrides);
53            metadata.wal_config = Some(merged_config);
54        }
55        let content = serde_json::to_string_pretty(&metadata)?;
56        tokio_fs::write(&metadata_path, content).await?;
57        metadata
58    }
59    else {
60        debug!("Loading existing collection metadata for {}", name);
61        let content = tokio_fs::read_to_string(&metadata_path).await?;
62        let mut metadata: CollectionMetadata = serde_json::from_str(&content)?;
63        // For existing collections, conditionally update metadata if persist_overrides is true
64        if let Some(overrides) = wal_overrides.as_ref() &&
65            overrides.persist_overrides
66        {
67            let base_config = metadata.wal_config.unwrap_or_else(|| {
68                store
69                    .wal_config
70                    .collection_configs
71                    .get(name)
72                    .cloned()
73                    .unwrap_or_else(|| store.wal_config.default_collection_config.clone())
74            });
75            let merged_config = base_config.apply_overrides(overrides);
76            metadata.wal_config = Some(merged_config);
77            let content = serde_json::to_string_pretty(&metadata)?;
78            tokio_fs::write(&metadata_path, content).await?;
79        }
80        metadata
81    };
82
83    // If this is a new collection, emit event (metadata will be saved by event handler)
84    if is_new_collection {
85        // Emit collection created event
86        let event = StoreEvent::CollectionCreated {
87            name: name.to_owned(),
88        };
89        let _ = store.event_sender.send(event).ok();
90    }
91
92    // Get collection WAL config: use metadata's config, or provided config, or fall back to
93    // store-derived
94    let stored_wal_config = metadata.wal_config.clone().unwrap_or_else(|| {
95        store
96            .wal_config
97            .collection_configs
98            .get(name)
99            .cloned()
100            .unwrap_or_else(|| store.wal_config.default_collection_config.clone())
101    });
102
103    let mut collection_wal_config = stored_wal_config.clone();
104
105    // Apply overrides if provided
106    if let Some(overrides) = wal_overrides {
107        collection_wal_config = collection_wal_config.apply_overrides(&overrides);
108    }
109
110    // Create WAL manager with collection config
111    let wal_path = path.join(WAL_DIR).join(WAL_FILE);
112    let wal_manager = Some(Arc::new(
113        WalManager::new(wal_path, collection_wal_config.clone().into()).await?,
114    ));
115
116    trace!("Collection '{}' accessed successfully", name);
117    let now = chrono::Utc::now();
118
119    // Update store metadata
120    *store.last_accessed_at.write().unwrap() = now;
121
122    let mut collection = Collection {
123        path,
124        signing_key: store.signing_key.clone(),
125        wal_manager,
126        wal_config: collection_wal_config,
127        stored_wal_config,
128        created_at: now,
129        updated_at: std::sync::RwLock::new(now),
130        last_checkpoint_at: std::sync::RwLock::new(None),
131        total_documents: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(metadata.document_count)),
132        total_size_bytes: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(metadata.total_size_bytes)),
133        event_sender: Some(store.event_sender.clone()),
134        event_task: None,
135        recovery_mode: std::sync::atomic::AtomicBool::new(false),
136    };
137    collection.start_event_processor();
138    Ok(collection)
139}
140
141#[allow(
142    clippy::multiple_inherent_impl,
143    reason = "multiple impl blocks for Store are intentional for organization"
144)]
145impl Store {
146    /// Retrieves or creates a collection with the specified name.
147    ///
148    /// This method provides access to a named collection within the store. If the
149    /// collection directory doesn't exist, it will be created automatically under
150    /// the `data/` subdirectory of the store's root path.
151    ///
152    /// # Parameters
153    ///
154    /// * `name` - The name of the collection. This will be used as the directory name under
155    ///   `data/`. The name should be filesystem-safe (avoid special characters that are invalid in
156    ///   directory names on your target platform).
157    ///
158    /// # Returns
159    ///
160    /// * `Result<Collection>` - Returns a `Collection` instance on success, or a `SentinelError`
161    ///   if:
162    ///   - The collection directory cannot be created due to permission issues
163    ///   - The name contains invalid characters for the filesystem
164    ///   - I/O errors occur during directory creation
165    ///
166    /// # Examples
167    ///
168    /// ```no_run
169    /// use sentinel_dbms::Store;
170    /// use serde_json::json;
171    ///
172    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
173    /// let store = Store::new("/var/lib/sentinel", None).await?;
174    ///
175    /// // Access a users collection
176    /// let users = store.collection("users").await?;
177    ///
178    /// // Insert a document into the collection
179    /// users.insert("user-123", json!({
180    ///     "name": "Alice",
181    ///     "email": "alice@example.com"
182    /// })).await?;
183    ///
184    /// // Access multiple collections
185    /// let audit_logs = store.collection("audit_logs").await?;
186    /// let certificates = store.collection("certificates").await?;
187    /// # Ok(())
188    /// # }
189    /// ```
190    ///
191    /// # Collection Naming
192    ///
193    /// Collection names should follow these guidelines:
194    /// - Use lowercase letters, numbers, underscores, and hyphens
195    /// - Avoid spaces and special characters
196    /// - Keep names descriptive but concise (e.g., `users`, `audit_logs`, `api_keys`)
197    ///
198    /// # Notes
199    ///
200    /// - Calling this method multiple times with the same name returns separate `Collection`
201    ///   instances pointing to the same directory
202    /// - The `data/` subdirectory is created automatically on first collection access
203    /// - Collections are not cached; each call creates a new `Collection` instance
204    /// - No validation is performed on the collection name beyond filesystem constraints
205    #[deprecated(
206        since = "2.0.2",
207        note = "Please use collection_with_config to specify WAL configuration"
208    )]
209    pub async fn collection(&self, name: &str) -> Result<Collection> { collection_with_config(self, name, None).await }
210
211    /// Retrieves or creates a collection with the specified name and custom WAL configuration
212    /// overrides.
213    ///
214    /// This method provides access to a named collection within the store with custom WAL settings
215    /// that override the stored or default configuration. If the collection directory doesn't
216    /// exist, it will be created automatically under the `data/` subdirectory of the store's
217    /// root path.
218    ///
219    /// # Parameters
220    ///
221    /// * `name` - The name of the collection. This will be used as the directory name under
222    ///   `data/`. The name should be filesystem-safe.
223    /// * `wal_overrides` - Optional WAL configuration overrides for this collection
224    ///
225    /// # Returns
226    ///
227    /// * `Result<Collection>` - Returns a `Collection` instance on success, or a `SentinelError`
228    ///   if:
229    ///   - The collection directory cannot be created due to permission issues
230    ///   - The name contains invalid characters for the filesystem
231    ///   - I/O errors occur during directory creation
232    ///
233    /// # Examples
234    ///
235    /// ```no_run
236    /// use sentinel_dbms::Store;
237    /// use sentinel_wal::CollectionWalConfigOverrides;
238    /// use serde_json::json;
239    ///
240    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
241    /// let store = Store::new("/var/lib/sentinel", None).await?;
242    /// let wal_overrides = CollectionWalConfigOverrides {
243    ///     write_mode: Some(sentinel_wal::WalFailureMode::Warn),
244    ///     ..Default::default()
245    /// };
246    ///
247    /// // Access a users collection with WAL overrides
248    /// let users = store.collection_with_config("users", Some(wal_overrides)).await?;
249    ///
250    /// // Insert a document into the collection
251    /// users.insert("user-123", json!({
252    ///     "name": "Alice",
253    ///     "email": "alice@example.com"
254    /// })).await?;
255    /// # Ok(())
256    /// # }
257    /// ```
258    pub async fn collection_with_config(
259        &self,
260        name: &str,
261        wal_overrides: Option<sentinel_wal::CollectionWalConfigOverrides>,
262    ) -> Result<Collection> {
263        collection_with_config(self, name, wal_overrides).await
264    }
265
266    /// Deletes a collection and all its documents.
267    ///
268    /// This method removes the entire collection directory and all documents within it.
269    /// The operation is permanent and cannot be undone. If the collection doesn't exist,
270    /// the operation succeeds silently (idempotent).
271    ///
272    /// # Arguments
273    ///
274    /// * `name` - The name of the collection to delete
275    ///
276    /// # Returns
277    ///
278    /// Returns `Ok(())` on success, or a `SentinelError` if the operation fails.
279    ///
280    /// # Examples
281    ///
282    /// ```rust
283    /// use sentinel_dbms::Store;
284    ///
285    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
286    /// let store = Store::new("/path/to/data", None).await?;
287    ///
288    /// // Create a collection
289    /// let collection = store.collection("temp_collection").await?;
290    ///
291    /// // ... use collection ...
292    ///
293    /// // Delete the collection
294    /// store.delete_collection("temp_collection").await?;
295    /// # Ok(())
296    /// # }
297    /// ```
298    pub async fn delete_collection(&self, name: &str) -> Result<()> {
299        trace!("Deleting collection: {}", name);
300        validate_collection_name(name)?;
301        let path = self.root_path.join("data").join(name);
302
303        // Check if collection exists
304        if !path.exists() {
305            debug!("Collection '{}' does not exist, nothing to delete", name);
306            return Ok(());
307        }
308
309        // Load collection metadata to get document count and size before deletion
310        let metadata_path = path.join(COLLECTION_METADATA_FILE);
311        let collection_metadata = if tokio_fs::try_exists(&metadata_path).await.unwrap_or(false) {
312            let content = tokio_fs::read_to_string(&metadata_path).await?;
313            Some(serde_json::from_str::<CollectionMetadata>(&content)?)
314        }
315        else {
316            None
317        };
318
319        // Remove the entire directory
320        tokio_fs::remove_dir_all(&path).await.map_err(|e| {
321            error!("Failed to delete collection directory {:?}: {}", path, e);
322            e
323        })?;
324
325        debug!("Collection '{}' deleted successfully", name);
326
327        // Update store metadata
328        *self.last_accessed_at.write().unwrap() = chrono::Utc::now();
329        if let Some(metadata) = collection_metadata {
330            // Emit collection deleted event (metadata will be saved by event handler)
331            let event = StoreEvent::CollectionDeleted {
332                name:             name.to_owned(),
333                document_count:   metadata.document_count,
334                total_size_bytes: metadata.total_size_bytes,
335            };
336            drop(self.event_sender.send(event));
337        }
338
339        Ok(())
340    }
341
342    /// This method returns a list of all collection names that exist in the store.
343    /// The names are returned in no particular order.
344    ///
345    /// # Returns
346    ///
347    /// Returns a `Vec<String>` containing the names of all collections.
348    ///
349    /// # Examples
350    ///
351    /// ```rust
352    /// use sentinel_dbms::Store;
353    ///
354    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
355    /// let store = Store::new("/path/to/data", None).await?;
356    ///
357    /// // Create some collections
358    /// store.collection("users").await?;
359    /// store.collection("products").await?;
360    ///
361    /// // List all collections
362    /// let collections = store.list_collections().await?;
363    /// assert!(collections.contains(&"users".to_string()));
364    /// assert!(collections.contains(&"products".to_string()));
365    /// # Ok(())
366    /// # }
367    /// ```
368    pub async fn list_collections(&self) -> Result<Vec<String>> {
369        trace!("Listing collections");
370        let data_path = self.root_path.join("data");
371
372        // Ensure data directory exists
373        tokio_fs::create_dir_all(&data_path).await.map_err(|e| {
374            error!("Failed to create data directory {:?}: {}", data_path, e);
375            e
376        })?;
377
378        // Read directory entries
379        let mut entries = tokio_fs::read_dir(&data_path).await.map_err(|e| {
380            error!("Failed to read data directory {:?}: {}", data_path, e);
381            e
382        })?;
383
384        let mut collections = Vec::new();
385        while let Some(entry) = entries.next_entry().await.map_err(|e| {
386            error!("Failed to read directory entry: {}", e);
387            e
388        })? {
389            if entry
390                .file_type()
391                .await
392                .map_err(|e| {
393                    error!("Failed to get file type for entry: {}", e);
394                    e
395                })?
396                .is_dir() &&
397                let Some(name) = entry.file_name().to_str()
398            {
399                collections.push(name.to_owned());
400            }
401        }
402
403        debug!("Found {} collections", collections.len());
404        Ok(collections)
405    }
406
407    pub fn set_signing_key(&mut self, key: sentinel_crypto::SigningKey) { self.signing_key = Some(Arc::new(key)); }
408}