manifold/column_family/database.rs

1use std::collections::HashMap;
2use std::fmt;
3use std::io;
4use std::mem::ManuallyDrop;
5#[cfg(not(target_arch = "wasm32"))]
6use std::path::{Path, PathBuf};
7use std::sync::{Arc, RwLock};
8
9#[cfg(not(target_arch = "wasm32"))]
10use crate::backends::FileBackend;
11use crate::db::ReadableDatabase;
12use crate::transaction_tracker::TransactionId;
13#[cfg(not(target_arch = "wasm32"))]
14use crate::tree_store::BtreeHeader;
15use crate::{
16    Database, DatabaseError, ReadTransaction, StorageBackend, StorageError, TransactionError,
17    WriteTransaction,
18};
19
20#[cfg(not(target_arch = "wasm32"))]
21use super::builder::ColumnFamilyDatabaseBuilder;
22#[cfg(not(target_arch = "wasm32"))]
23use super::file_handle_pool::FileHandlePool;
24use super::header::{ColumnFamilyMetadata, FreeSegment, MasterHeader, Segment, PAGE_SIZE};
25use super::partitioned_backend::PartitionedStorageBackend;
26use super::state::ColumnFamilyState;
27use super::wal::checkpoint::CheckpointManager;
28#[cfg(not(target_arch = "wasm32"))]
29use super::wal::config::CheckpointConfig;
30use super::wal::journal::WALJournal;
31
/// Number of bytes reserved for a column family when the caller does not pass an
/// explicit size: 1 GiB (2^30 bytes).
const DEFAULT_COLUMN_FAMILY_SIZE: u64 = 1 << 30;
34
35/// Errors that can occur when working with column families.
36#[derive(Debug)]
37pub enum ColumnFamilyError {
38    /// A column family with this name already exists.
39    AlreadyExists(String),
40    /// The requested column family was not found.
41    NotFound(String),
42    /// An underlying database error occurred.
43    Database(DatabaseError),
44    /// An I/O error occurred.
45    Io(io::Error),
46}
47
48impl fmt::Display for ColumnFamilyError {
49    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
50        match self {
51            ColumnFamilyError::AlreadyExists(name) => {
52                write!(f, "column family '{name}' already exists")
53            }
54            ColumnFamilyError::NotFound(name) => {
55                write!(f, "column family '{name}' not found")
56            }
57            ColumnFamilyError::Database(e) => write!(f, "database error: {e}"),
58            ColumnFamilyError::Io(e) => write!(f, "I/O error: {e}"),
59        }
60    }
61}
62
63impl std::error::Error for ColumnFamilyError {
64    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
65        match self {
66            ColumnFamilyError::Database(e) => Some(e),
67            ColumnFamilyError::Io(e) => Some(e),
68            _ => None,
69        }
70    }
71}
72
73impl From<DatabaseError> for ColumnFamilyError {
74    fn from(err: DatabaseError) -> Self {
75        ColumnFamilyError::Database(err)
76    }
77}
78
79impl From<io::Error> for ColumnFamilyError {
80    fn from(err: io::Error) -> Self {
81        ColumnFamilyError::Io(err)
82    }
83}
84
85/// A high-performance database that manages multiple independent column families within a single file.
86///
87/// **This is the recommended interface for most use cases**, providing excellent concurrent
88/// write performance (451K ops/sec at 8 threads) through Write-Ahead Log (WAL) group commit
89/// batching, which is enabled by default.
90///
91/// Each column family operates as a complete redb database with its own transaction
92/// isolation, enabling concurrent writes to different column families while maintaining
93/// ACID guarantees.
94///
95/// # Performance
96///
97/// - **451K ops/sec** at 8 concurrent threads (with WAL, enabled by default)
98/// - **4.7x faster** than vanilla redb
99/// - **WAL enabled by default** for optimal performance
100/// - Near-linear scaling from 1 to 8 threads
101///
102/// # Simplified API
103///
104/// Column families are auto-created on first access - no need to pre-create them!
105///
106/// ```ignore
107/// use manifold::column_family::ColumnFamilyDatabase;
108/// use manifold::TableDefinition;
109///
110/// // Open database - WAL enabled by default for great performance
111/// let db = ColumnFamilyDatabase::open("my_database.manifold")?;
112///
113/// // Auto-creates "users" CF on first access - no setup needed!
114/// let users_cf = db.column_family_or_create("users")?;
115/// let txn = users_cf.begin_write()?;
116/// // ... write data
117/// txn.commit()?;
118/// ```
119///
120/// # Concurrent Writes
121///
122/// ```ignore
123/// use std::thread;
124///
125/// let db = ColumnFamilyDatabase::open("my.db")?;
126///
127/// thread::scope(|s| {
128///     s.spawn(|| {
129///         let users = db.column_family_or_create("users")?;
130///         let txn = users.begin_write()?;
131///         // ... write user data
132///         txn.commit()
133///     });
134///
135///     s.spawn(|| {
136///         let products = db.column_family_or_create("products")?;
137///         let txn = products.begin_write()?;
138///         // ... write product data
139///         txn.commit()
140///     });
141/// });
142/// ```
143///
144/// # Advanced Configuration
145///
146/// ```ignore
147/// // Disable WAL (not recommended - reduces performance by ~45%)
148/// let db = ColumnFamilyDatabase::builder()
149///     .without_wal()
150///     .open("my.db")?;
151///
152/// // Custom pool size
153/// let db = ColumnFamilyDatabase::builder()
154///     .pool_size(128)
155///     .open("my.db")?;
156/// ```
157pub struct ColumnFamilyDatabase {
158    #[cfg(not(target_arch = "wasm32"))]
159    path: PathBuf,
160    #[cfg(not(target_arch = "wasm32"))]
161    header_backend: Arc<FileBackend>,
162    #[cfg(not(target_arch = "wasm32"))]
163    handle_pool: Arc<FileHandlePool>,
164    #[cfg(target_arch = "wasm32")]
165    header_backend: Arc<dyn StorageBackend>,
166    #[cfg(target_arch = "wasm32")]
167    file_name: String,
168    #[cfg(target_arch = "wasm32")]
169    file_growth_lock: Arc<std::sync::Mutex<()>>,
170    column_families: Arc<RwLock<HashMap<String, Arc<ColumnFamilyState>>>>,
171    header: Arc<RwLock<MasterHeader>>,
172    wal_journal: Option<Arc<WALJournal>>,
173    checkpoint_manager: Option<Arc<CheckpointManager>>,
174}
175
176impl ColumnFamilyDatabase {
177    /// Returns a builder for configuring and opening a column family database.
178    ///
179    /// Most users should use `ColumnFamilyDatabase::open()` which provides
180    /// excellent defaults (WAL enabled with `pool_size=64`).
181    ///
182    /// # Example
183    ///
184    /// ```ignore
185    /// // Recommended: use default settings
186    /// let db = ColumnFamilyDatabase::open("my.db")?;
187    ///
188    /// // Advanced: customize settings
189    /// let db = ColumnFamilyDatabase::builder()
190    ///     .pool_size(128)  // Larger pool for many CFs
191    ///     .open("my.db")?;
192    ///
193    /// // Opt-out of WAL (not recommended)
194    /// let db = ColumnFamilyDatabase::builder()
195    ///     .without_wal()
196    ///     .open("my.db")?;
197    /// ```
198    #[cfg(not(target_arch = "wasm32"))]
199    pub fn builder() -> ColumnFamilyDatabaseBuilder {
200        ColumnFamilyDatabaseBuilder::new()
201    }
202
203    /// Opens or creates a column family database at the specified path with optimal defaults.
204    ///
205    /// **This is the recommended way to open a database.** Default settings provide:
206    /// - WAL enabled (`pool_size=64`) for excellent performance (451K ops/sec at 8 threads)
207    /// - Group commit batching for high concurrent write throughput
208    /// - Auto-creating column families on first access via `column_family_or_create()`
209    ///
210    /// This is equivalent to `ColumnFamilyDatabase::builder().open(path)`.
211    ///
212    /// # Example
213    ///
214    /// ```ignore
215    /// let db = ColumnFamilyDatabase::open("my.db")?;
216    /// let users = db.column_family_or_create("users")?;
217    /// let txn = users.begin_write()?;
218    /// // ... write data
219    /// txn.commit()?;
220    /// ```
221    ///
222    /// # Errors
223    ///
224    /// Returns an error if the file cannot be opened or the header is invalid.
225    #[cfg(not(target_arch = "wasm32"))]
226    pub fn open(path: impl AsRef<Path>) -> Result<Self, DatabaseError> {
227        Self::builder().open(path)
228    }
229
230    /// Opens or creates a column family database with a WASM backend.
231    ///
232    /// This is the WASM-specific initialization that accepts a `WasmStorageBackend`
233    /// instead of a file path.
234    ///
235    /// # Arguments
236    ///
237    /// * `file_name` - Name of the OPFS file (for identification in errors)
238    /// * `backend` - The WASM storage backend to use
239    /// * `pool_size` - WAL pool size (0 to disable WAL)
240    ///
241    /// # Example
242    ///
243    /// ```ignore
244    /// use manifold::wasm::WasmStorageBackend;
245    /// use manifold::column_family::ColumnFamilyDatabase;
246    ///
247    /// // In a Web Worker context:
248    /// let backend = WasmStorageBackend::new("my-database.db").await?;
249    /// let db = ColumnFamilyDatabase::open_with_backend(
250    ///     "my-database.db",
251    ///     Arc::new(backend),
252    ///     64, // WAL enabled
253    /// )?;
254    /// ```
255    #[cfg(target_arch = "wasm32")]
256    pub(crate) fn open_with_backend_internal(
257        file_name: String,
258        backend: Arc<dyn StorageBackend>,
259        wal_journal: Option<Arc<WALJournal>>,
260        checkpoint_manager: Option<Arc<CheckpointManager>>,
261    ) -> Result<Self, DatabaseError> {
262        let file_name = file_name.into();
263
264        let is_new = backend
265            .len()
266            .map_err(|e| DatabaseError::Storage(StorageError::from(e)))?
267            == 0;
268
269        let header = if is_new {
270            let header = MasterHeader::new();
271            let header_bytes = header
272                .to_bytes()
273                .map_err(|e| DatabaseError::Storage(StorageError::from(e)))?;
274
275            backend
276                .write(0, &header_bytes)
277                .map_err(|e| DatabaseError::Storage(StorageError::from(e)))?;
278            backend
279                .sync_data()
280                .map_err(|e| DatabaseError::Storage(StorageError::from(e)))?;
281
282            header
283        } else {
284            let mut header_bytes = vec![0u8; PAGE_SIZE];
285            backend
286                .read(0, &mut header_bytes)
287                .map_err(|e| DatabaseError::Storage(StorageError::from(e)))?;
288
289            MasterHeader::from_bytes(&header_bytes)
290                .map_err(|e| DatabaseError::Storage(StorageError::from(e)))?
291        };
292
293        let header = Arc::new(RwLock::new(header));
294
295        let mut column_families = HashMap::new();
296        for cf_meta in &header.read().unwrap().column_families {
297            let state = ColumnFamilyState::new(cf_meta.name.clone(), cf_meta.segments.clone());
298            column_families.insert(cf_meta.name.clone(), Arc::new(state));
299        }
300
301        Ok(Self {
302            file_name,
303            header_backend: backend,
304            file_growth_lock: Arc::new(std::sync::Mutex::new(())),
305            column_families: Arc::new(RwLock::new(column_families)),
306            header,
307            wal_journal,
308            checkpoint_manager,
309        })
310    }
311
312    /// Performs WAL recovery without creating Database instances.
313    /// Operates entirely at the `TransactionalMemory` layer to avoid Drop cleanup issues.
314    ///
315    /// # Arguments
316    /// * `column_families` - Map of column family names to their states
317    /// * `handle_pool` - File handle pool for acquiring storage backends
318    /// * `journal` - WAL journal to read entries from
319    ///
320    /// # Returns
321    /// Ok(()) if recovery succeeded, Err otherwise
322    #[cfg(not(target_arch = "wasm32"))]
323    fn perform_wal_recovery(
324        column_families: &HashMap<String, Arc<ColumnFamilyState>>,
325        handle_pool: &FileHandlePool,
326        journal: &WALJournal,
327    ) -> Result<(), DatabaseError> {
328        // Read all WAL entries
329        let entries = journal
330            .read_from(0)
331            .map_err(|e| DatabaseError::Storage(StorageError::from(e)))?;
332
333        if entries.is_empty() {
334            return Ok(());
335        }
336
337        #[cfg(feature = "logging")]
338        log::info!("Performing WAL recovery for {} entries", entries.len());
339
340        // Group entries by column family
341        let mut cf_entries: HashMap<String, Vec<&super::wal::entry::WALEntry>> = HashMap::new();
342        for entry in &entries {
343            cf_entries
344                .entry(entry.cf_name.clone())
345                .or_default()
346                .push(entry);
347        }
348
349        // Create Database instances for recovery using ManuallyDrop to prevent Drop cleanup
350        // This gives us proper initialization (allocator state, repair if needed)
351        // but prevents Database::drop from running cleanup that would corrupt recovery
352        let mut recovery_dbs: HashMap<String, ManuallyDrop<Database>> = HashMap::new();
353
354        for (cf_name, cf_state) in column_families {
355            if !cf_entries.contains_key(cf_name) {
356                continue; // Skip CFs not in WAL
357            }
358
359            // Get storage backend directly from ColumnFamilyState
360            let backend = handle_pool.acquire(cf_name)?;
361
362            // Create PartitionedStorageBackend
363            let segments = cf_state.segments.read().unwrap().clone();
364            let file_growth_lock = handle_pool.file_growth_lock();
365
366            let partition_backend = PartitionedStorageBackend::with_segments(
367                backend,
368                segments,
369                None, // No expansion callback during recovery
370                file_growth_lock,
371            );
372
373            // Create Database with proper initialization (handles repair, allocator state, etc.)
374            // Wrap in ManuallyDrop to prevent Database::drop cleanup from running
375            let db = ManuallyDrop::new(Database::builder().create_with_backend(partition_backend)?);
376
377            recovery_dbs.insert(cf_name.clone(), db);
378        }
379
380        // Apply WAL entries to each Database
381        for (cf_name, entries_for_cf) in &cf_entries {
382            let db = recovery_dbs.get(cf_name).ok_or_else(|| {
383                DatabaseError::Storage(StorageError::from(io::Error::new(
384                    io::ErrorKind::NotFound,
385                    format!("No Database for CF '{cf_name}'"),
386                )))
387            })?;
388
389            let mem = db.get_memory();
390
391            for entry in entries_for_cf {
392                // Convert WAL payload to BtreeHeader format
393                let data_root =
394                    entry
395                        .payload
396                        .user_root
397                        .map(|(page_num, checksum, length)| BtreeHeader {
398                            root: page_num,
399                            checksum,
400                            length,
401                        });
402
403                let system_root = entry
404                    .payload
405                    .system_root
406                    .map(|(page_num, checksum, length)| BtreeHeader {
407                        root: page_num,
408                        checksum,
409                        length,
410                    });
411
412                // Apply WAL transaction (updates secondary slot)
413                mem.apply_wal_transaction(
414                    data_root,
415                    system_root,
416                    TransactionId::new(entry.transaction_id),
417                )?;
418            }
419        }
420
421        // Commit all recovered state at TransactionalMemory level
422        // This promotes secondary → primary and fsyncs
423        for (cf_name, db) in &recovery_dbs {
424            // Get the last WAL entry for this CF to use its transaction ID
425            let last_entry = cf_entries
426                .get(cf_name)
427                .and_then(|entries| entries.last())
428                .ok_or_else(|| {
429                    DatabaseError::Storage(StorageError::from(io::Error::new(
430                        io::ErrorKind::NotFound,
431                        format!("No entries for CF '{cf_name}'"),
432                    )))
433                })?;
434
435            let mem = db.get_memory();
436            let data_root = mem.get_data_root();
437            let system_root = mem.get_system_root();
438            let txn_id = TransactionId::new(last_entry.transaction_id);
439
440            // Directly commit: swap secondary to primary and fsync
441            // Use two_phase=false and shrink_policy=Never for simplicity
442            mem.commit(
443                data_root,
444                system_root,
445                txn_id,
446                false,
447                crate::tree_store::ShrinkPolicy::Never,
448            )
449            .map_err(|e| {
450                DatabaseError::Storage(StorageError::from(io::Error::other(format!(
451                    "recovery commit failed for '{cf_name}': {e}"
452                ))))
453            })?;
454
455            #[cfg(feature = "logging")]
456            log::debug!(
457                "Recovered CF '{cf_name}' to transaction {}",
458                txn_id.raw_id()
459            );
460        }
461
462        // Truncate WAL after successful recovery
463        let latest_seq = entries.last().unwrap().sequence;
464        journal
465            .truncate(latest_seq + 1)
466            .map_err(|e| DatabaseError::Storage(StorageError::from(e)))?;
467
468        #[cfg(feature = "logging")]
469        log::info!("WAL recovery completed successfully");
470
471        // All ManuallyDrop<Database> instances drop here
472        // ManuallyDrop prevents Database::drop from running, so NO cleanup, NO corruption
473        Ok(())
474    }
475
476    /// Internal implementation of open, called by the builder (native platforms).
477    #[cfg(not(target_arch = "wasm32"))]
478    pub(crate) fn open_with_builder(
479        path: PathBuf,
480        pool_size: usize,
481    ) -> Result<Self, DatabaseError> {
482        let file = std::fs::OpenOptions::new()
483            .read(true)
484            .write(true)
485            .create(true)
486            .truncate(false)
487            .open(&path)
488            .map_err(|e| DatabaseError::Storage(StorageError::from(e)))?;
489
490        let header_backend = Arc::new(FileBackend::new(file)?);
491
492        let is_new = header_backend
493            .len()
494            .map_err(|e| DatabaseError::Storage(StorageError::from(e)))?
495            == 0;
496
497        let header = if is_new {
498            let header = MasterHeader::new();
499            let header_bytes = header
500                .to_bytes()
501                .map_err(|e| DatabaseError::Storage(StorageError::from(e)))?;
502
503            header_backend
504                .write(0, &header_bytes)
505                .map_err(|e| DatabaseError::Storage(StorageError::from(e)))?;
506            header_backend
507                .sync_data()
508                .map_err(|e| DatabaseError::Storage(StorageError::from(e)))?;
509
510            header
511        } else {
512            let mut header_bytes = vec![0u8; PAGE_SIZE];
513            header_backend
514                .read(0, &mut header_bytes)
515                .map_err(|e| DatabaseError::Storage(StorageError::from(e)))?;
516
517            MasterHeader::from_bytes(&header_bytes)
518                .map_err(|e| DatabaseError::Storage(StorageError::from(e)))?
519        };
520
521        let handle_pool = Arc::new(FileHandlePool::new(path.clone(), pool_size));
522        let header = Arc::new(RwLock::new(header));
523
524        let mut column_families = HashMap::new();
525        for cf_meta in &header.read().unwrap().column_families {
526            let state = ColumnFamilyState::new(cf_meta.name.clone(), cf_meta.segments.clone());
527            column_families.insert(cf_meta.name.clone(), Arc::new(state));
528        }
529
530        // Initialize WAL journal and perform recovery if needed
531        let wal_journal = if pool_size > 0 {
532            let wal_path = path.with_extension("wal");
533            let journal = WALJournal::open(&wal_path)
534                .map_err(|e| DatabaseError::Storage(StorageError::from(e)))?;
535
536            // Perform WAL recovery without creating Database instances
537            // This operates entirely at the TransactionalMemory layer to avoid Drop cleanup issues
538            let entries = journal
539                .read_from(0)
540                .map_err(|e| DatabaseError::Storage(StorageError::from(e)))?;
541
542            if !entries.is_empty() {
543                Self::perform_wal_recovery(&column_families, &handle_pool, &journal)?;
544            }
545
546            Some(Arc::new(journal))
547        } else {
548            None
549        };
550
551        // Start checkpoint manager if WAL is enabled
552        let checkpoint_manager = if let Some(ref journal_arc) = wal_journal {
553            let config = CheckpointConfig {
554                interval: std::time::Duration::from_secs(60),
555                max_wal_size: 64 * 1024 * 1024,
556            };
557
558            // Create database Arc for checkpoint manager (temporary, will be replaced by self)
559            let db_arc = Arc::new(Self {
560                path: path.clone(),
561                header_backend: Arc::clone(&header_backend),
562                handle_pool: Arc::clone(&handle_pool),
563                column_families: Arc::new(RwLock::new(column_families.clone())),
564                header: Arc::clone(&header),
565                wal_journal: Some(Arc::clone(journal_arc)),
566                checkpoint_manager: None, // Will be set after creation
567            });
568
569            let manager = CheckpointManager::start(Arc::clone(journal_arc), db_arc, config);
570
571            Some(Arc::new(manager))
572        } else {
573            None
574        };
575
576        Ok(Self {
577            path,
578            header_backend,
579            handle_pool,
580            column_families: Arc::new(RwLock::new(column_families)),
581            header,
582            wal_journal,
583            checkpoint_manager,
584        })
585    }
586
587    /// Creates a new column family with the specified name and optional size.
588    ///
589    /// The column family is created cheaply with no file descriptor allocated.
590    /// The Database instance and file handle are lazily initialized on first write.
591    ///
592    /// # Arguments
593    ///
594    /// * `name` - Name of the column family
595    /// * `size` - Initial size in bytes. If None, defaults to 1GB.
596    ///
597    /// # Errors
598    ///
599    /// Returns an error if a column family with this name already exists or
600    /// the header cannot be updated.
601    pub fn create_column_family(
602        &self,
603        name: impl Into<String>,
604        size: Option<u64>,
605    ) -> Result<ColumnFamily, ColumnFamilyError> {
606        let name = name.into();
607        let size = size.unwrap_or(DEFAULT_COLUMN_FAMILY_SIZE);
608
609        let mut cfs = self.column_families.write().unwrap();
610
611        if cfs.contains_key(&name) {
612            return Err(ColumnFamilyError::AlreadyExists(name));
613        }
614
615        let (segments, cf_name) = {
616            let mut header = self.header.write().unwrap();
617            let offset = header.end_of_file();
618            let metadata = ColumnFamilyMetadata::new(name.clone(), offset, size);
619
620            header.column_families.push(metadata.clone());
621
622            let header_bytes = header.to_bytes()?;
623            self.header_backend.write(0, &header_bytes)?;
624            self.header_backend.sync_data()?;
625
626            // PRE-ALLOCATE FILE SPACE for this partition
627            // CRITICAL: This eliminates filesystem metadata update contention
628            // By extending the file to cover all partitions upfront, we avoid:
629            // 1. File extension syscalls during Database writes
630            // 2. Kernel-level serialization on file size changes
631            // 3. Filesystem journal updates
632            let new_file_size = offset + size;
633            let current_file_size = self.header_backend.len().map_err(ColumnFamilyError::Io)?;
634
635            if new_file_size > current_file_size {
636                // Extend file to reserve space for this partition
637                self.header_backend
638                    .set_len(new_file_size)
639                    .map_err(ColumnFamilyError::Io)?;
640
641                // Important: Don't sync here - let the OS handle it lazily
642                // This keeps create_column_family() fast
643            }
644
645            (metadata.segments, metadata.name.clone())
646        };
647
648        let state = Arc::new(ColumnFamilyState::new(name.clone(), segments));
649        cfs.insert(name.clone(), Arc::clone(&state));
650
651        #[cfg(not(target_arch = "wasm32"))]
652        {
653            Ok(ColumnFamily {
654                name: cf_name,
655                state,
656                pool: self.handle_pool.clone(),
657                path: self.path.clone(),
658                header: self.header.clone(),
659                header_backend: self.header_backend.clone(),
660                wal_journal: self.wal_journal.clone(),
661                checkpoint_manager: self.checkpoint_manager.clone(),
662            })
663        }
664        #[cfg(target_arch = "wasm32")]
665        {
666            Ok(ColumnFamily {
667                name: cf_name,
668                state,
669                backend: self.header_backend.clone(),
670                header: self.header.clone(),
671                header_backend: self.header_backend.clone(),
672                file_growth_lock: self.file_growth_lock.clone(),
673                wal_journal: self.wal_journal.clone(),
674                checkpoint_manager: self.checkpoint_manager.clone(),
675            })
676        }
677    }
678
679    /// Retrieves a handle to an existing column family.
680    ///
681    /// The returned handle is lightweight and can be cloned cheaply.
682    ///
683    /// # Errors
684    ///
685    /// Returns an error if no column family with the given name exists.
686    pub fn column_family(&self, name: &str) -> Result<ColumnFamily, ColumnFamilyError> {
687        let cfs = self.column_families.read().unwrap();
688
689        match cfs.get(name) {
690            Some(state) => {
691                #[cfg(not(target_arch = "wasm32"))]
692                {
693                    Ok(ColumnFamily {
694                        name: name.to_string(),
695                        state: state.clone(),
696                        pool: self.handle_pool.clone(),
697                        path: self.path.clone(),
698                        header: self.header.clone(),
699                        header_backend: self.header_backend.clone(),
700                        wal_journal: self.wal_journal.clone(),
701                        checkpoint_manager: self.checkpoint_manager.clone(),
702                    })
703                }
704                #[cfg(target_arch = "wasm32")]
705                {
706                    Ok(ColumnFamily {
707                        name: name.to_string(),
708                        state: state.clone(),
709                        backend: self.header_backend.clone(),
710                        header: self.header.clone(),
711                        header_backend: self.header_backend.clone(),
712                        file_growth_lock: self.file_growth_lock.clone(),
713                        wal_journal: self.wal_journal.clone(),
714                        checkpoint_manager: self.checkpoint_manager.clone(),
715                    })
716                }
717            }
718            None => Err(ColumnFamilyError::NotFound(name.to_string())),
719        }
720    }
721
722    /// Retrieves a handle to a column family, creating it if it doesn't exist.
723    ///
724    /// This is a convenience method that combines `column_family()` and `create_column_family()`.
725    /// If the column family exists, it returns a handle to it. Otherwise, it creates a new
726    /// column family with the default size (1GB) and returns a handle.
727    ///
728    /// This is the recommended way to access column families for most use cases.
729    ///
730    /// # Example
731    ///
732    /// ```ignore
733    /// let db = ColumnFamilyDatabase::open("my_database.manifold")?;
734    ///
735    /// // Auto-creates "users" if it doesn't exist
736    /// let users = db.column_family_or_create("users")?;
737    /// let txn = users.begin_write()?;
738    /// // ... write data
739    /// txn.commit()?;
740    /// ```
741    ///
742    /// # Errors
743    ///
744    /// Returns an error if the column family cannot be created (e.g., I/O error).
745    pub fn column_family_or_create(&self, name: &str) -> Result<ColumnFamily, ColumnFamilyError> {
746        // Try to get existing CF first (read lock only)
747        {
748            let cfs = self.column_families.read().unwrap();
749            if let Some(state) = cfs.get(name) {
750                #[cfg(not(target_arch = "wasm32"))]
751                {
752                    return Ok(ColumnFamily {
753                        name: name.to_string(),
754                        state: state.clone(),
755                        pool: self.handle_pool.clone(),
756                        path: self.path.clone(),
757                        header: self.header.clone(),
758                        header_backend: self.header_backend.clone(),
759                        wal_journal: self.wal_journal.clone(),
760                        checkpoint_manager: self.checkpoint_manager.clone(),
761                    });
762                }
763                #[cfg(target_arch = "wasm32")]
764                {
765                    return Ok(ColumnFamily {
766                        name: name.to_string(),
767                        state: state.clone(),
768                        backend: self.header_backend.clone(),
769                        header: self.header.clone(),
770                        header_backend: self.header_backend.clone(),
771                        wal_journal: self.wal_journal.clone(),
772                        checkpoint_manager: self.checkpoint_manager.clone(),
773                    });
774                }
775            }
776        }
777
778        // Doesn't exist - create it with default size
779        self.create_column_family(name, None)
780    }
781
782    /// Returns a list of all column family names in the database.
783    pub fn list_column_families(&self) -> Vec<String> {
784        let header = self.header.read().unwrap();
785        header
786            .column_families
787            .iter()
788            .map(|cf| cf.name.clone())
789            .collect()
790    }
791
    /// Enable WAL with the given backend (WASM only).
    ///
    /// Creates a [`WALJournal`] on top of `wal_backend` and stores it in
    /// `self.wal_journal`. It then starts a [`CheckpointManager`] (15 s interval,
    /// 32 MB WAL size threshold) and stores it in `self.checkpoint_manager`.
    ///
    /// This must be called immediately after creation to initialize WAL support.
    ///
    /// # Errors
    ///
    /// Returns [`DatabaseError::Storage`] if the WAL journal cannot be created.
    /// In that case `self` is left unchanged.
    #[cfg(target_arch = "wasm32")]
    pub fn enable_wal(
        &mut self,
        wal_backend: Arc<dyn StorageBackend>,
    ) -> Result<(), DatabaseError> {
        use crate::column_family::wal::checkpoint::CheckpointManager;
        use crate::column_family::wal::config::CheckpointConfig;
        use crate::column_family::wal::journal::WALJournal;

        // Create WAL journal with the provided backend
        let journal = WALJournal::new(wal_backend)
            .map_err(|e| DatabaseError::Storage(StorageError::from(e)))?;
        let journal_arc = Arc::new(journal);

        // Checkpoint policy used on WASM.
        let config = CheckpointConfig {
            interval: std::time::Duration::from_secs(15), // WASM default: 15s
            max_wal_size: 32 * 1024 * 1024,               // WASM default: 32 MB
        };

        // CheckpointManager::start wants an Arc<Self>, but only &mut self is available.
        // Store the journal first, then create the manager.
        self.wal_journal = Some(Arc::clone(&journal_arc));

        // FIXME(review): this is undefined behavior, not "safe".
        // `Arc::from_raw` requires a pointer previously returned by `Arc::into_raw`,
        // and nothing guarantees `self` lives inside an `ArcInner`. `Arc::clone` below
        // therefore increments a "refcount" in memory that does not belong to us.
        // Any clone retained by the checkpoint manager will later decrement it and may
        // even try to free `self`.
        // A sound fix needs an interface change, e.g. taking `self: &Arc<Self>` or
        // handing the manager a `Weak<Self>`. Confirm what `CheckpointManager::start`
        // actually does with the Arc before changing this.
        let db_arc = unsafe { Arc::from_raw(self as *const Self) };

        let checkpoint_mgr =
            CheckpointManager::start(Arc::clone(&journal_arc), Arc::clone(&db_arc), config);

        // Forget our fabricated Arc so it does not "drop" `self`. The clone passed to
        // the manager above is still live (see FIXME).
        std::mem::forget(db_arc);

        self.checkpoint_manager = Some(Arc::new(checkpoint_mgr));

        Ok(())
    }
834
835    /// Manually triggers a checkpoint to flush WAL to main database.
836    ///
837    /// This ensures all pending WAL entries are applied to the database and persisted.
838    /// If WAL is disabled (`pool_size` = 0), this is a no-op.
839    ///
840    /// # Errors
841    ///
842    /// Returns an error if the checkpoint operation fails.
843    #[cfg(not(target_arch = "wasm32"))]
844    pub fn checkpoint(&self) -> Result<(), DatabaseError> {
845        if let Some(checkpoint_mgr) = self.checkpoint_manager.as_ref() {
846            checkpoint_mgr
847                .checkpoint_now()
848                .map_err(|e| DatabaseError::Storage(StorageError::from(e)))?;
849        }
850        Ok(())
851    }
852
853    /// Manually triggers a checkpoint to flush WAL to main database (WASM version).
854    ///
855    /// This ensures all pending WAL entries are applied to the database and persisted.
856    /// If WAL is disabled (`pool_size` = 0), this is a no-op.
857    ///
858    /// # Errors
859    ///
860    /// Returns an error if the checkpoint operation fails.
861    #[cfg(target_arch = "wasm32")]
862    pub fn checkpoint(&self) -> Result<(), DatabaseError> {
863        // WASM checkpoint manager placeholder - will be implemented when we have proper async support
864        Ok(())
865    }
866
867    /// Returns the path to the database file (native platforms).
868    #[cfg(not(target_arch = "wasm32"))]
869    pub fn path(&self) -> &Path {
870        &self.path
871    }
872
873    /// Returns the file name (WASM).
874    #[cfg(target_arch = "wasm32")]
875    pub fn file_name(&self) -> &str {
876        &self.file_name
877    }
878
879    /// Deletes a column family and adds its segments to the free list for reuse.
880    ///
881    /// # Errors
882    ///
883    /// Returns an error if the column family does not exist or the header
884    /// cannot be updated.
885    pub fn delete_column_family(&self, name: &str) -> Result<(), ColumnFamilyError> {
886        let mut cfs = self.column_families.write().unwrap();
887
888        if !cfs.contains_key(name) {
889            return Err(ColumnFamilyError::NotFound(name.to_string()));
890        }
891
892        cfs.remove(name);
893
894        let mut header = self.header.write().unwrap();
895
896        let cf_idx = header
897            .column_families
898            .iter()
899            .position(|cf| cf.name == name)
900            .ok_or_else(|| ColumnFamilyError::NotFound(name.to_string()))?;
901
902        let cf_meta = header.column_families.remove(cf_idx);
903        for segment in cf_meta.segments {
904            header
905                .free_segments
906                .push(FreeSegment::new(segment.offset, segment.size));
907        }
908
909        let header_bytes = header.to_bytes()?;
910        self.header_backend.write(0, &header_bytes)?;
911        self.header_backend.sync_data()?;
912
913        Ok(())
914    }
915
916    /// Internal segment allocation function used by expansion callbacks (native platforms).
917    #[cfg(not(target_arch = "wasm32"))]
918    fn allocate_segment_internal(
919        cf_name: &str,
920        size: u64,
921        header: &Arc<RwLock<MasterHeader>>,
922        _header_backend: &Arc<FileBackend>,
923        state: &Arc<ColumnFamilyState>,
924    ) -> io::Result<Segment> {
925        // Allocate segment from free list or end of file - keep lock minimal
926        let allocated_segment = {
927            let mut hdr = header.write().unwrap();
928
929            let mut best_fit_idx = None;
930            let mut best_fit_size = u64::MAX;
931
932            for (idx, free_seg) in hdr.free_segments.iter().enumerate() {
933                if free_seg.size >= size && free_seg.size < best_fit_size {
934                    best_fit_idx = Some(idx);
935                    best_fit_size = free_seg.size;
936                }
937            }
938
939            let allocated_segment = if let Some(idx) = best_fit_idx {
940                let free_seg = hdr.free_segments.remove(idx);
941
942                if free_seg.size == size {
943                    Segment::new(free_seg.offset, free_seg.size)
944                } else {
945                    let allocated = Segment::new(free_seg.offset, size);
946                    let remaining = FreeSegment::new(free_seg.offset + size, free_seg.size - size);
947                    hdr.free_segments.push(remaining);
948                    allocated
949                }
950            } else {
951                let offset = hdr.end_of_file();
952                let aligned_offset = offset.div_ceil(PAGE_SIZE as u64) * PAGE_SIZE as u64;
953                Segment::new(aligned_offset, size)
954            };
955
956            if let Some(cf_meta) = hdr.column_families.iter_mut().find(|cf| cf.name == cf_name) {
957                cf_meta.segments.push(allocated_segment.clone());
958            }
959
960            allocated_segment
961        }; // Header lock released here - no disk I/O while holding lock
962
963        // Update state outside of header lock
964        let mut state_segments = state.segments.write().unwrap();
965        state_segments.push(allocated_segment.clone());
966
967        // Don't write/fsync header on every allocation - eliminates serialization bottleneck
968        // Header persisted on clean shutdown or periodically
969        // Trade-off: crash may lose segment allocations (wasted space, not data loss)
970
971        Ok(allocated_segment)
972    }
973
974    /// Internal segment allocation function used by expansion callbacks (WASM).
975    #[cfg(target_arch = "wasm32")]
976    fn allocate_segment_internal(
977        cf_name: &str,
978        size: u64,
979        header: &Arc<RwLock<MasterHeader>>,
980        _header_backend: &Arc<dyn StorageBackend>,
981        state: &Arc<ColumnFamilyState>,
982    ) -> io::Result<Segment> {
983        // Allocate segment from free list or end of file - keep lock minimal
984        let allocated_segment = {
985            let mut hdr = header.write().unwrap();
986
987            let mut best_fit_idx = None;
988            let mut best_fit_size = u64::MAX;
989
990            for (idx, free_seg) in hdr.free_segments.iter().enumerate() {
991                if free_seg.size >= size && free_seg.size < best_fit_size {
992                    best_fit_idx = Some(idx);
993                    best_fit_size = free_seg.size;
994                }
995            }
996
997            let allocated_segment = if let Some(idx) = best_fit_idx {
998                let free_seg = hdr.free_segments.remove(idx);
999
1000                if free_seg.size == size {
1001                    Segment::new(free_seg.offset, free_seg.size)
1002                } else {
1003                    let allocated = Segment::new(free_seg.offset, size);
1004                    let remaining = FreeSegment::new(free_seg.offset + size, free_seg.size - size);
1005                    hdr.free_segments.push(remaining);
1006                    allocated
1007                }
1008            } else {
1009                let offset = hdr.end_of_file();
1010                let aligned_offset = offset.div_ceil(PAGE_SIZE as u64) * PAGE_SIZE as u64;
1011                Segment::new(aligned_offset, size)
1012            };
1013
1014            if let Some(cf_meta) = hdr.column_families.iter_mut().find(|cf| cf.name == cf_name) {
1015                cf_meta.segments.push(allocated_segment.clone());
1016            }
1017
1018            allocated_segment
1019        }; // Header lock released here
1020
1021        // Update segments in state
1022        let mut segments = state.segments.write().unwrap();
1023        segments.push(allocated_segment.clone());
1024
1025        Ok(allocated_segment)
1026    }
1027}
1028
/// A handle to a column family within a [`ColumnFamilyDatabase`].
///
/// Handles are lightweight. Cloning copies the name (and, on native platforms,
/// the path) and bumps the reference counts of the shared state, so all clones
/// refer to the same column family.
///
/// The underlying Database instance is created lazily by the first `begin_write`
/// or `begin_read` call (through `ensure_database`). On native platforms this
/// acquires a file handle from the pool.
#[derive(Clone)]
pub struct ColumnFamily {
    /// Column family name. Used as the pool key in `release_handle` and passed as
    /// the WAL context name for write transactions.
    name: String,
    /// Shared per-column-family state: the segment list and the lazily created Database.
    state: Arc<ColumnFamilyState>,
    /// Native: pool that file handles are acquired from and released to.
    #[cfg(not(target_arch = "wasm32"))]
    pool: Arc<FileHandlePool>,
    /// Native: path of the database file, passed to `ensure_database`.
    #[cfg(not(target_arch = "wasm32"))]
    path: PathBuf,
    /// Native: backend holding the master header; forwarded to segment allocation.
    #[cfg(not(target_arch = "wasm32"))]
    header_backend: Arc<FileBackend>,
    /// WASM: backend holding the master header; forwarded to segment allocation.
    #[cfg(target_arch = "wasm32")]
    header_backend: Arc<dyn StorageBackend>,
    /// WASM: storage backend passed to `ensure_database_wasm`.
    #[cfg(target_arch = "wasm32")]
    backend: Arc<dyn StorageBackend>,
    /// WASM: lock handed to `ensure_database_wasm`. Presumably it serializes file
    /// growth (NOTE(review): confirm).
    #[cfg(target_arch = "wasm32")]
    file_growth_lock: Arc<std::sync::Mutex<()>>,
    /// Shared master header; new segment allocations are recorded here.
    header: Arc<RwLock<MasterHeader>>,
    /// WAL journal, if WAL is enabled. Attached to write transactions on native platforms.
    wal_journal: Option<Arc<WALJournal>>,
    /// Checkpoint manager, if any. Attached alongside the WAL journal on native platforms.
    checkpoint_manager: Option<Arc<CheckpointManager>>,
}
1054
1055impl ColumnFamily {
1056    /// Returns the name of this column family.
1057    pub fn name(&self) -> &str {
1058        &self.name
1059    }
1060
1061    /// Begins a write transaction for this column family.
1062    ///
1063    /// On first call, this acquires a file handle from the pool and initializes
1064    /// the Database instance. Subsequent calls reuse the cached instance.
1065    ///
1066    /// # Errors
1067    ///
1068    /// Returns an error if a write transaction is already in progress for this
1069    /// column family or if the Database cannot be initialized.
1070    pub fn begin_write(&self) -> Result<WriteTransaction, TransactionError> {
1071        let db = self.ensure_database().map_err(|e| match e {
1072            DatabaseError::Storage(s) => TransactionError::Storage(s),
1073            _ => TransactionError::Storage(StorageError::from(io::Error::other(format!(
1074                "database initialization error: {e}"
1075            )))),
1076        })?;
1077
1078        let mut txn = db.begin_write()?;
1079
1080        // Inject WAL context if enabled (native platforms only)
1081        #[cfg(not(target_arch = "wasm32"))]
1082        if let Some(wal_journal) = &self.wal_journal {
1083            txn.set_wal_context(
1084                self.name.clone(),
1085                Arc::clone(wal_journal),
1086                self.checkpoint_manager.as_ref().map(Arc::clone),
1087            );
1088        }
1089
1090        Ok(txn)
1091    }
1092
1093    /// Begins a read transaction for this column family.
1094    ///
1095    /// Multiple read transactions may be active concurrently.
1096    pub fn begin_read(&self) -> Result<ReadTransaction, TransactionError> {
1097        let db = self.ensure_database().map_err(|e| match e {
1098            DatabaseError::Storage(s) => TransactionError::Storage(s),
1099            _ => TransactionError::Storage(StorageError::from(io::Error::other(format!(
1100                "database initialization error: {e}"
1101            )))),
1102        })?;
1103        db.begin_read()
1104    }
1105
1106    /// Releases this column family's file handle back to the pool.
1107    ///
1108    /// After calling this, the next operation on this column family will
1109    /// re-acquire a file handle from the pool. This is useful for explicitly
1110    /// managing file descriptor usage in scenarios with many column families.
1111    ///
1112    /// This is automatically called by LRU eviction, but can be called manually
1113    /// for explicit control (e.g., in tests or after bulk operations).
1114    #[cfg(not(target_arch = "wasm32"))]
1115    pub fn release_handle(&self) {
1116        self.pool.release(&self.name);
1117        self.state.evict_database();
1118    }
1119
1120    /// WASM version - no-op since WASM doesn't have file handle pooling
1121    #[cfg(target_arch = "wasm32")]
1122    pub fn release_handle(&self) {
1123        self.state.evict_database();
1124    }
1125
1126    /// Ensures the Database instance exists, creating it if necessary (native platforms).
1127    #[cfg(not(target_arch = "wasm32"))]
1128    pub(crate) fn ensure_database(&self) -> Result<Arc<Database>, DatabaseError> {
1129        let name = self.name.clone();
1130        let header = self.header.clone();
1131        let header_backend = self.header_backend.clone();
1132
1133        let state = self.state.clone();
1134
1135        let expansion_callback = Arc::new(move |requested_size: u64| -> io::Result<Segment> {
1136            ColumnFamilyDatabase::allocate_segment_internal(
1137                &name,
1138                requested_size,
1139                &header,
1140                &header_backend,
1141                &state,
1142            )
1143        });
1144
1145        self.state
1146            .ensure_database(&self.pool, &self.path, expansion_callback)
1147    }
1148
1149    /// Ensures the Database instance exists, creating it if necessary (WASM).
1150    #[cfg(target_arch = "wasm32")]
1151    pub(crate) fn ensure_database(&self) -> Result<Arc<Database>, DatabaseError> {
1152        let name = self.name.clone();
1153        let header = self.header.clone();
1154        let header_backend = self.header_backend.clone();
1155        let state = self.state.clone();
1156
1157        let expansion_callback = Arc::new(move |requested_size: u64| -> io::Result<Segment> {
1158            ColumnFamilyDatabase::allocate_segment_internal(
1159                &name,
1160                requested_size,
1161                &header,
1162                &header_backend,
1163                &state,
1164            )
1165        });
1166
1167        self.state.ensure_database_wasm(
1168            &self.backend,
1169            expansion_callback,
1170            self.file_growth_lock.clone(),
1171        )
1172    }
1173}
1174
impl Drop for ColumnFamilyDatabase {
    /// Best-effort shutdown. Every error is ignored because `drop` cannot report it.
    ///
    /// The steps are:
    /// 1. On native targets with WAL enabled, commit each column family's current
    ///    secondary state.
    /// 2. On native targets, shut down the checkpoint manager.
    /// 3. Close the header backend.
    fn drop(&mut self) {
        // Run final checkpoint to flush dirty data if WAL is enabled.
        // For every column family listed in the header, obtain its Database and
        // commit the memory layer's current secondary state (data root, system root,
        // transaction id) via `checkpoint_commit`.
        // Note that `ensure_database` may open a Database that was never used in
        // this session just to perform this commit.
        // NOTE(review): this runs before the checkpoint manager is shut down below,
        // so a background checkpoint may still be in progress concurrently. Confirm
        // that this ordering is intended.
        #[cfg(not(target_arch = "wasm32"))]
        if self.wal_journal.is_some() {
            // Checkpoint all column families to persist dirty data
            for cf_name in self.list_column_families() {
                if let Ok(cf) = self.column_family(&cf_name)
                    && let Ok(db) = cf.ensure_database()
                {
                    let mem = db.get_memory();
                    if let Ok((data_root, system_root, txn_id)) = mem.get_current_secondary_state()
                    {
                        let _ = mem.checkpoint_commit(data_root, system_root, txn_id);
                    }
                }
            }
        }

        // Shutdown checkpoint manager if it exists (native platforms only).
        // NOTE(review): on WASM the manager started by `enable_wal` is simply dropped
        // with the struct's fields after this function returns. It holds a clone of
        // the fabricated `Arc<Self>` created there; see the FIXME in `enable_wal`.
        #[cfg(not(target_arch = "wasm32"))]
        if let Some(checkpoint_mgr) = self.checkpoint_manager.take() {
            // Try to unwrap the Arc - if we're the last owner, we can shutdown gracefully
            if let Ok(manager) = Arc::try_unwrap(checkpoint_mgr) {
                let _ = manager.shutdown();
            }
            // If Arc::try_unwrap fails, other references exist and Drop on CheckpointManager
            // will handle shutdown when they're dropped (CheckpointManager's Drop is not
            // visible here - confirm)
        }

        // Close the header backend to release the file lock (or OPFS handle)
        let _ = self.header_backend.close();
    }
}