Skip to main content

sentinel_dbms/collection/
coll.rs

1use std::{path::PathBuf, sync::Arc};
2
3use tokio::fs as tokio_fs;
4use tracing::{debug, warn};
5use sentinel_wal::WalManager;
6
7use crate::{
8    constants::COLLECTION_METADATA_FILE,
9    metadata::CollectionMetadata,
10    validation::is_valid_document_id_chars,
11    Result,
12    SentinelError,
13};
14
15/// A collection represents a namespace for documents in the Sentinel database.
16///
17/// Collections are backed by filesystem directories, where each document is stored
18/// as a JSON file with metadata including version, timestamps, hash, and optional signature.
19/// The collection provides CRUD operations (Create, Read, Update, Delete) and advanced
20/// querying capabilities with streaming support for memory-efficient handling of large datasets.
21///
22/// # Structure
23///
24/// Each collection is stored in a directory with the following structure:
25/// - `{collection_name}/` - Root directory for the collection
26/// - `{collection_name}/{id}.json` - Individual document files with embedded metadata
27/// - `{collection_name}/.deleted/` - Soft-deleted documents (for recovery)
28/// - `{collection_name}/.metadata.json` - Collection metadata and indices (future)
29///
30/// # Streaming Operations
31///
32/// For memory efficiency with large datasets, operations like `filter()` and `query()`
33/// return async streams that process documents one-by-one rather than loading
34/// all documents into memory simultaneously.
35///
36/// # Example
37///
38/// ```rust
39/// use sentinel_dbms::{Store, Collection};
40/// use futures::TryStreamExt;
41/// use serde_json::json;
42///
43/// # async fn example() -> sentinel_dbms::Result<()> {
44/// // Create a store and get a collection
45/// let store = Store::new("/tmp/sentinel", None).await?;
46/// let collection = store.collection("users").await?;
47///
48/// // Insert a document
49/// let user_data = json!({
50///     "name": "Alice",
51///     "email": "alice@example.com",
52///     "age": 30
53/// });
54/// collection.insert("user-123", user_data).await?;
55///
56/// // Retrieve the document
57/// let doc = collection.get("user-123").await?;
58/// assert!(doc.is_some());
59/// assert_eq!(doc.unwrap().id(), "user-123");
60///
61/// // Stream all documents matching a predicate
62/// let adults = collection.filter(|doc| {
63///     doc.data().get("age")
64///         .and_then(|v| v.as_i64())
65///         .map_or(false, |age| age >= 18)
66/// });
67/// let adult_docs: Vec<_> = adults.try_collect().await?;
68/// assert_eq!(adult_docs.len(), 1);
69/// # Ok(())
70/// # }
71/// ```
72#[derive(Debug)]
73#[allow(
74    clippy::field_scoped_visibility_modifiers,
75    reason = "fields need to be pub(crate) for internal access"
76)]
77pub struct Collection {
78    /// The filesystem path to the collection directory.
79    pub(crate) path:               PathBuf,
80    /// The signing key for the collection.
81    pub(crate) signing_key:        Option<Arc<sentinel_crypto::SigningKey>>,
82    /// The Write-Ahead Log manager for durability.
83    pub(crate) wal_manager:        Option<Arc<WalManager>>,
84    /// WAL configuration stored in metadata (without temporary overrides).
85    pub(crate) stored_wal_config:  sentinel_wal::CollectionWalConfig,
86    /// Effective WAL configuration (stored + any temporary overrides).
87    pub(crate) wal_config:         sentinel_wal::CollectionWalConfig,
88    /// When the collection was created.
89    pub(crate) created_at:         chrono::DateTime<chrono::Utc>,
90    /// When the collection was last updated.
91    pub(crate) updated_at:         std::sync::RwLock<chrono::DateTime<chrono::Utc>>,
92    /// When the collection was last checkpointed.
93    pub(crate) last_checkpoint_at: std::sync::RwLock<Option<chrono::DateTime<chrono::Utc>>>,
94    /// Total number of documents in the collection.
95    pub(crate) total_documents:    std::sync::Arc<std::sync::atomic::AtomicU64>,
96    /// Total size of all documents in the collection in bytes.
97    pub(crate) total_size_bytes:   std::sync::Arc<std::sync::atomic::AtomicU64>,
98    /// Event sender for notifying the store of metadata changes.
99    pub(crate) event_sender:       Option<tokio::sync::mpsc::UnboundedSender<crate::events::StoreEvent>>,
100    /// Background task handle for processing internal events.
101    pub(crate) event_task:         Option<tokio::task::JoinHandle<()>>,
102    /// Whether the collection is currently in recovery mode (skip WAL logging).
103    pub(crate) recovery_mode:      std::sync::atomic::AtomicBool,
104}
105
106#[allow(
107    clippy::multiple_inherent_impl,
108    reason = "multiple impl blocks for Collection are intentional for organization"
109)]
110impl Collection {
111    /// Returns the name of the collection.
112    pub fn name(&self) -> &str { self.path.file_name().unwrap().to_str().unwrap() }
113
114    /// Returns the creation timestamp of the collection.
115    pub const fn created_at(&self) -> chrono::DateTime<chrono::Utc> { self.created_at }
116
117    /// Returns the last update timestamp of the collection.
118    pub fn updated_at(&self) -> chrono::DateTime<chrono::Utc> { *self.updated_at.read().unwrap() }
119
120    /// Returns the last checkpoint timestamp of the collection, if any.
121    pub fn last_checkpoint_at(&self) -> Option<chrono::DateTime<chrono::Utc>> {
122        *self.last_checkpoint_at.read().unwrap()
123    }
124
125    /// Returns the total number of documents in the collection.
126    pub fn total_documents(&self) -> u64 {
127        self.total_documents
128            .load(std::sync::atomic::Ordering::Relaxed)
129    }
130
131    /// Returns the total size of all documents in the collection in bytes.
132    pub fn total_size_bytes(&self) -> u64 {
133        self.total_size_bytes
134            .load(std::sync::atomic::Ordering::Relaxed)
135    }
136
137    /// Returns a reference to the stored WAL configuration for this collection.
138    ///
139    /// This is the WAL configuration as persisted in the collection metadata,
140    /// without any temporary overrides that may be applied at runtime.
141    pub const fn stored_wal_config(&self) -> &sentinel_wal::CollectionWalConfig { &self.stored_wal_config }
142
143    /// Returns the effective WAL configuration for this collection.
144    ///
145    /// This includes the stored configuration plus any runtime overrides that
146    /// may have been applied when the collection was accessed.
147    pub const fn wal_config(&self) -> &sentinel_wal::CollectionWalConfig { &self.wal_config }
148
149    /// Saves the current collection metadata to disk.
150    ///
151    /// This method persists the collection's current state (document count, size, timestamps,
152    /// and WAL configuration) to the `.metadata.json` file in the collection directory. This
153    /// ensures that metadata remains consistent across restarts and can be used for monitoring
154    /// and optimization.
155    ///
156    /// # Returns
157    ///
158    /// Returns `Ok(())` on success, or a `SentinelError` if the metadata cannot be saved.
159    pub async fn save_metadata(&self) -> Result<()> {
160        let metadata_path = self.path.join(COLLECTION_METADATA_FILE);
161
162        // Load existing metadata to preserve other fields
163        let mut metadata = if tokio_fs::try_exists(&metadata_path).await.unwrap_or(false) {
164            let content = tokio_fs::read_to_string(&metadata_path).await?;
165            serde_json::from_str(&content)?
166        }
167        else {
168            // Create new metadata if file doesn't exist
169            CollectionMetadata::new(self.name().to_owned())
170        };
171
172        // Update the runtime statistics
173        metadata.document_count = self.total_documents();
174        metadata.total_size_bytes = self.total_size_bytes();
175        metadata.updated_at = std::time::SystemTime::now()
176            .duration_since(std::time::UNIX_EPOCH)
177            .unwrap()
178            .as_secs();
179
180        // Update the WAL configuration
181        metadata.wal_config = Some(self.stored_wal_config.clone());
182
183        // Save back to disk
184        let content = serde_json::to_string_pretty(&metadata)?;
185        tokio_fs::write(&metadata_path, content).await?;
186
187        debug!("Collection metadata saved for {}", self.name());
188        Ok(())
189    }
190
191    /// Flushes any pending metadata changes to disk immediately.
192    ///
193    /// This method forces a synchronous save of the collection metadata to disk,
194    /// bypassing the normal debounced save mechanism. This is useful for tests
195    /// and for ensuring data durability when needed.
196    ///
197    /// # Returns
198    ///
199    /// Returns `Ok(())` on success, or a `SentinelError` if the metadata cannot be saved.
200    pub async fn flush_metadata(&self) -> Result<()> { self.save_metadata().await }
201
202    /// Validates a document ID according to filesystem-safe naming rules.
203    ///
204    /// Document IDs must be filesystem-safe and cannot contain reserved characters
205    /// or Windows reserved names. This prevents issues with file operations and
206    /// ensures cross-platform compatibility.
207    ///
208    /// # Arguments
209    ///
210    /// * `id` - The document ID to validate.
211    ///
212    /// # Returns
213    ///
214    /// Returns `Ok(())` if the ID is valid, or a `SentinelError::InvalidDocumentId`
215    /// if the ID contains invalid characters or is a reserved name.
216    ///
217    /// # Validation Rules
218    ///
219    /// - Must not be empty
220    /// - Must not contain path separators (`/` or `\`)
221    /// - Must not contain control characters (0x00-0x1F)
222    /// - Must not contain Windows reserved characters (`< > : " | ? *`)
223    /// - Must not be a Windows reserved name (CON, PRN, AUX, NUL, COM1-9, LPT1-9)
224    /// - Must not contain spaces or other filesystem-unsafe characters
225    ///
226    /// # Examples
227    ///
228    /// ```rust
229    /// use sentinel_dbms::Collection;
230    ///
231    /// // Valid IDs
232    /// assert!(Collection::validate_document_id("user-123").is_ok());
233    /// assert!(Collection::validate_document_id("my_document").is_ok());
234    ///
235    /// // Invalid IDs
236    /// assert!(Collection::validate_document_id("").is_err()); // empty
237    /// assert!(Collection::validate_document_id("path/file").is_err()); // path separator
238    /// assert!(Collection::validate_document_id("CON").is_err()); // reserved name
239    /// ```
240    pub fn validate_document_id(id: &str) -> Result<()> {
241        if id.is_empty() {
242            return Err(SentinelError::InvalidDocumentId {
243                id: id.to_owned(),
244            });
245        }
246
247        // Check for path separators
248        if id.contains('/') || id.contains('\\') {
249            return Err(SentinelError::InvalidDocumentId {
250                id: id.to_owned(),
251            });
252        }
253
254        // Check for control characters
255        if id.chars().any(|c| c.is_control()) {
256            return Err(SentinelError::InvalidDocumentId {
257                id: id.to_owned(),
258            });
259        }
260
261        // Check for Windows reserved characters
262        let reserved_chars = ['<', '>', ':', '"', '|', '?', '*'];
263        if id.chars().any(|c| reserved_chars.contains(&c)) {
264            return Err(SentinelError::InvalidDocumentId {
265                id: id.to_owned(),
266            });
267        }
268
269        // Check for Windows reserved names (case-insensitive)
270        let reserved_names = [
271            "CON", "PRN", "AUX", "NUL", "COM1", "COM2", "COM3", "COM4", "COM5", "COM6", "COM7", "COM8", "COM9", "LPT1",
272            "LPT2", "LPT3", "LPT4", "LPT5", "LPT6", "LPT7", "LPT8", "LPT9",
273        ];
274        let upper_id = id.to_uppercase();
275        for reserved in &reserved_names {
276            if upper_id == *reserved || upper_id.starts_with(&format!("{}.", reserved)) {
277                return Err(SentinelError::InvalidDocumentId {
278                    id: id.to_owned(),
279                });
280            }
281        }
282
283        // Check for other filesystem-unsafe characters
284        if !is_valid_document_id_chars(id) {
285            return Err(SentinelError::InvalidDocumentId {
286                id: id.to_owned(),
287            });
288        }
289
290        Ok(())
291    }
292
293    /// Starts the background event processing task for the collection.
294    ///
295    /// This method spawns an async task that processes internal collection events
296    /// such as metadata updates and WAL operations. The task runs in the background
297    /// and handles events sent via the event channel.
298    ///
299    /// The event processor is responsible for:
300    /// - Processing document events (insert, update, delete)
301    /// - Debounced metadata persistence (every 500ms)
302    /// - Coordinating with the store's event system
303    ///
304    /// # Note
305    ///
306    /// This method should only be called once during collection initialization.
307    /// Multiple calls will replace the previous event task.
308    #[allow(
309        clippy::integer_division_remainder_used,
310        reason = "false positive in tokio select"
311    )]
312    pub fn start_event_processor(&mut self) {
313        let mut event_receiver = {
314            let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
315            self.event_sender = Some(tx);
316            rx
317        };
318
319        // Clone necessary fields for the background task
320        let path = self.path.clone();
321        let total_documents = self.total_documents.clone();
322        let total_size_bytes = self.total_size_bytes.clone();
323        let updated_at = std::sync::Arc::new(std::sync::RwLock::new(*self.updated_at.read().unwrap()));
324
325        let task = tokio::spawn(async move {
326            // Debouncing: save metadata every 500 milliseconds instead of after every event
327            let mut save_interval = tokio::time::interval(tokio::time::Duration::from_millis(500));
328            save_interval.tick().await; // First tick completes immediately
329
330            let mut changed = false;
331
332            loop {
333                tokio::select! {
334                    // Process events
335                    event = event_receiver.recv() => {
336                        match event {
337                            Some(crate::events::StoreEvent::CollectionCreated {
338                                ..
339                            }) => {
340                                // Collection creation is handled by the store
341                            },
342                            Some(crate::events::StoreEvent::CollectionDeleted {
343                                ..
344                            }) => {
345                                // Collection dropping is handled by the store
346                            },
347                            Some(crate::events::StoreEvent::DocumentInserted {
348                                collection,
349                                size_bytes,
350                            }) => {
351                                tracing::debug!("Processing document inserted event: {} (size: {})", collection, size_bytes);
352                                // Update atomic counters asynchronously
353                                total_documents.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
354                                total_size_bytes.fetch_add(size_bytes, std::sync::atomic::Ordering::Relaxed);
355                                changed = true;
356                            },
357                            Some(crate::events::StoreEvent::DocumentUpdated {
358                                collection,
359                                old_size_bytes,
360                                new_size_bytes,
361                            }) => {
362                                tracing::debug!("Processing document updated event: {} (old: {}, new: {})",
363                                    collection, old_size_bytes, new_size_bytes);
364                                // Update atomic counters asynchronously
365                                total_size_bytes.fetch_sub(old_size_bytes, std::sync::atomic::Ordering::Relaxed);
366                                total_size_bytes.fetch_add(new_size_bytes, std::sync::atomic::Ordering::Relaxed);
367                                changed = true;
368                            },
369                            Some(crate::events::StoreEvent::DocumentDeleted {
370                                collection,
371                                size_bytes,
372                            }) => {
373                                tracing::debug!("Processing document deleted event: {} (size: {})", collection, size_bytes);
374                                // Update atomic counters asynchronously
375                                total_documents.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
376                                total_size_bytes.fetch_sub(size_bytes, std::sync::atomic::Ordering::Relaxed);
377                                changed = true;
378                            },
379                            None => {
380                                // Channel closed, exit
381                                break;
382                            },
383                        }
384                    }
385
386                    // Periodic metadata save
387                    _ = save_interval.tick() => {
388                        if changed {
389                            // Update the updated_at timestamp (read lock and update)
390                            let now = chrono::Utc::now();
391                            *updated_at.write().unwrap() = now;
392
393                            // Load current values from atomic counters
394                            let document_count = total_documents.load(std::sync::atomic::Ordering::Relaxed);
395                            let size_bytes = total_size_bytes.load(std::sync::atomic::Ordering::Relaxed);
396
397                            // Load existing metadata to preserve other fields
398                            let metadata_path = path.join(crate::constants::COLLECTION_METADATA_FILE);
399                            let mut metadata = if tokio::fs::try_exists(&metadata_path).await.unwrap_or(false) {
400                                match tokio::fs::read_to_string(&metadata_path).await {
401                                    Ok(content) => match serde_json::from_str(&content) {
402                                        Ok(m) => m,
403                                        Err(e) => {
404                                            tracing::error!("Failed to parse collection metadata: {}", e);
405                                            continue;
406                                        }
407                                    },
408                                    Err(e) => {
409                                        tracing::error!("Failed to read collection metadata: {}", e);
410                                        continue;
411                                    }
412                                }
413                            } else {
414                                tracing::warn!("Collection metadata file not found, creating new");
415                                crate::CollectionMetadata::new(path.file_name().unwrap().to_str().unwrap().to_owned())
416                            };
417
418                            // Update the runtime statistics
419                            metadata.document_count = document_count;
420                            metadata.total_size_bytes = size_bytes;
421                            metadata.updated_at = std::time::SystemTime::now()
422                                .duration_since(std::time::UNIX_EPOCH)
423                                .unwrap()
424                                .as_secs();
425
426                            // Save back to disk
427                            match serde_json::to_string_pretty(&metadata) {
428                                Ok(content) => {
429                                    if let Err(e) = tokio::fs::write(&metadata_path, content).await {
430                                        tracing::error!("Failed to save collection metadata in background task: {}", e);
431                                    } else {
432                                        tracing::trace!("Collection metadata saved successfully for {:?}", path);
433                                        changed = false;
434                                    }
435                                }
436                                Err(e) => {
437                                    tracing::error!("Failed to serialize collection metadata: {}", e);
438                                }
439                            }
440                        }
441                    }
442                }
443            }
444        });
445
446        self.event_task = Some(task);
447    }
448
449    /// Emits an event to the store's event system.
450    ///
451    /// This is an internal method used to notify the store of collection-level
452    /// events such as document insertions, updates, and deletions. The events
453    /// are sent asynchronously and do not block the calling operation.
454    ///
455    /// # Arguments
456    /// Emits an event to the collection's event sender.
457    ///
458    /// * `event` - The event to emit to the store.
459    pub fn emit_event(&self, event: crate::events::StoreEvent) {
460        if let Some(sender) = self.event_sender.as_ref() &&
461            let Err(e) = sender.send(event)
462        {
463            warn!("Failed to emit collection event: {}", e);
464        }
465    }
466}