sentinel_dbms/collection/coll.rs
1use std::{path::PathBuf, sync::Arc};
2
3use tokio::fs as tokio_fs;
4use tracing::{debug, warn};
5use sentinel_wal::WalManager;
6
7use crate::{
8 constants::COLLECTION_METADATA_FILE,
9 metadata::CollectionMetadata,
10 validation::is_valid_document_id_chars,
11 Result,
12 SentinelError,
13};
14
15/// A collection represents a namespace for documents in the Sentinel database.
16///
17/// Collections are backed by filesystem directories, where each document is stored
18/// as a JSON file with metadata including version, timestamps, hash, and optional signature.
19/// The collection provides CRUD operations (Create, Read, Update, Delete) and advanced
20/// querying capabilities with streaming support for memory-efficient handling of large datasets.
21///
22/// # Structure
23///
24/// Each collection is stored in a directory with the following structure:
25/// - `{collection_name}/` - Root directory for the collection
26/// - `{collection_name}/{id}.json` - Individual document files with embedded metadata
27/// - `{collection_name}/.deleted/` - Soft-deleted documents (for recovery)
28/// - `{collection_name}/.metadata.json` - Collection metadata and indices (future)
29///
30/// # Streaming Operations
31///
32/// For memory efficiency with large datasets, operations like `filter()` and `query()`
33/// return async streams that process documents one-by-one rather than loading
34/// all documents into memory simultaneously.
35///
36/// # Example
37///
38/// ```rust
39/// use sentinel_dbms::{Store, Collection};
40/// use futures::TryStreamExt;
41/// use serde_json::json;
42///
43/// # async fn example() -> sentinel_dbms::Result<()> {
44/// // Create a store and get a collection
45/// let store = Store::new("/tmp/sentinel", None).await?;
46/// let collection = store.collection("users").await?;
47///
48/// // Insert a document
49/// let user_data = json!({
50/// "name": "Alice",
51/// "email": "alice@example.com",
52/// "age": 30
53/// });
54/// collection.insert("user-123", user_data).await?;
55///
56/// // Retrieve the document
57/// let doc = collection.get("user-123").await?;
58/// assert!(doc.is_some());
59/// assert_eq!(doc.unwrap().id(), "user-123");
60///
61/// // Stream all documents matching a predicate
62/// let adults = collection.filter(|doc| {
63/// doc.data().get("age")
64/// .and_then(|v| v.as_i64())
65/// .map_or(false, |age| age >= 18)
66/// });
67/// let adult_docs: Vec<_> = adults.try_collect().await?;
68/// assert_eq!(adult_docs.len(), 1);
69/// # Ok(())
70/// # }
71/// ```
72#[derive(Debug)]
73#[allow(
74 clippy::field_scoped_visibility_modifiers,
75 reason = "fields need to be pub(crate) for internal access"
76)]
77pub struct Collection {
78 /// The filesystem path to the collection directory.
79 pub(crate) path: PathBuf,
80 /// The signing key for the collection.
81 pub(crate) signing_key: Option<Arc<sentinel_crypto::SigningKey>>,
82 /// The Write-Ahead Log manager for durability.
83 pub(crate) wal_manager: Option<Arc<WalManager>>,
84 /// WAL configuration stored in metadata (without temporary overrides).
85 pub(crate) stored_wal_config: sentinel_wal::CollectionWalConfig,
86 /// Effective WAL configuration (stored + any temporary overrides).
87 pub(crate) wal_config: sentinel_wal::CollectionWalConfig,
88 /// When the collection was created.
89 pub(crate) created_at: chrono::DateTime<chrono::Utc>,
90 /// When the collection was last updated.
91 pub(crate) updated_at: std::sync::RwLock<chrono::DateTime<chrono::Utc>>,
92 /// When the collection was last checkpointed.
93 pub(crate) last_checkpoint_at: std::sync::RwLock<Option<chrono::DateTime<chrono::Utc>>>,
94 /// Total number of documents in the collection.
95 pub(crate) total_documents: std::sync::Arc<std::sync::atomic::AtomicU64>,
96 /// Total size of all documents in the collection in bytes.
97 pub(crate) total_size_bytes: std::sync::Arc<std::sync::atomic::AtomicU64>,
98 /// Event sender for notifying the store of metadata changes.
99 pub(crate) event_sender: Option<tokio::sync::mpsc::UnboundedSender<crate::events::StoreEvent>>,
100 /// Background task handle for processing internal events.
101 pub(crate) event_task: Option<tokio::task::JoinHandle<()>>,
102 /// Whether the collection is currently in recovery mode (skip WAL logging).
103 pub(crate) recovery_mode: std::sync::atomic::AtomicBool,
104}
105
106#[allow(
107 clippy::multiple_inherent_impl,
108 reason = "multiple impl blocks for Collection are intentional for organization"
109)]
110impl Collection {
111 /// Returns the name of the collection.
112 pub fn name(&self) -> &str { self.path.file_name().unwrap().to_str().unwrap() }
113
114 /// Returns the creation timestamp of the collection.
115 pub const fn created_at(&self) -> chrono::DateTime<chrono::Utc> { self.created_at }
116
117 /// Returns the last update timestamp of the collection.
118 pub fn updated_at(&self) -> chrono::DateTime<chrono::Utc> { *self.updated_at.read().unwrap() }
119
120 /// Returns the last checkpoint timestamp of the collection, if any.
121 pub fn last_checkpoint_at(&self) -> Option<chrono::DateTime<chrono::Utc>> {
122 *self.last_checkpoint_at.read().unwrap()
123 }
124
125 /// Returns the total number of documents in the collection.
126 pub fn total_documents(&self) -> u64 {
127 self.total_documents
128 .load(std::sync::atomic::Ordering::Relaxed)
129 }
130
131 /// Returns the total size of all documents in the collection in bytes.
132 pub fn total_size_bytes(&self) -> u64 {
133 self.total_size_bytes
134 .load(std::sync::atomic::Ordering::Relaxed)
135 }
136
137 /// Returns a reference to the stored WAL configuration for this collection.
138 ///
139 /// This is the WAL configuration as persisted in the collection metadata,
140 /// without any temporary overrides that may be applied at runtime.
141 pub const fn stored_wal_config(&self) -> &sentinel_wal::CollectionWalConfig { &self.stored_wal_config }
142
143 /// Returns the effective WAL configuration for this collection.
144 ///
145 /// This includes the stored configuration plus any runtime overrides that
146 /// may have been applied when the collection was accessed.
147 pub const fn wal_config(&self) -> &sentinel_wal::CollectionWalConfig { &self.wal_config }
148
149 /// Saves the current collection metadata to disk.
150 ///
151 /// This method persists the collection's current state (document count, size, timestamps,
152 /// and WAL configuration) to the `.metadata.json` file in the collection directory. This
153 /// ensures that metadata remains consistent across restarts and can be used for monitoring
154 /// and optimization.
155 ///
156 /// # Returns
157 ///
158 /// Returns `Ok(())` on success, or a `SentinelError` if the metadata cannot be saved.
159 pub async fn save_metadata(&self) -> Result<()> {
160 let metadata_path = self.path.join(COLLECTION_METADATA_FILE);
161
162 // Load existing metadata to preserve other fields
163 let mut metadata = if tokio_fs::try_exists(&metadata_path).await.unwrap_or(false) {
164 let content = tokio_fs::read_to_string(&metadata_path).await?;
165 serde_json::from_str(&content)?
166 }
167 else {
168 // Create new metadata if file doesn't exist
169 CollectionMetadata::new(self.name().to_owned())
170 };
171
172 // Update the runtime statistics
173 metadata.document_count = self.total_documents();
174 metadata.total_size_bytes = self.total_size_bytes();
175 metadata.updated_at = std::time::SystemTime::now()
176 .duration_since(std::time::UNIX_EPOCH)
177 .unwrap()
178 .as_secs();
179
180 // Update the WAL configuration
181 metadata.wal_config = Some(self.stored_wal_config.clone());
182
183 // Save back to disk
184 let content = serde_json::to_string_pretty(&metadata)?;
185 tokio_fs::write(&metadata_path, content).await?;
186
187 debug!("Collection metadata saved for {}", self.name());
188 Ok(())
189 }
190
191 /// Flushes any pending metadata changes to disk immediately.
192 ///
193 /// This method forces a synchronous save of the collection metadata to disk,
194 /// bypassing the normal debounced save mechanism. This is useful for tests
195 /// and for ensuring data durability when needed.
196 ///
197 /// # Returns
198 ///
199 /// Returns `Ok(())` on success, or a `SentinelError` if the metadata cannot be saved.
200 pub async fn flush_metadata(&self) -> Result<()> { self.save_metadata().await }
201
202 /// Validates a document ID according to filesystem-safe naming rules.
203 ///
204 /// Document IDs must be filesystem-safe and cannot contain reserved characters
205 /// or Windows reserved names. This prevents issues with file operations and
206 /// ensures cross-platform compatibility.
207 ///
208 /// # Arguments
209 ///
210 /// * `id` - The document ID to validate.
211 ///
212 /// # Returns
213 ///
214 /// Returns `Ok(())` if the ID is valid, or a `SentinelError::InvalidDocumentId`
215 /// if the ID contains invalid characters or is a reserved name.
216 ///
217 /// # Validation Rules
218 ///
219 /// - Must not be empty
220 /// - Must not contain path separators (`/` or `\`)
221 /// - Must not contain control characters (0x00-0x1F)
222 /// - Must not contain Windows reserved characters (`< > : " | ? *`)
223 /// - Must not be a Windows reserved name (CON, PRN, AUX, NUL, COM1-9, LPT1-9)
224 /// - Must not contain spaces or other filesystem-unsafe characters
225 ///
226 /// # Examples
227 ///
228 /// ```rust
229 /// use sentinel_dbms::Collection;
230 ///
231 /// // Valid IDs
232 /// assert!(Collection::validate_document_id("user-123").is_ok());
233 /// assert!(Collection::validate_document_id("my_document").is_ok());
234 ///
235 /// // Invalid IDs
236 /// assert!(Collection::validate_document_id("").is_err()); // empty
237 /// assert!(Collection::validate_document_id("path/file").is_err()); // path separator
238 /// assert!(Collection::validate_document_id("CON").is_err()); // reserved name
239 /// ```
240 pub fn validate_document_id(id: &str) -> Result<()> {
241 if id.is_empty() {
242 return Err(SentinelError::InvalidDocumentId {
243 id: id.to_owned(),
244 });
245 }
246
247 // Check for path separators
248 if id.contains('/') || id.contains('\\') {
249 return Err(SentinelError::InvalidDocumentId {
250 id: id.to_owned(),
251 });
252 }
253
254 // Check for control characters
255 if id.chars().any(|c| c.is_control()) {
256 return Err(SentinelError::InvalidDocumentId {
257 id: id.to_owned(),
258 });
259 }
260
261 // Check for Windows reserved characters
262 let reserved_chars = ['<', '>', ':', '"', '|', '?', '*'];
263 if id.chars().any(|c| reserved_chars.contains(&c)) {
264 return Err(SentinelError::InvalidDocumentId {
265 id: id.to_owned(),
266 });
267 }
268
269 // Check for Windows reserved names (case-insensitive)
270 let reserved_names = [
271 "CON", "PRN", "AUX", "NUL", "COM1", "COM2", "COM3", "COM4", "COM5", "COM6", "COM7", "COM8", "COM9", "LPT1",
272 "LPT2", "LPT3", "LPT4", "LPT5", "LPT6", "LPT7", "LPT8", "LPT9",
273 ];
274 let upper_id = id.to_uppercase();
275 for reserved in &reserved_names {
276 if upper_id == *reserved || upper_id.starts_with(&format!("{}.", reserved)) {
277 return Err(SentinelError::InvalidDocumentId {
278 id: id.to_owned(),
279 });
280 }
281 }
282
283 // Check for other filesystem-unsafe characters
284 if !is_valid_document_id_chars(id) {
285 return Err(SentinelError::InvalidDocumentId {
286 id: id.to_owned(),
287 });
288 }
289
290 Ok(())
291 }
292
293 /// Starts the background event processing task for the collection.
294 ///
295 /// This method spawns an async task that processes internal collection events
296 /// such as metadata updates and WAL operations. The task runs in the background
297 /// and handles events sent via the event channel.
298 ///
299 /// The event processor is responsible for:
300 /// - Processing document events (insert, update, delete)
301 /// - Debounced metadata persistence (every 500ms)
302 /// - Coordinating with the store's event system
303 ///
304 /// # Note
305 ///
306 /// This method should only be called once during collection initialization.
307 /// Multiple calls will replace the previous event task.
308 #[allow(
309 clippy::integer_division_remainder_used,
310 reason = "false positive in tokio select"
311 )]
312 pub fn start_event_processor(&mut self) {
313 let mut event_receiver = {
314 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
315 self.event_sender = Some(tx);
316 rx
317 };
318
319 // Clone necessary fields for the background task
320 let path = self.path.clone();
321 let total_documents = self.total_documents.clone();
322 let total_size_bytes = self.total_size_bytes.clone();
323 let updated_at = std::sync::Arc::new(std::sync::RwLock::new(*self.updated_at.read().unwrap()));
324
325 let task = tokio::spawn(async move {
326 // Debouncing: save metadata every 500 milliseconds instead of after every event
327 let mut save_interval = tokio::time::interval(tokio::time::Duration::from_millis(500));
328 save_interval.tick().await; // First tick completes immediately
329
330 let mut changed = false;
331
332 loop {
333 tokio::select! {
334 // Process events
335 event = event_receiver.recv() => {
336 match event {
337 Some(crate::events::StoreEvent::CollectionCreated {
338 ..
339 }) => {
340 // Collection creation is handled by the store
341 },
342 Some(crate::events::StoreEvent::CollectionDeleted {
343 ..
344 }) => {
345 // Collection dropping is handled by the store
346 },
347 Some(crate::events::StoreEvent::DocumentInserted {
348 collection,
349 size_bytes,
350 }) => {
351 tracing::debug!("Processing document inserted event: {} (size: {})", collection, size_bytes);
352 // Update atomic counters asynchronously
353 total_documents.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
354 total_size_bytes.fetch_add(size_bytes, std::sync::atomic::Ordering::Relaxed);
355 changed = true;
356 },
357 Some(crate::events::StoreEvent::DocumentUpdated {
358 collection,
359 old_size_bytes,
360 new_size_bytes,
361 }) => {
362 tracing::debug!("Processing document updated event: {} (old: {}, new: {})",
363 collection, old_size_bytes, new_size_bytes);
364 // Update atomic counters asynchronously
365 total_size_bytes.fetch_sub(old_size_bytes, std::sync::atomic::Ordering::Relaxed);
366 total_size_bytes.fetch_add(new_size_bytes, std::sync::atomic::Ordering::Relaxed);
367 changed = true;
368 },
369 Some(crate::events::StoreEvent::DocumentDeleted {
370 collection,
371 size_bytes,
372 }) => {
373 tracing::debug!("Processing document deleted event: {} (size: {})", collection, size_bytes);
374 // Update atomic counters asynchronously
375 total_documents.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
376 total_size_bytes.fetch_sub(size_bytes, std::sync::atomic::Ordering::Relaxed);
377 changed = true;
378 },
379 None => {
380 // Channel closed, exit
381 break;
382 },
383 }
384 }
385
386 // Periodic metadata save
387 _ = save_interval.tick() => {
388 if changed {
389 // Update the updated_at timestamp (read lock and update)
390 let now = chrono::Utc::now();
391 *updated_at.write().unwrap() = now;
392
393 // Load current values from atomic counters
394 let document_count = total_documents.load(std::sync::atomic::Ordering::Relaxed);
395 let size_bytes = total_size_bytes.load(std::sync::atomic::Ordering::Relaxed);
396
397 // Load existing metadata to preserve other fields
398 let metadata_path = path.join(crate::constants::COLLECTION_METADATA_FILE);
399 let mut metadata = if tokio::fs::try_exists(&metadata_path).await.unwrap_or(false) {
400 match tokio::fs::read_to_string(&metadata_path).await {
401 Ok(content) => match serde_json::from_str(&content) {
402 Ok(m) => m,
403 Err(e) => {
404 tracing::error!("Failed to parse collection metadata: {}", e);
405 continue;
406 }
407 },
408 Err(e) => {
409 tracing::error!("Failed to read collection metadata: {}", e);
410 continue;
411 }
412 }
413 } else {
414 tracing::warn!("Collection metadata file not found, creating new");
415 crate::CollectionMetadata::new(path.file_name().unwrap().to_str().unwrap().to_owned())
416 };
417
418 // Update the runtime statistics
419 metadata.document_count = document_count;
420 metadata.total_size_bytes = size_bytes;
421 metadata.updated_at = std::time::SystemTime::now()
422 .duration_since(std::time::UNIX_EPOCH)
423 .unwrap()
424 .as_secs();
425
426 // Save back to disk
427 match serde_json::to_string_pretty(&metadata) {
428 Ok(content) => {
429 if let Err(e) = tokio::fs::write(&metadata_path, content).await {
430 tracing::error!("Failed to save collection metadata in background task: {}", e);
431 } else {
432 tracing::trace!("Collection metadata saved successfully for {:?}", path);
433 changed = false;
434 }
435 }
436 Err(e) => {
437 tracing::error!("Failed to serialize collection metadata: {}", e);
438 }
439 }
440 }
441 }
442 }
443 }
444 });
445
446 self.event_task = Some(task);
447 }
448
449 /// Emits an event to the store's event system.
450 ///
451 /// This is an internal method used to notify the store of collection-level
452 /// events such as document insertions, updates, and deletions. The events
453 /// are sent asynchronously and do not block the calling operation.
454 ///
455 /// # Arguments
456 /// Emits an event to the collection's event sender.
457 ///
458 /// * `event` - The event to emit to the store.
459 pub fn emit_event(&self, event: crate::events::StoreEvent) {
460 if let Some(sender) = self.event_sender.as_ref() &&
461 let Err(e) = sender.send(event)
462 {
463 warn!("Failed to emit collection event: {}", e);
464 }
465 }
466}