ngdb/
db.rs

//! Core database implementation for NGDB
//!
//! This module provides a high-performance, thread-safe RocksDB wrapper with zero async overhead.
//! All operations are synchronous and leverage RocksDB's internal thread-safety.

use crate::{Error, Result, Storable, serialization::helpers, traits::KeyType};
use parking_lot::{Mutex, RwLock};
use rocksdb::{BoundColumnFamily, WriteBatch as RocksWriteBatch};
use std::collections::HashMap;
use std::marker::PhantomData;
use std::path::Path;
use std::sync::Arc;
use tracing::{debug, error, info, instrument, warn};

/// The main database handle
///
/// This is a thread-safe, cloneable handle to the underlying RocksDB instance.
/// All operations are synchronous and fast - no async overhead.
///
/// # Thread Safety
///
/// The database uses RocksDB's multi-threaded column family mode, making it safe
/// to use concurrently from multiple threads without additional synchronization.
///
/// # Examples
///
/// ```rust,no_run
/// use ngdb::{Database, DatabaseConfig, Storable};
/// use borsh::{BorshSerialize, BorshDeserialize};
///
/// #[derive(BorshSerialize, BorshDeserialize)]
/// struct User {
///     id: u64,
///     name: String,
/// }
///
/// impl Storable for User {
///     type Key = u64;
///     fn key(&self) -> Self::Key {
///         self.id
///     }
/// }
///
/// fn main() -> Result<(), ngdb::Error> {
///     let db = DatabaseConfig::new("./data")
///         .create_if_missing(true)
///         .open()?;
///
///     let users = db.collection::<User>("users")?;
///     let user = User { id: 1, name: "Alice".to_string() };
///     users.put(&user)?;
///
///     Ok(())
/// }
/// ```
#[derive(Clone)]
pub struct Database {
    pub(crate) inner: Arc<DatabaseInner>,
}

pub(crate) struct DatabaseInner {
    pub(crate) db: Arc<rocksdb::DB>,
    // RwLock for shutdown: read locks allow operations, write lock for shutdown
    shutdown: Arc<RwLock<bool>>,
}

impl Database {
    /// Create a new database handle (internal use only)
    pub(crate) fn new(db: rocksdb::DB) -> Self {
        Self {
            inner: Arc::new(DatabaseInner {
                db: Arc::new(db),
                shutdown: Arc::new(RwLock::new(false)),
            }),
        }
    }

    /// Get a typed collection for storing and retrieving values
    ///
    /// Collections are backed by RocksDB column families and provide type-safe
    /// access to your data.
    ///
    /// # Arguments
    ///
    /// * `name` - The name of the column family
    ///
    /// # Errors
    ///
    /// Returns an error if the column family doesn't exist. Make sure to declare
    /// all column families in `DatabaseConfig::add_column_family()` before opening.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// # use ngdb::{Database, Storable};
    /// # #[derive(borsh::BorshSerialize, borsh::BorshDeserialize)]
    /// # struct User { id: u64 }
    /// # impl Storable for User {
    /// #     type Key = u64;
    /// #     fn key(&self) -> u64 { self.id }
    /// # }
    /// # fn example(db: Database) -> Result<(), ngdb::Error> {
    /// let users = db.collection::<User>("users")?;
    /// # Ok(())
    /// # }
    /// ```
    #[instrument(skip(self))]
    pub fn collection<T: Storable>(&self, name: &str) -> Result<Collection<T>> {
        // Acquire read lock to prevent shutdown during this operation
        // Keep the guard alive until we've created the collection to prevent TOCTOU
        let shutdown_guard = self.inner.shutdown.read();

        if *shutdown_guard {
            return Err(Error::Database("Database has been shut down".to_string()));
        }

        // Verify column family exists while holding the lock
        self.inner.db.cf_handle(name).ok_or_else(|| {
            error!("Column family '{}' not found", name);
            Error::Database(format!(
                "Column family '{}' does not exist. Ensure it was declared in DatabaseConfig::add_column_family() before opening the database.",
                name
            ))
        })?;

        debug!("Created collection for column family '{}'", name);

        // Collection creation is now safe - db is not shut down
        // We can drop the guard now, Collection will check on each operation
        drop(shutdown_guard);

        Ok(Collection::new(
            Arc::clone(&self.inner.db),
            name,
            Arc::clone(&self.inner.shutdown),
        ))
    }

    /// List all column families in the database
    ///
    /// Returns a list of all collection names (column families) in the database.
    ///
    /// # Errors
    ///
    /// Returns an error if the database has been shut down or if listing fails.
    pub fn list_collections(&self) -> Result<Vec<String>> {
        let _guard = self.check_shutdown()?;

        rocksdb::DB::list_cf(&rocksdb::Options::default(), self.inner.db.path())
            .map_err(|e| Error::Database(format!("Failed to list collections: {}", e)))
    }

    /// Flush all memtables to disk
    ///
    /// This forces all in-memory data to be written to SST files.
    #[instrument(skip(self))]
    pub fn flush(&self) -> Result<()> {
        info!("Flushing database");
        self.inner.db.flush().map_err(|e| {
            error!("Flush failed: {}", e);
            Error::Database(format!("Flush failed: {}", e))
        })
    }
    /// Compact the database's default column family
    ///
    /// This triggers a full-range compaction of the default column family. Use
    /// `Collection::compact_range` to compact an individual collection.
    #[instrument(skip(self))]
    pub fn compact_all(&self) -> Result<()> {
        info!("Compacting entire database");
        self.inner.db.compact_range::<&[u8], &[u8]>(None, None);
        Ok(())
    }

    /// Create a backup of the database
    ///
    /// # Arguments
    ///
    /// * `backup_path` - Directory where the backup will be stored
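    ///
    /// # Examples
    ///
    /// A minimal sketch; the backup directory shown here is illustrative:
    ///
    /// ```rust,no_run
    /// # fn example(db: ngdb::Database) -> Result<(), ngdb::Error> {
    /// db.backup("./backups/ngdb")?;
    /// # Ok(())
    /// # }
    /// ```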
    #[instrument(skip(self, backup_path))]
    pub fn backup<P: AsRef<Path>>(&self, backup_path: P) -> Result<()> {
        use rocksdb::backup::{BackupEngine, BackupEngineOptions};

        // Check if database is shut down
        let _guard = self.check_shutdown()?;

        let path = backup_path.as_ref();
        info!("Creating backup at {:?}", path);

        let backup_opts = BackupEngineOptions::new(path).map_err(|e| {
            error!("Failed to create backup options: {}", e);
            Error::Database(format!("Failed to create backup options: {}", e))
        })?;

        let mut backup_engine =
            BackupEngine::open(&backup_opts, &rocksdb::Env::new()?).map_err(|e| {
                error!("Failed to open backup engine: {}", e);
                Error::Database(format!("Failed to open backup engine: {}", e))
            })?;

        backup_engine
            .create_new_backup(&self.inner.db)
            .map_err(|e| {
                error!("Failed to create backup: {}", e);
                Error::Database(format!("Failed to create backup: {}", e))
            })?;

        info!("Backup created successfully");
        Ok(())
    }

    #[inline]
    fn check_shutdown(&self) -> Result<parking_lot::RwLockReadGuard<'_, bool>> {
        let guard = self.inner.shutdown.read();

        if *guard {
            return Err(Error::Database("Database has been shut down".to_string()));
        }

        Ok(guard)
    }

    /// Restore database from a backup
    ///
    /// # Arguments
    ///
    /// * `backup_path` - Directory containing the backup
    /// * `restore_path` - Directory where the database will be restored
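    ///
    /// # Examples
    ///
    /// A minimal sketch; both paths are illustrative:
    ///
    /// ```rust,no_run
    /// # fn example() -> Result<(), ngdb::Error> {
    /// ngdb::Database::restore_from_backup("./backups/ngdb", "./data")?;
    /// # Ok(())
    /// # }
    /// ```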
    pub fn restore_from_backup<P: AsRef<Path>>(backup_path: P, restore_path: P) -> Result<()> {
        use rocksdb::backup::{BackupEngine, BackupEngineOptions, RestoreOptions};

        let backup_path = backup_path.as_ref();
        let restore_path = restore_path.as_ref();

        info!(
            "Restoring from backup {:?} to {:?}",
            backup_path, restore_path
        );

        let backup_opts = BackupEngineOptions::new(backup_path).map_err(|e| {
            error!("Failed to create backup options: {}", e);
            Error::Database(format!("Failed to create backup options: {}", e))
        })?;

        let mut backup_engine =
            BackupEngine::open(&backup_opts, &rocksdb::Env::new()?).map_err(|e| {
                error!("Failed to open backup engine: {}", e);
                Error::Database(format!("Failed to open backup engine: {}", e))
            })?;

        let restore_opts = RestoreOptions::default();
        backup_engine
            .restore_from_latest_backup(restore_path, restore_path, &restore_opts)
            .map_err(|e| {
                error!("Failed to restore backup: {}", e);
                Error::Database(format!("Failed to restore backup: {}", e))
            })?;

        info!("Backup restored successfully");
        Ok(())
    }

    /// List all available backups
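    ///
    /// For example (sketch; the backup directory is illustrative):
    ///
    /// ```rust,no_run
    /// # fn example() -> Result<(), ngdb::Error> {
    /// for info in ngdb::Database::list_backups("./backups/ngdb")? {
    ///     println!("backup {}: {} bytes at {}", info.backup_id, info.size, info.timestamp);
    /// }
    /// # Ok(())
    /// # }
    /// ```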
    pub fn list_backups<P: AsRef<Path>>(backup_path: P) -> Result<Vec<BackupInfo>> {
        use rocksdb::backup::{BackupEngine, BackupEngineOptions};

        let path = backup_path.as_ref();
        let backup_opts = BackupEngineOptions::new(path)
            .map_err(|e| Error::Database(format!("Failed to create backup options: {}", e)))?;

        let backup_engine = BackupEngine::open(&backup_opts, &rocksdb::Env::new()?)
            .map_err(|e| Error::Database(format!("Failed to open backup engine: {}", e)))?;

        let infos = backup_engine.get_backup_info();
        Ok(infos
            .iter()
            .map(|info| BackupInfo {
                backup_id: info.backup_id,
                timestamp: info.timestamp,
                size: info.size,
            })
            .collect())
    }

    /// Create a new transaction for atomic operations
    ///
    /// Transactions allow you to group multiple operations and commit them atomically.
    /// All writes in a transaction are buffered in memory until commit.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// # use ngdb::{Database, Storable};
    /// # #[derive(borsh::BorshSerialize, borsh::BorshDeserialize)]
    /// # struct Account { id: u64, balance: i64 }
    /// # impl Storable for Account {
    /// #     type Key = u64;
    /// #     fn key(&self) -> u64 { self.id }
    /// # }
    /// # fn example(db: Database) -> Result<(), ngdb::Error> {
    /// let txn = db.transaction()?;
    /// let accounts = txn.collection::<Account>("accounts")?;
    ///
    /// accounts.put(&Account { id: 1, balance: 100 })?;
    /// accounts.put(&Account { id: 2, balance: 200 })?;
    ///
    /// txn.commit()?;
    /// # Ok(())
    /// # }
    /// ```
    #[instrument(skip(self))]
    pub fn transaction(&self) -> Result<Transaction> {
        // Acquire read lock to prevent shutdown during this operation
        let shutdown = self.inner.shutdown.read();

        if *shutdown {
            return Err(Error::Database("Database has been shut down".to_string()));
        }

        Ok(Transaction::new(
            Arc::clone(&self.inner.db),
            Arc::clone(&self.inner.shutdown),
        ))
    }

    /// Gracefully shut down the database
    ///
    /// Flushes all memtables, marks the database as shut down, and prevents new operations.
    /// After calling this, all subsequent operations will fail.
    ///
    /// This acquires a write lock, which will block until all ongoing operations
    /// (which hold read locks) complete. This eliminates the TOCTOU race condition.
    ///
    /// If flushing fails, the database will NOT be marked as shut down, allowing
    /// operations to continue or shutdown to be retried.
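    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// # fn example(db: ngdb::Database) -> Result<(), ngdb::Error> {
    /// db.shutdown()?;
    /// // Any operation issued after this point fails with a "shut down" error.
    /// # Ok(())
    /// # }
    /// ```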
    #[instrument(skip(self))]
    pub fn shutdown(&self) -> Result<()> {
        info!("Shutting down database");

        // Acquire write lock - blocks until all read locks (operations) complete
        let mut shutdown_guard = self.inner.shutdown.write();

        // Try to flush - keep result but don't early return
        // This ensures the write lock is always released via RAII
        let flush_result = self.flush();

        // Only mark as shut down if flush succeeded
        if flush_result.is_ok() {
            *shutdown_guard = true;
            info!("Database shutdown complete");
        } else {
            error!("Shutdown failed: flush error, database remains operational");
        }

        // Lock is dropped here automatically, regardless of flush result
        flush_result
    }
}

// SAFETY: Database can be safely sent between threads and shared across threads because:
// 1. RocksDB guarantees thread-safety when opened with multi-threaded column family mode (the default)
// 2. All internal state (Arc<rocksdb::DB>, Arc<RwLock<bool>>) is Send + Sync
// 3. RocksDB documentation confirms that DB instances can be safely shared across threads
//    See: https://github.com/facebook/rocksdb/wiki/Basic-Operations#thread-safety
// 4. The Arc wrapper ensures the DB outlives all references
unsafe impl Send for Database {}
unsafe impl Sync for Database {}

/// Information about a database backup
#[derive(Debug, Clone)]
pub struct BackupInfo {
    /// Unique backup identifier
    pub backup_id: u32,
    /// Unix timestamp when backup was created
    pub timestamp: i64,
    /// Size of the backup in bytes
    pub size: u64,
}

/// A typed collection for storing and retrieving values
///
/// Collections are backed by RocksDB column families and provide type-safe
/// access to stored data. All operations are synchronous and thread-safe.
#[derive(Debug)]
pub struct Collection<T: Storable> {
    db: Arc<rocksdb::DB>,
    cf_name: String,
    shutdown: Arc<RwLock<bool>>,
    _phantom: PhantomData<T>,
}

impl<T: Storable> Collection<T> {
    fn new(db: Arc<rocksdb::DB>, name: &str, shutdown: Arc<RwLock<bool>>) -> Self {
        Self {
            db,
            cf_name: name.to_string(),
            shutdown,
            _phantom: PhantomData,
        }
    }

    fn cf<'a>(&'a self) -> Result<Arc<BoundColumnFamily<'a>>> {
        // Just fetch the handle - RocksDB makes this very fast (simple hashmap lookup)
        self.db
            .cf_handle(&self.cf_name)
            .ok_or_else(|| Error::Database(format!("Column family '{}' not found", self.cf_name)))
    }

    #[inline]
    fn check_shutdown(&self) -> Result<parking_lot::RwLockReadGuard<'_, bool>> {
        // Acquire read lock to prevent shutdown during the operation
        // This eliminates the TOCTOU race condition
        let guard = self.shutdown.read();

        if *guard {
            return Err(Error::Database("Database has been shut down".to_string()));
        }

        Ok(guard)
    }

    /// Store a value in the collection
    ///
    /// The value will be validated, serialized, and written to disk.
    ///
    /// # Arguments
    ///
    /// * `value` - The value to store
    ///
    /// # Errors
    ///
    /// Returns an error if validation fails, serialization fails, or the write fails
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// # use ngdb::{Collection, Storable};
    /// # #[derive(borsh::BorshSerialize, borsh::BorshDeserialize)]
    /// # struct User { id: u64, name: String }
    /// # impl Storable for User {
    /// #     type Key = u64;
    /// #     fn key(&self) -> u64 { self.id }
    /// # }
    /// # fn example(collection: Collection<User>) -> Result<(), ngdb::Error> {
    /// let user = User { id: 1, name: "Alice".to_string() };
    /// collection.put(&user)?;
    /// # Ok(())
    /// # }
    /// ```
    #[instrument(skip(self, value))]
    pub fn put(&self, value: &T) -> Result<()> {
        let _guard = self.check_shutdown()?;

        // Validate first
        value.validate()?;

        let key = value.key();
        let key_bytes = key.to_bytes()?;
        let value_bytes = helpers::serialize(value)?;

        debug!("Putting value in collection '{}'", self.cf_name);

        let cf = self.cf()?;
        self.db.put_cf(&cf, key_bytes, value_bytes).map_err(|e| {
            error!("Failed to put value: {}", e);
            Error::Database(format!("Failed to put value: {}", e))
        })?;

        value.on_stored();
        Ok(())
    }

    /// Retrieve a value from the collection by key
    ///
    /// # Arguments
    ///
    /// * `key` - The key to look up
    ///
    /// # Returns
    ///
    /// * `Some(T)` if the key exists
    /// * `None` if the key doesn't exist
    ///
    /// # Errors
    ///
    /// Returns an error if deserialization fails or there's a database error
    #[instrument(skip(self))]
    pub fn get(&self, key: &T::Key) -> Result<Option<T>> {
        let _guard = self.check_shutdown()?;

        let key_bytes = key.to_bytes()?;
        let cf = self.cf()?;

        match self.db.get_cf(&cf, key_bytes)? {
            Some(value_bytes) => {
                let value: T = helpers::deserialize(&value_bytes)?;
                Ok(Some(value))
            }
            None => Ok(None),
        }
    }

    /// Retrieve a value from the collection by key and automatically resolve references
    ///
    /// If the type `T` implements `Referable`, this method will automatically fetch and populate
    /// all referenced objects from their respective collections.
    ///
    /// # Arguments
    ///
    /// * `key` - The key to look up
    /// * `db` - The database handle to use for resolving references
    ///
    /// # Returns
    ///
    /// * `Some(T)` if the key exists (with all references resolved)
    /// * `None` if the key doesn't exist
    ///
    /// # Errors
    ///
    /// Returns an error if deserialization fails, reference resolution fails, or there's a database error
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// # use ngdb::{Collection, Database, Storable, Referable, Ref};
    /// # #[derive(borsh::BorshSerialize, borsh::BorshDeserialize)]
    /// # struct Post { id: u64, title: String }
    /// # impl Storable for Post {
    /// #     type Key = u64;
    /// #     fn key(&self) -> u64 { self.id }
    /// # }
    /// # impl Referable for Post {
    /// #     fn resolve_all(&self, _db: &ngdb::Database) -> ngdb::Result<()> { Ok(()) }
    /// # }
    /// # #[derive(borsh::BorshSerialize, borsh::BorshDeserialize)]
    /// # struct Comment { id: u64, text: String, post: Ref<Post> }
    /// # impl Storable for Comment {
    /// #     type Key = u64;
    /// #     fn key(&self) -> u64 { self.id }
    /// # }
    /// # impl Referable for Comment {
    /// #     fn resolve_all(&self, db: &ngdb::Database) -> ngdb::Result<()> {
    /// #         self.post.resolve(db, "posts")?;
    /// #         Ok(())
    /// #     }
    /// # }
    /// # fn example(collection: Collection<Comment>, db: Database) -> Result<(), ngdb::Error> {
    /// let comment = collection.get_with_refs(&1, &db)?;
    /// # Ok(())
    /// # }
    /// ```
    #[instrument(skip(self, db))]
    pub fn get_with_refs(&self, key: &T::Key, db: &crate::Database) -> Result<Option<T>>
    where
        T: crate::Referable,
    {
        let _guard = self.check_shutdown()?;

        let key_bytes = key.to_bytes()?;
        let cf = self.cf()?;

        match self.db.get_cf(&cf, key_bytes)? {
            Some(value_bytes) => {
                let value: T = helpers::deserialize(&value_bytes)?;
                value.resolve_all(db)?;
                Ok(Some(value))
            }
            None => Ok(None),
        }
    }

    /// Retrieve multiple values at once using optimized multi_get
    ///
    /// This is significantly faster than calling `get()` multiple times
    /// as it performs a single batched operation.
    ///
    /// # Arguments
    ///
    /// * `keys` - Slice of keys to retrieve
    ///
    /// # Returns
    ///
    /// A vector of optional values in the same order as the input keys
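    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming a `User` type like the one in the other examples:
    ///
    /// ```rust,no_run
    /// # use ngdb::{Collection, Storable};
    /// # #[derive(borsh::BorshSerialize, borsh::BorshDeserialize)]
    /// # struct User { id: u64 }
    /// # impl Storable for User {
    /// #     type Key = u64;
    /// #     fn key(&self) -> u64 { self.id }
    /// # }
    /// # fn example(users: Collection<User>) -> Result<(), ngdb::Error> {
    /// let results = users.get_many(&[1, 2, 3])?;
    /// assert_eq!(results.len(), 3); // one Option<User> per requested key
    /// # Ok(())
    /// # }
    /// ```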
    #[instrument(skip(self, keys))]
    pub fn get_many(&self, keys: &[T::Key]) -> Result<Vec<Option<T>>> {
        let _guard = self.check_shutdown()?;

        if keys.is_empty() {
            return Ok(Vec::new());
        }

        // Convert all keys to bytes
        let key_bytes: Result<Vec<Vec<u8>>> = keys.iter().map(|k| k.to_bytes()).collect();
        let key_bytes = key_bytes?;

        // Prepare column family references
        let cf = self.cf()?;
        let cf_refs: Vec<_> = key_bytes.iter().map(|k| (&cf, k.as_slice())).collect();

        // Perform multi_get
        let results = self.db.multi_get_cf(cf_refs);

        // Process results
        let mut output = Vec::with_capacity(keys.len());
        for result in results {
            match result {
                Ok(Some(value_bytes)) => {
                    let value: T = helpers::deserialize(&value_bytes)?;
                    output.push(Some(value));
                }
                Ok(None) => output.push(None),
                Err(e) => {
                    return Err(Error::Database(format!("Multi-get failed: {}", e)));
                }
            }
        }

        Ok(output)
    }

    /// Retrieve multiple values at once with automatic reference resolution
    ///
    /// This is significantly faster than calling `get_with_refs()` multiple times
    /// as it performs a single batched operation for the base objects, then resolves
    /// all references.
    ///
    /// # Arguments
    ///
    /// * `keys` - Slice of keys to retrieve
    /// * `db` - The database handle to use for resolving references
    ///
    /// # Returns
    ///
    /// A vector of optional values in the same order as the input keys (with all references resolved)
    #[instrument(skip(self, keys, db))]
    pub fn get_many_with_refs(
        &self,
        keys: &[T::Key],
        db: &crate::Database,
    ) -> Result<Vec<Option<T>>>
    where
        T: crate::Referable,
    {
        let _guard = self.check_shutdown()?;

        if keys.is_empty() {
            return Ok(Vec::new());
        }

        // Convert all keys to bytes
        let key_bytes: Result<Vec<Vec<u8>>> = keys.iter().map(|k| k.to_bytes()).collect();
        let key_bytes = key_bytes?;

        // Prepare column family references
        let cf = self.cf()?;
        let cf_refs: Vec<_> = key_bytes.iter().map(|k| (&cf, k.as_slice())).collect();

        // Perform multi_get
        let results = self.db.multi_get_cf(cf_refs);

        // Process results and resolve references
        let mut output = Vec::with_capacity(keys.len());
        for result in results {
            match result {
                Ok(Some(value_bytes)) => {
                    let value: T = helpers::deserialize(&value_bytes)?;
                    value.resolve_all(db)?;
                    output.push(Some(value));
                }
                Ok(None) => output.push(None),
                Err(e) => {
                    return Err(Error::Database(format!("Multi-get failed: {}", e)));
                }
            }
        }

        Ok(output)
    }

    /// Delete a value from the collection by key
    ///
    /// # Arguments
    ///
    /// * `key` - The key to delete
    ///
    /// # Errors
    ///
    /// Returns an error if the delete operation fails
    #[instrument(skip(self))]
    pub fn delete(&self, key: &T::Key) -> Result<()> {
        let _guard = self.check_shutdown()?;

        let key_bytes = key.to_bytes()?;

        debug!("Deleting key from collection '{}'", self.cf_name);

        let cf = self.cf()?;
        self.db.delete_cf(&cf, key_bytes).map_err(|e| {
            error!("Failed to delete: {}", e);
            Error::Database(format!("Failed to delete: {}", e))
        })
    }

    /// Check if a key exists in the collection
    #[instrument(skip(self))]
    pub fn exists(&self, key: &T::Key) -> Result<bool> {
        let _guard = self.check_shutdown()?;
        Ok(self.get(key)?.is_some())
    }

    /// Create a batch for multiple write operations
    ///
    /// Batches allow you to group multiple writes together for better performance.
    /// All operations in a batch are applied atomically.
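    ///
    /// A minimal sketch, assuming the same `User` type as the other examples:
    ///
    /// ```rust,no_run
    /// # use ngdb::{Collection, Storable};
    /// # #[derive(borsh::BorshSerialize, borsh::BorshDeserialize)]
    /// # struct User { id: u64 }
    /// # impl Storable for User {
    /// #     type Key = u64;
    /// #     fn key(&self) -> u64 { self.id }
    /// # }
    /// # fn example(users: Collection<User>) -> Result<(), ngdb::Error> {
    /// let mut batch = users.batch();
    /// batch.put(&User { id: 1 })?;
    /// batch.put(&User { id: 2 })?;
    /// batch.commit()?; // both writes are applied atomically
    /// # Ok(())
    /// # }
    /// ```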
    pub fn batch(&self) -> Batch<T> {
        Batch::new(Arc::clone(&self.db), self.cf_name.clone())
    }

    /// Create a snapshot for consistent reads
    ///
    /// Snapshots provide a consistent view of the database at a point in time.
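    ///
    /// A minimal sketch, assuming the same `User` type as the other examples:
    ///
    /// ```rust,no_run
    /// # use ngdb::{Collection, Storable};
    /// # #[derive(borsh::BorshSerialize, borsh::BorshDeserialize)]
    /// # struct User { id: u64 }
    /// # impl Storable for User {
    /// #     type Key = u64;
    /// #     fn key(&self) -> u64 { self.id }
    /// # }
    /// # fn example(users: Collection<User>) -> Result<(), ngdb::Error> {
    /// let snap = users.snapshot();
    /// // Writes made after this point are not visible through `snap`.
    /// let _user = snap.get(&1)?;
    /// # Ok(())
    /// # }
    /// ```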
    pub fn snapshot(&self) -> Snapshot<T> {
        Snapshot::new(Arc::clone(&self.db), self.cf_name.clone())
    }

    /// Create an iterator over all items in the collection
    ///
    /// The iterator holds an Arc to the database, keeping it alive for the duration
    /// of the iteration. Dropping the iterator won't invalidate the database.
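    ///
    /// A minimal sketch of a full scan, assuming the same `User` type as the other examples:
    ///
    /// ```rust,no_run
    /// # use ngdb::{Collection, Storable};
    /// # #[derive(borsh::BorshSerialize, borsh::BorshDeserialize)]
    /// # struct User { id: u64 }
    /// # impl Storable for User {
    /// #     type Key = u64;
    /// #     fn key(&self) -> u64 { self.id }
    /// # }
    /// # fn example(users: Collection<User>) -> Result<(), ngdb::Error> {
    /// let all: Vec<User> = users.iter()?.collect_all()?;
    /// println!("{} users loaded", all.len());
    /// # Ok(())
    /// # }
    /// ```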
    pub fn iter(&self) -> Result<Iterator<T>> {
        let _guard = self.check_shutdown()?;
        Ok(Iterator::new(
            Arc::clone(&self.db),
            self.cf_name.clone(),
            IteratorMode::Start,
            Arc::clone(&self.shutdown),
        ))
    }

    /// Create an iterator starting from a specific key
    pub fn iter_from(&self, key: &T::Key) -> Result<Iterator<T>> {
        let _guard = self.check_shutdown()?;
        let key_bytes = key.to_bytes()?;
        Ok(Iterator::new(
            Arc::clone(&self.db),
            self.cf_name.clone(),
            IteratorMode::From(key_bytes),
            Arc::clone(&self.shutdown),
        ))
    }

    /// Estimate the number of keys in the collection
    ///
    /// This uses RocksDB's internal statistics and may not be exact.
    pub fn estimate_num_keys(&self) -> Result<u64> {
        let cf = self.cf()?;
        self.db
            .property_int_value_cf(&cf, "rocksdb.estimate-num-keys")
            .map(|v| v.unwrap_or(0))
            .map_err(|e| Error::Database(format!("Failed to get estimate: {}", e)))
    }

    /// Flush this collection's memtable to disk
    #[instrument(skip(self))]
    pub fn flush(&self) -> Result<()> {
        info!("Flushing collection '{}'", self.cf_name);
        let cf = self.cf()?;
        self.db.flush_cf(&cf).map_err(|e| {
            error!("Flush failed: {}", e);
            Error::Database(format!("Flush failed: {}", e))
        })
    }

    /// Compact a range of keys in this collection
    #[instrument(skip(self, start, end))]
    pub fn compact_range(&self, start: Option<&T::Key>, end: Option<&T::Key>) -> Result<()> {
        let start_bytes = start.map(|k| k.to_bytes()).transpose()?;
        let end_bytes = end.map(|k| k.to_bytes()).transpose()?;

        info!("Compacting range in collection '{}'", self.cf_name);

        let cf = self.cf()?;
        self.db
            .compact_range_cf(&cf, start_bytes.as_deref(), end_bytes.as_deref());
        Ok(())
    }

    /// Get the name of this collection
    pub fn name(&self) -> &str {
        &self.cf_name
    }
}

// SAFETY: Collection can be safely sent between threads and shared across threads because:
// 1. All internal state is Send + Sync (Arc<DB>, String, Arc<RwLock<bool>>)
// 2. T: Storable which requires T: Send + Sync (see traits.rs)
// 3. RocksDB column family operations are thread-safe
unsafe impl<T: Storable> Send for Collection<T> {}
unsafe impl<T: Storable> Sync for Collection<T> {}

/// A batch of write operations
///
/// Batches allow multiple writes to be applied atomically and efficiently.
pub struct Batch<T: Storable> {
    db: Arc<rocksdb::DB>,
    cf_name: String,
    batch: RocksWriteBatch,
    _phantom: PhantomData<T>,
}

impl<T: Storable> Batch<T> {
    fn new(db: Arc<rocksdb::DB>, cf_name: String) -> Self {
        Self {
            db,
            cf_name,
            batch: RocksWriteBatch::default(),
            _phantom: PhantomData,
        }
    }

    /// Add a put operation to the batch
    pub fn put(&mut self, value: &T) -> Result<()> {
        value.validate()?;

        let key = value.key();
        let key_bytes = key.to_bytes()?;
        let value_bytes = helpers::serialize(value)?;

        let cf = self.db.cf_handle(&self.cf_name).ok_or_else(|| {
            Error::Database(format!("Column family '{}' not found", self.cf_name))
        })?;
        self.batch.put_cf(&cf, &key_bytes, &value_bytes);
        Ok(())
    }

    /// Add a delete operation to the batch
    pub fn delete(&mut self, key: &T::Key) -> Result<()> {
        let key_bytes = key.to_bytes()?;

        let cf = self.db.cf_handle(&self.cf_name).ok_or_else(|| {
            Error::Database(format!("Column family '{}' not found", self.cf_name))
        })?;
        self.batch.delete_cf(&cf, &key_bytes);
        Ok(())
    }

    /// Clear all operations from the batch
    pub fn clear(&mut self) {
        self.batch.clear();
    }

    /// Get the number of operations in the batch
    pub fn len(&self) -> usize {
        self.batch.len()
    }

    /// Check if the batch is empty
    pub fn is_empty(&self) -> bool {
        self.batch.is_empty()
    }

    /// Commit all operations in the batch atomically
    #[instrument(skip(self))]
    pub fn commit(self) -> Result<()> {
        let op_count = self.batch.len();
        debug!(
            "Committing batch with {} operations to '{}'",
            op_count, self.cf_name
        );

        self.db.write(self.batch).map_err(|e| {
            error!("Batch commit failed: {}", e);
            Error::Database(format!("Batch commit failed: {}", e))
        })
    }
}

/// A consistent snapshot of the database
///
/// Snapshots provide a point-in-time view of the data.
///
/// # Implementation Note
///
885/// to avoid unsound lifetime transmutation. The snapshot is valid as long as
886/// the database exists, which we guarantee by holding an Arc<DB>.
887pub struct Snapshot<T: Storable> {
888    db: Arc<rocksdb::DB>,
889    // Store raw pointer to snapshot - we manage its lifetime manually
890    snapshot_ptr: *const rocksdb::SnapshotWithThreadMode<'static, rocksdb::DB>,
891    cf_name: String,
892    _phantom: PhantomData<T>,
893}
894
895impl<T: Storable> Snapshot<T> {
896    fn new(db: Arc<rocksdb::DB>, cf_name: String) -> Self {
897        // Create a snapshot - it's owned by the DB and lives as long as the DB
898        // SAFETY: We use transmute to extend the snapshot's lifetime to 'static so we can
899        // store it without borrowing from db. This is safe because:
900        // 1. We store Arc<DB> which keeps the database alive
901        // 2. We properly clean up the snapshot in Drop
902        // 3. The snapshot is only accessed while self (and thus Arc<DB>) is alive
903        let snapshot_ptr = unsafe {
904            let snapshot = db.snapshot();
905            // Transmute to break the lifetime connection - we'll manage lifetime manually
906            let static_snapshot: rocksdb::SnapshotWithThreadMode<'static, rocksdb::DB> =
907                std::mem::transmute(snapshot);
908            let boxed = Box::new(static_snapshot);
909            Box::into_raw(boxed) as *const _
910        };
911
912        Self {
913            db,
914            snapshot_ptr,
915            cf_name,
916            _phantom: PhantomData,
917        }
918    }
919
920    fn snapshot(&self) -> &rocksdb::SnapshotWithThreadMode<'_, rocksdb::DB> {
921        // SAFETY:
922        // 1. snapshot_ptr is valid because db is alive (we hold Arc<DB>)
923        // 2. The returned reference is tied to &self lifetime, not 'static
924        // 3. The snapshot was allocated and will be deallocated by us
925        unsafe { &*self.snapshot_ptr }
926    }
927
928    fn cf<'a>(&'a self) -> Result<Arc<BoundColumnFamily<'a>>> {
929        self.db
930            .cf_handle(&self.cf_name)
931            .ok_or_else(|| Error::Database(format!("Column family '{}' not found", self.cf_name)))
932    }
933
934    /// Get a value from the snapshot
935    ///
936    /// Reads from the consistent point-in-time snapshot held by this struct.
937    pub fn get(&self, key: &T::Key) -> Result<Option<T>> {
938        let key_bytes = key.to_bytes()?;
939        let cf = self.cf()?;
940
941        match self.snapshot().get_cf(&cf, key_bytes)? {
942            Some(value_bytes) => {
943                let value: T = helpers::deserialize(&value_bytes)?;
944                Ok(Some(value))
945            }
946            None => Ok(None),
947        }
948    }
949
950    /// Check if a key exists in the snapshot
951    pub fn exists(&self, key: &T::Key) -> Result<bool> {
952        Ok(self.get(key)?.is_some())
953    }
954}
955
956impl<T: Storable> Drop for Snapshot<T> {
957    fn drop(&mut self) {
        // SAFETY: We allocated this snapshot with Box::into_raw, so we must free it here
        // The pointer is valid because db is still alive (Arc is dropped after this)
        unsafe {
            let _ = Box::from_raw(
                self.snapshot_ptr as *mut rocksdb::SnapshotWithThreadMode<'static, rocksdb::DB>,
            );
        }
    }
}

/// Iterator mode
enum IteratorMode {
    Start,
    From(Vec<u8>),
}

/// Result of an iteration operation
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IterationStatus {
    /// Iteration completed successfully - all items were processed
    Completed,
    /// Iteration stopped early because the callback returned false
    StoppedEarly,
}

/// Iterator over collection items
///
/// Provides methods to iterate through all items in a collection.
pub struct Iterator<T: Storable> {
    db: Arc<rocksdb::DB>,
    cf_name: String,
    mode: IteratorMode,
    shutdown: Arc<RwLock<bool>>,
    _phantom: PhantomData<T>,
}

impl<T: Storable> Iterator<T> {
    fn new(
        db: Arc<rocksdb::DB>,
        cf_name: String,
        mode: IteratorMode,
        shutdown: Arc<RwLock<bool>>,
    ) -> Self {
        Self {
            db,
            cf_name,
            mode,
            shutdown,
            _phantom: PhantomData,
        }
    }

    fn cf<'a>(&'a self) -> Result<Arc<BoundColumnFamily<'a>>> {
        self.db
            .cf_handle(&self.cf_name)
            .ok_or_else(|| Error::Database(format!("Column family '{}' not found", self.cf_name)))
    }

    #[inline]
    fn check_shutdown(&self) -> Result<parking_lot::RwLockReadGuard<'_, bool>> {
        let guard = self.shutdown.read();

        if *guard {
            return Err(Error::Database("Database has been shut down".to_string()));
        }

        Ok(guard)
    }

    /// Collect all items into a vector
    ///
    /// # Warning
    ///
    /// This will load all items into memory. Use with caution on large collections.
    pub fn collect_all(&self) -> Result<Vec<T>> {
        let _guard = self.check_shutdown()?;

        let mut results = Vec::new();
        let cf = self.cf()?;
        let iter = match &self.mode {
            IteratorMode::Start => self.db.iterator_cf(&cf, rocksdb::IteratorMode::Start),
            IteratorMode::From(key) => self.db.iterator_cf(
                &cf,
                rocksdb::IteratorMode::From(key, rocksdb::Direction::Forward),
            ),
        };

        for item in iter {
            let (_key, value_bytes) =
                item.map_err(|e| Error::IteratorError(format!("Iterator error: {}", e)))?;

            let value: T = helpers::deserialize(&value_bytes)?;
            results.push(value);
        }

        Ok(results)
    }

    /// Iterate and apply a function to each item
    ///
    /// This is more memory-efficient than `collect_all()` for large datasets.
    ///
    /// # Arguments
    ///
    /// * `f` - Function to apply to each item. Return `false` to stop iteration early.
    ///
    /// # Returns
    ///
    /// Returns `Ok(IterationStatus)` where the status indicates whether iteration
    /// completed or was stopped early by the callback.
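    ///
    /// # Examples
    ///
    /// A minimal sketch that stops after ten items, assuming the same `User` type
    /// as the other examples:
    ///
    /// ```rust,no_run
    /// # use ngdb::{Collection, Storable};
    /// # #[derive(borsh::BorshSerialize, borsh::BorshDeserialize)]
    /// # struct User { id: u64 }
    /// # impl Storable for User {
    /// #     type Key = u64;
    /// #     fn key(&self) -> u64 { self.id }
    /// # }
    /// # fn example(users: Collection<User>) -> Result<(), ngdb::Error> {
    /// let mut seen = 0;
    /// let status = users.iter()?.for_each(|_user| {
    ///     seen += 1;
    ///     seen < 10 // returning false stops the scan early
    /// })?;
    /// # let _ = status;
    /// # Ok(())
    /// # }
    /// ```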
    pub fn for_each<F>(&self, mut f: F) -> Result<IterationStatus>
    where
        F: FnMut(T) -> bool,
    {
        let _guard = self.check_shutdown()?;

        let cf = self.cf()?;
        let iter = match &self.mode {
            IteratorMode::Start => self.db.iterator_cf(&cf, rocksdb::IteratorMode::Start),
            IteratorMode::From(key) => self.db.iterator_cf(
                &cf,
                rocksdb::IteratorMode::From(key, rocksdb::Direction::Forward),
            ),
        };

        for item in iter {
            let (_key, value_bytes) =
                item.map_err(|e| Error::IteratorError(format!("Iterator error: {}", e)))?;

            let value: T = helpers::deserialize(&value_bytes)?;

            if !f(value) {
                return Ok(IterationStatus::StoppedEarly);
            }
        }

        Ok(IterationStatus::Completed)
    }

    /// Count the total number of items
    pub fn count(&self) -> Result<usize> {
        let _guard = self.check_shutdown()?;

        let cf = self.cf()?;
        let iter = match &self.mode {
            IteratorMode::Start => self.db.iterator_cf(&cf, rocksdb::IteratorMode::Start),
            IteratorMode::From(key) => self.db.iterator_cf(
                &cf,
                rocksdb::IteratorMode::From(key, rocksdb::Direction::Forward),
            ),
        };

        let mut count = 0;
        for item in iter {
            item.map_err(|e| Error::IteratorError(format!("Iterator error: {}", e)))?;
            count += 1;
        }

        Ok(count)
    }
}

/// A database transaction for atomic operations
///
/// Transactions buffer all writes in memory and apply them atomically on commit.
/// This implementation is thread-safe and can be shared across threads.
///
/// # Memory Limits
///
/// To prevent unbounded memory growth, transactions enforce the following limits:
/// - Maximum 100,000 operations
/// - Maximum 100MB total cached data
///
/// These limits help prevent out-of-memory errors while still allowing substantial
/// transactions. If you need larger batch operations, consider using `Batch` instead
/// or splitting your transaction into smaller chunks.
///
/// # Cloning
///
/// Transaction intentionally does NOT implement Clone. Cloning a transaction would be
/// confusing and error-prone because:
/// - It's unclear whether clones share state or have independent caches
/// - Multiple clones committing could lead to unexpected behavior
/// - The transaction cache is meant to provide isolation for a single logical transaction
///
/// If you need to share a transaction across threads, use `Arc<Transaction>` instead.
///
/// # Examples
///
/// ```rust,no_run
/// # use ngdb::{Database, Storable};
/// # #[derive(borsh::BorshSerialize, borsh::BorshDeserialize)]
/// # struct Account { id: u64, balance: i64 }
/// # impl Storable for Account {
/// #     type Key = u64;
/// #     fn key(&self) -> u64 { self.id }
/// # }
/// # fn example(db: Database) -> Result<(), ngdb::Error> {
/// let txn = db.transaction()?;
/// let accounts = txn.collection::<Account>("accounts")?;
///
/// accounts.put(&Account { id: 1, balance: 100 })?;
/// accounts.put(&Account { id: 2, balance: 200 })?;
///
/// txn.commit()?;
/// # Ok(())
/// # }
/// ```
pub struct Transaction {
    db: Arc<rocksdb::DB>,
    batch: Mutex<RocksWriteBatch>,
    // Cache for read isolation: (cf_name, key_bytes) -> Option<value_bytes>
    // None means deleted in this transaction
    cache: Mutex<TransactionCache>,
    shutdown: Arc<RwLock<bool>>,
}

struct TransactionCache {
    data: HashMap<(String, Vec<u8>), Option<Vec<u8>>>,
    operation_count: usize,
    total_bytes: usize,
}

impl TransactionCache {
    fn new() -> Self {
        Self {
            data: HashMap::new(),
            operation_count: 0,
            total_bytes: 0,
        }
    }

    fn insert(&mut self, key: (String, Vec<u8>), value: Option<Vec<u8>>) -> Result<()> {
        const MAX_OPERATIONS: usize = 100_000;
        const MAX_BYTES: usize = 100 * 1024 * 1024; // 100MB
        const HASHMAP_OVERHEAD: usize = 32; // Approximate per-entry overhead on 64-bit systems

        // Check operation limit
        if self.operation_count >= MAX_OPERATIONS {
            return Err(Error::Database(format!(
                "Transaction limit exceeded: maximum {} operations allowed",
                MAX_OPERATIONS
            )));
        }

        // Calculate size of new entry (including HashMap overhead)
        let entry_size = key.0.len()
            + key.1.len()
            + value.as_ref().map(|v| v.len()).unwrap_or(0)
            + HASHMAP_OVERHEAD;

        // If replacing an existing entry, subtract its old size first
        let size_delta = if let Some(old_value) = self.data.get(&key) {
            let old_size = key.0.len()
                + key.1.len()
                + old_value.as_ref().map(|v| v.len()).unwrap_or(0)
                + HASHMAP_OVERHEAD;
            entry_size as i64 - old_size as i64
        } else {
            entry_size as i64
        };

        // Check memory limit
        let new_total = (self.total_bytes as i64 + size_delta) as usize;
        if new_total > MAX_BYTES {
            return Err(Error::Database(format!(
                "Transaction memory limit exceeded: maximum {}MB allowed",
                MAX_BYTES / (1024 * 1024)
            )));
        }

        // Update counters
        let is_new_entry = !self.data.contains_key(&key);
        if is_new_entry {
            self.operation_count += 1;
        }
        self.total_bytes = new_total;

        self.data.insert(key, value);
        Ok(())
    }

    fn get(&self, key: &(String, Vec<u8>)) -> Option<&Option<Vec<u8>>> {
        self.data.get(key)
    }

    fn clear(&mut self) {
        self.data.clear();
        self.operation_count = 0;
        self.total_bytes = 0;
    }
}

impl Transaction {
    fn new(db: Arc<rocksdb::DB>, shutdown: Arc<RwLock<bool>>) -> Self {
        Self {
            db,
            batch: Mutex::new(RocksWriteBatch::default()),
            cache: Mutex::new(TransactionCache::new()),
            shutdown,
        }
    }

    #[inline]
    fn check_shutdown(&self) -> Result<parking_lot::RwLockReadGuard<'_, bool>> {
        let guard = self.shutdown.read();

        if *guard {
            return Err(Error::Database("Database has been shut down".to_string()));
        }

        Ok(guard)
    }

    /// Get a typed collection within this transaction
    #[instrument(skip(self))]
    pub fn collection<'txn, T: Storable>(
        &'txn self,
        name: &str,
    ) -> Result<TransactionCollection<'txn, T>> {
        let _guard = self.check_shutdown()?;

        // Verify column family exists
        self.db.cf_handle(name).ok_or_else(|| {
            error!("Column family '{}' not found", name);
            Error::Database(format!("Column family '{}' not found", name))
        })?;

        debug!("Created transaction collection for '{}'", name);
        Ok(TransactionCollection::new(
            Arc::clone(&self.db),
            name.to_string(),
            &self.batch,
            &self.cache,
        ))
    }

    /// Commit the transaction
    ///
    /// All operations are applied atomically. If this fails, all changes are rolled back.
    #[instrument(skip(self))]
    pub fn commit(self) -> Result<()> {
        let guard = self.check_shutdown()?;
        drop(guard); // Drop guard before moving self's fields

        let db = self.db;
        let batch = self.batch.into_inner();
        let op_count = batch.len();

        info!("Committing transaction with {} operations", op_count);

        db.write(batch).map_err(|e| {
            error!("Failed to commit transaction: {}", e);
            Error::Database(format!("Failed to commit transaction: {}", e))
        })
    }

    /// Rollback the transaction
    ///
    /// All operations are discarded. This is done automatically by dropping the transaction.
    #[instrument(skip(self))]
    pub fn rollback(self) -> Result<()> {
        let op_count = self.batch.lock().len();
        warn!("Rolling back transaction with {} operations", op_count);
        Ok(())
    }

    /// Clear all operations from the transaction
    pub fn clear(&self) -> Result<()> {
        self.batch.lock().clear();
        self.cache.lock().clear();
        Ok(())
    }

    /// Get the number of operations in the transaction
    pub fn len(&self) -> Result<usize> {
        Ok(self.batch.lock().len())
    }

    /// Check if the transaction is empty
    pub fn is_empty(&self) -> Result<bool> {
        Ok(self.batch.lock().is_empty())
    }
}

// SAFETY: Transaction can be safely sent between threads and shared across threads because:
// 1. All internal state is Send + Sync (Arc<DB>, Mutex<WriteBatch>, Mutex<TransactionCache>, Arc<RwLock<bool>>)
// 2. Mutex provides interior mutability with proper synchronization
// 3. The transaction cache is protected by Mutex, preventing data races
// 4. RocksDB WriteBatch is thread-safe when properly synchronized (which we do with Mutex)
unsafe impl Send for Transaction {}
unsafe impl Sync for Transaction {}

/// A typed collection view within a transaction
///
/// Provides read isolation - reads will see uncommitted writes made
/// within the same transaction.
pub struct TransactionCollection<'txn, T: Storable> {
    db: Arc<rocksdb::DB>,
    cf_name: String,
    batch: &'txn Mutex<RocksWriteBatch>,
    cache: &'txn Mutex<TransactionCache>,
    _phantom: PhantomData<T>,
}

impl<'txn, T: Storable> TransactionCollection<'txn, T> {
    fn new(
        db: Arc<rocksdb::DB>,
        cf_name: String,
        batch: &'txn Mutex<RocksWriteBatch>,
        cache: &'txn Mutex<TransactionCache>,
    ) -> Self {
        Self {
            db,
            cf_name,
            batch,
            cache,
            _phantom: PhantomData,
        }
    }

    fn cf<'a>(&'a self) -> Result<Arc<BoundColumnFamily<'a>>> {
        self.db
            .cf_handle(&self.cf_name)
            .ok_or_else(|| Error::Database(format!("Column family '{}' not found", self.cf_name)))
    }

    /// Store a value in the transaction
    ///
    /// The value is cached locally and will be visible to subsequent reads within
    /// the same transaction.
    #[instrument(skip(self, value))]
    pub fn put(&self, value: &T) -> Result<()> {
        // Validate first
        value.validate()?;

        let key = value.key();
        let key_bytes = key.to_bytes()?;
        let value_bytes = helpers::serialize(value)?;

        debug!("Transaction put in collection '{}'", self.cf_name);

        // Acquire locks in consistent order: batch first, then cache (prevents deadlock)
        let mut batch = self.batch.lock();
        let mut cache = self.cache.lock();

        // Add to batch for commit
        let cf = self.cf()?;
        batch.put_cf(&cf, &key_bytes, &value_bytes);

        // Add to cache for read isolation (checks limits)
        cache.insert((self.cf_name.clone(), key_bytes), Some(value_bytes))?;

        value.on_stored();
        Ok(())
    }

    /// Get a value from the transaction
    ///
    /// This provides proper isolation - reads will see uncommitted writes made
    /// in the same transaction.
    #[instrument(skip(self))]
    pub fn get(&self, key: &T::Key) -> Result<Option<T>> {
        let key_bytes = key.to_bytes()?;

        // Check cache first (uncommitted writes)
        let cache_key = (self.cf_name.clone(), key_bytes.clone());
        let cached_value = self.cache.lock().get(&cache_key).cloned();

        if let Some(cached) = cached_value {
            debug!("Transaction cache hit for key in '{}'", self.cf_name);
            return match cached {
                Some(value_bytes) => {
                    let value: T = helpers::deserialize(&value_bytes)?;
                    Ok(Some(value))
                }
                None => Ok(None), // Deleted in transaction
            };
        }

        // Not in cache, read from committed state
        let cf = self.cf()?;
        match self.db.get_cf(&cf, key_bytes)? {
            Some(value_bytes) => {
                let value: T = helpers::deserialize(&value_bytes)?;
                Ok(Some(value))
            }
            None => Ok(None),
        }
    }

    /// Delete a value in the transaction
    ///
    /// The deletion is cached locally and subsequent reads will return None.
    #[instrument(skip(self))]
    pub fn delete(&self, key: &T::Key) -> Result<()> {
        let key_bytes = key.to_bytes()?;

        debug!("Transaction delete in collection '{}'", self.cf_name);

        // Acquire locks in consistent order: batch first, then cache (prevents deadlock)
        let mut batch = self.batch.lock();
        let mut cache = self.cache.lock();

        // Add to batch for commit
        let cf = self.cf()?;
        batch.delete_cf(&cf, &key_bytes);

        // Add to cache as deleted (None value) - checks limits
        cache.insert((self.cf_name.clone(), key_bytes), None)?;

        Ok(())
    }

    /// Check if a key exists
    pub fn exists(&self, key: &T::Key) -> Result<bool> {
        Ok(self.get(key)?.is_some())
    }

    /// Retrieve multiple values at once using optimized multi_get
    ///
    /// This provides proper transaction isolation - reads will see uncommitted writes
    /// made in the same transaction. This is significantly faster than calling `get()`
    /// multiple times for keys not in the transaction cache.
    ///
    /// # Arguments
    ///
    /// * `keys` - Slice of keys to retrieve
    ///
    /// # Returns
    ///
    /// A vector of optional values in the same order as the input keys
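    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming the `Account` type from the `Transaction` example:
    ///
    /// ```rust,no_run
    /// # use ngdb::{Database, Storable};
    /// # #[derive(borsh::BorshSerialize, borsh::BorshDeserialize)]
    /// # struct Account { id: u64, balance: i64 }
    /// # impl Storable for Account {
    /// #     type Key = u64;
    /// #     fn key(&self) -> u64 { self.id }
    /// # }
    /// # fn example(db: Database) -> Result<(), ngdb::Error> {
    /// let txn = db.transaction()?;
    /// let accounts = txn.collection::<Account>("accounts")?;
    /// accounts.put(&Account { id: 1, balance: 100 })?;
    /// // Sees the uncommitted write to key 1 and the committed state for key 2.
    /// let values = accounts.get_many(&[1, 2])?;
    /// assert_eq!(values.len(), 2);
    /// # Ok(())
    /// # }
    /// ```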
    #[instrument(skip(self, keys))]
    pub fn get_many(&self, keys: &[T::Key]) -> Result<Vec<Option<T>>> {
        if keys.is_empty() {
            return Ok(Vec::new());
        }

        // Convert all keys to bytes first (no lock needed)
        let key_bytes: Vec<Vec<u8>> = keys
            .iter()
            .map(|k| k.to_bytes())
            .collect::<Result<Vec<Vec<u8>>>>()?;

        // Pre-allocate results with exact size
        let mut results: Vec<Option<T>> = (0..keys.len()).map(|_| None).collect();
        let mut uncached_indices = Vec::new();
        let mut uncached_keys = Vec::new();

        // First pass: check cache for all keys
        // We intentionally drop the lock before DB access to avoid holding it during I/O
        {
            let cache = self.cache.lock();

            for (i, kb) in key_bytes.iter().enumerate() {
                let cache_key = (self.cf_name.clone(), kb.clone());

                if let Some(cached) = cache.get(&cache_key) {
                    // In cache - resolve immediately
                    results[i] = match cached {
                        Some(value_bytes) => Some(helpers::deserialize(value_bytes)?),
                        None => None, // Deleted in transaction
                    };
                } else {
                    // Not in cache - need to fetch from DB
                    uncached_indices.push(i);
                    uncached_keys.push(kb.clone());
                }
            }
        } // Release cache lock before DB access

        // Second pass: batch fetch uncached keys from DB
        if !uncached_keys.is_empty() {
            let cf = self.cf()?;
            let cf_refs: Vec<_> = uncached_keys.iter().map(|k| (&cf, k.as_slice())).collect();
            let db_results = self.db.multi_get_cf(cf_refs);

            // Verify we got the expected number of results (RocksDB guarantees this)
            debug_assert_eq!(
                db_results.len(),
                uncached_keys.len(),
                "RocksDB multi_get violated contract: got {} results but expected {}",
                db_results.len(),
                uncached_keys.len()
            );

            for (result_idx, db_result) in db_results.into_iter().enumerate() {
                let original_idx = uncached_indices[result_idx];
                results[original_idx] = match db_result {
                    Ok(Some(value_bytes)) => Some(helpers::deserialize(&value_bytes)?),
                    Ok(None) => None,
                    Err(e) => return Err(Error::Database(format!("Multi-get failed: {}", e))),
                };
            }
        }

        Ok(results)
    }
}

// SAFETY: TransactionCollection can be sent between threads (Send) and shared
// across threads (Sync) because:
// 1. All internal state is Send + Sync (Arc<DB>, String, &'txn Mutex<...>)
// 2. T: Storable, which requires T: Send + Sync
// 3. The 'txn lifetime ensures the Transaction outlives this collection
// 4. All mutations go through the parent Transaction's Mutex-protected state
//
// `test_transaction_collection_shared_across_threads` below exercises this claim.
unsafe impl<'txn, T: Storable> Send for TransactionCollection<'txn, T> {}
unsafe impl<'txn, T: Storable> Sync for TransactionCollection<'txn, T> {}

#[cfg(test)]
mod tests {
    use borsh::{BorshDeserialize, BorshSerialize};

    use super::*;
    use crate::DatabaseConfig;

    #[derive(Debug, Clone, PartialEq, BorshSerialize, BorshDeserialize)]
    struct TestItem {
        id: u64,
        data: String,
    }

    impl Storable for TestItem {
        type Key = u64;
        fn key(&self) -> Self::Key {
            self.id
        }
    }

    fn create_test_db() -> Database {
        use std::sync::atomic::{AtomicU64, Ordering};
        static COUNTER: AtomicU64 = AtomicU64::new(0);

        // A unique directory per test keeps concurrently running tests isolated.
        let id = COUNTER.fetch_add(1, Ordering::SeqCst);
        let path = std::env::temp_dir().join(format!("ngdb_test_{}", id));
        let _ = std::fs::remove_dir_all(&path);

        DatabaseConfig::new(&path)
            .create_if_missing(true)
            .add_column_family("test")
            .open()
            .expect("Failed to create test database")
    }

    #[test]
    fn test_collection_put_and_get() {
        let db = create_test_db();
        let collection = db.collection::<TestItem>("test").unwrap();

        let item = TestItem {
            id: 1,
            data: "test".to_string(),
        };

        collection.put(&item).unwrap();
        let retrieved = collection.get(&1).unwrap();

        assert_eq!(Some(item), retrieved);
    }

    #[test]
    fn test_collection_delete() {
        let db = create_test_db();
        let collection = db.collection::<TestItem>("test").unwrap();

        let item = TestItem {
            id: 1,
            data: "test".to_string(),
        };

        collection.put(&item).unwrap();
        collection.delete(&1).unwrap();

        assert_eq!(None, collection.get(&1).unwrap());
    }

    #[test]
    fn test_batch() {
        let db = create_test_db();
        let collection = db.collection::<TestItem>("test").unwrap();

        let mut batch = collection.batch();
        for i in 0..10 {
            batch
                .put(&TestItem {
                    id: i,
                    data: format!("item_{}", i),
                })
                .unwrap();
        }
        batch.commit().unwrap();

        for i in 0..10 {
            let item = collection.get(&i).unwrap().unwrap();
            assert_eq!(i, item.id);
        }
    }

    #[test]
    fn test_iterator() {
        let db = create_test_db();
        let collection = db.collection::<TestItem>("test").unwrap();

        for i in 0..5 {
            collection
                .put(&TestItem {
                    id: i,
                    data: format!("item_{}", i),
                })
                .unwrap();
        }

        let items = collection.iter().unwrap().collect_all().unwrap();
        assert_eq!(5, items.len());
    }

    #[test]
    fn test_get_many() {
        let db = create_test_db();
        let collection = db.collection::<TestItem>("test").unwrap();

        // Insert test data
        for i in 0..10 {
            collection
                .put(&TestItem {
                    id: i,
                    data: format!("item_{}", i),
                })
                .unwrap();
        }

        // Test multi-get
        let keys = vec![1, 3, 5, 99]; // 99 doesn't exist
        let results = collection.get_many(&keys).unwrap();

        assert_eq!(4, results.len());
        assert!(results[0].is_some());
        assert_eq!(1, results[0].as_ref().unwrap().id);
        assert!(results[1].is_some());
        assert_eq!(3, results[1].as_ref().unwrap().id);
        assert!(results[2].is_some());
        assert_eq!(5, results[2].as_ref().unwrap().id);
        assert!(results[3].is_none());
    }

    #[test]
    fn test_transaction() {
        let db = create_test_db();
        let txn = db.transaction().unwrap();
        let collection = txn.collection::<TestItem>("test").unwrap();

        collection
            .put(&TestItem {
                id: 1,
                data: "test".to_string(),
            })
            .unwrap();

        // Should see uncommitted write
        assert!(collection.get(&1).unwrap().is_some());

        txn.commit().unwrap();

        // Should see committed write
        let regular_collection = db.collection::<TestItem>("test").unwrap();
        assert!(regular_collection.get(&1).unwrap().is_some());
    }

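    // A minimal sketch (not part of the original suite) exercising the unsafe
    // Send/Sync impls on TransactionCollection: scoped threads borrow one
    // collection concurrently for read-only lookups while the transaction lives.
    #[test]
    fn test_transaction_collection_shared_across_threads() {
        let db = create_test_db();
        let txn = db.transaction().unwrap();
        let collection = txn.collection::<TestItem>("test").unwrap();

        collection
            .put(&TestItem {
                id: 1,
                data: "shared".to_string(),
            })
            .unwrap();

        // `Sync` lets scoped threads share `&collection`; the scope ends before
        // the transaction is committed, satisfying the 'txn lifetime.
        std::thread::scope(|s| {
            let hit = s.spawn(|| collection.exists(&1).unwrap());
            let miss = s.spawn(|| collection.exists(&2).unwrap());
            assert!(hit.join().unwrap()); // uncommitted write is visible
            assert!(!miss.join().unwrap()); // key 2 was never written
        });

        txn.commit().unwrap();
    }
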
    #[test]
    fn test_transaction_get_many() {
        let db = create_test_db();
        let collection = db.collection::<TestItem>("test").unwrap();

        // Pre-populate some data
        collection
            .put(&TestItem {
                id: 1,
                data: "one".to_string(),
            })
            .unwrap();
        collection
            .put(&TestItem {
                id: 2,
                data: "two".to_string(),
            })
            .unwrap();
        collection
            .put(&TestItem {
                id: 5,
                data: "five".to_string(),
            })
            .unwrap();

        let txn = db.transaction().unwrap();
        let txn_collection = txn.collection::<TestItem>("test").unwrap();

        // Write new items in transaction
        txn_collection
            .put(&TestItem {
                id: 3,
                data: "three".to_string(),
            })
            .unwrap();
        txn_collection
            .put(&TestItem {
                id: 4,
                data: "four".to_string(),
            })
            .unwrap();

        // Delete an existing item
        txn_collection.delete(&5).unwrap();

        // Test get_many with mix of: committed, uncommitted, deleted, non-existent
        let keys = vec![1, 2, 3, 4, 5, 6];
        let results = txn_collection.get_many(&keys).unwrap();

        // Verify results
        assert!(results[0].is_some()); // 1: exists (committed)
        assert_eq!(results[0].as_ref().unwrap().data, "one");

        assert!(results[1].is_some()); // 2: exists (committed)
        assert_eq!(results[1].as_ref().unwrap().data, "two");

        assert!(results[2].is_some()); // 3: exists (uncommitted in txn)
        assert_eq!(results[2].as_ref().unwrap().data, "three");

        assert!(results[3].is_some()); // 4: exists (uncommitted in txn)
        assert_eq!(results[3].as_ref().unwrap().data, "four");

        assert!(results[4].is_none()); // 5: deleted in transaction
        assert!(results[5].is_none()); // 6: never existed

        // Verify transaction hasn't affected committed state
        let committed_results = collection.get_many(&keys).unwrap();
        assert!(committed_results[2].is_none()); // 3 doesn't exist yet
        assert!(committed_results[3].is_none()); // 4 doesn't exist yet
        assert!(committed_results[4].is_some()); // 5 still exists

        // Commit and verify
        txn.commit().unwrap();

        let final_results = collection.get_many(&keys).unwrap();
        assert!(final_results[2].is_some()); // 3 now exists
        assert!(final_results[3].is_some()); // 4 now exists
        assert!(final_results[4].is_none()); // 5 now deleted
    }

    #[test]
    fn test_transaction_limits() {
        let db = create_test_db();
        let txn = db.transaction().unwrap();
        let collection = txn.collection::<TestItem>("test").unwrap();

        // Exercise the operation limit: puts succeed for the first 100,000
        // operations, and the 100,001st must fail.
        for i in 0..100_001 {
            let result = collection.put(&TestItem {
                id: i,
                data: format!("item_{}", i),
            });

            if i < 100_000 {
                assert!(result.is_ok());
            } else {
                // Should fail on the 100,001st operation
                assert!(result.is_err());
                assert!(result.unwrap_err().to_string().contains("limit exceeded"));
                break;
            }
        }
    }

    #[test]
    fn test_shutdown_prevents_operations() {
        let db = create_test_db();
        let collection = db.collection::<TestItem>("test").unwrap();

        // Operation should work before shutdown
        let item = TestItem {
            id: 1,
            data: "test".to_string(),
        };
        assert!(collection.put(&item).is_ok());

        // Shutdown the database
        db.shutdown().unwrap();

        // Operations should fail after shutdown
        let item2 = TestItem {
            id: 2,
            data: "test2".to_string(),
        };
        let result = collection.put(&item2);
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("shut down"));

        // Getting a collection should also fail
        let result = db.collection::<TestItem>("test");
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("shut down"));
    }

    #[test]
    fn test_iterator_checks_shutdown() {
        let db = create_test_db();
        let collection = db.collection::<TestItem>("test").unwrap();

        // Add some items
        for i in 0..5 {
            collection
                .put(&TestItem {
                    id: i,
                    data: format!("item_{}", i),
                })
                .unwrap();
        }

        // Create iterator before shutdown
        let iter = collection.iter().unwrap();

        // Shutdown the database
        db.shutdown().unwrap();

        // Iterator operations should fail
        let result = iter.collect_all();
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("shut down"));
    }

    #[test]
    fn test_shutdown_lock_is_released() {
        use std::sync::Arc;
        use std::thread;
        use std::time::Duration;

        let db = Arc::new(create_test_db());
        let collection = db.collection::<TestItem>("test").unwrap();

        // Add an item to ensure operations work
        collection
            .put(&TestItem {
                id: 1,
                data: "test".to_string(),
            })
            .unwrap();

        // Spawn a thread that will try to shut down the database after a delay
        let db_clone = Arc::clone(&db);
        let shutdown_handle = thread::spawn(move || {
            thread::sleep(Duration::from_millis(50));
            // This should succeed and release the lock even if flush has issues
            db_clone.shutdown()
        });

        // Wait for shutdown to complete (join blocks, so no extra sleep is needed)
        let shutdown_result = shutdown_handle.join().unwrap();

        // Shutdown should have succeeded
        assert!(shutdown_result.is_ok());

        // Verify operations are now blocked (proving the lock was released and shutdown succeeded)
        let result = collection.put(&TestItem {
            id: 2,
            data: "test2".to_string(),
        });
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("shut down"));

        // Verify we can still query the shutdown state (lock is not deadlocked)
        let result = db.collection::<TestItem>("another");
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("shut down"));
    }

    #[test]
    fn test_list_collections_checks_shutdown() {
        let db = create_test_db();

        // Should work before shutdown
        let collections = db.list_collections();
        assert!(collections.is_ok());

        // Shutdown the database
        db.shutdown().unwrap();

        // list_collections should fail after shutdown
        let result = db.list_collections();
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("shut down"));
    }
}