1use std::collections::HashMap;
2use std::fmt;
3use std::io;
4use std::mem::ManuallyDrop;
5#[cfg(not(target_arch = "wasm32"))]
6use std::path::{Path, PathBuf};
7use std::sync::{Arc, RwLock};
8
9#[cfg(not(target_arch = "wasm32"))]
10use crate::backends::FileBackend;
11use crate::db::ReadableDatabase;
12use crate::transaction_tracker::TransactionId;
13#[cfg(not(target_arch = "wasm32"))]
14use crate::tree_store::BtreeHeader;
15use crate::{
16 Database, DatabaseError, ReadTransaction, StorageBackend, StorageError, TransactionError,
17 WriteTransaction,
18};
19
20#[cfg(not(target_arch = "wasm32"))]
21use super::builder::ColumnFamilyDatabaseBuilder;
22#[cfg(not(target_arch = "wasm32"))]
23use super::file_handle_pool::FileHandlePool;
24use super::header::{ColumnFamilyMetadata, FreeSegment, MasterHeader, Segment, PAGE_SIZE};
25use super::partitioned_backend::PartitionedStorageBackend;
26use super::state::ColumnFamilyState;
27use super::wal::checkpoint::CheckpointManager;
28#[cfg(not(target_arch = "wasm32"))]
29use super::wal::config::CheckpointConfig;
30use super::wal::journal::WALJournal;
31
/// Size in bytes (1 GiB) given to a new column family when the caller of
/// `create_column_family` passes `None` for the size.
const DEFAULT_COLUMN_FAMILY_SIZE: u64 = 1024 * 1024 * 1024;
34
/// Errors returned by column-family management operations
/// (create, look up, delete).
#[derive(Debug)]
pub enum ColumnFamilyError {
    /// A column family with this name is already registered.
    AlreadyExists(String),
    /// No column family with this name is registered.
    NotFound(String),
    /// An error reported by the database layer.
    Database(DatabaseError),
    /// An I/O error, e.g. while writing the master header or resizing the file.
    Io(io::Error),
}
47
48impl fmt::Display for ColumnFamilyError {
49 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
50 match self {
51 ColumnFamilyError::AlreadyExists(name) => {
52 write!(f, "column family '{name}' already exists")
53 }
54 ColumnFamilyError::NotFound(name) => {
55 write!(f, "column family '{name}' not found")
56 }
57 ColumnFamilyError::Database(e) => write!(f, "database error: {e}"),
58 ColumnFamilyError::Io(e) => write!(f, "I/O error: {e}"),
59 }
60 }
61}
62
63impl std::error::Error for ColumnFamilyError {
64 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
65 match self {
66 ColumnFamilyError::Database(e) => Some(e),
67 ColumnFamilyError::Io(e) => Some(e),
68 _ => None,
69 }
70 }
71}
72
73impl From<DatabaseError> for ColumnFamilyError {
74 fn from(err: DatabaseError) -> Self {
75 ColumnFamilyError::Database(err)
76 }
77}
78
79impl From<io::Error> for ColumnFamilyError {
80 fn from(err: io::Error) -> Self {
81 ColumnFamilyError::Io(err)
82 }
83}
84
/// A database file that hosts several named column families, described by a
/// master header stored at offset 0 of the file.
///
/// The set of fields differs between native targets (file path, file backend,
/// handle pool) and `wasm32` (generic storage backend, file name, growth lock).
pub struct ColumnFamilyDatabase {
    /// Path of the database file (native only).
    #[cfg(not(target_arch = "wasm32"))]
    path: PathBuf,
    /// Backend used to read and write the master header (native only).
    #[cfg(not(target_arch = "wasm32"))]
    header_backend: Arc<FileBackend>,
    /// Pool handing out per-column-family file handles (native only).
    #[cfg(not(target_arch = "wasm32"))]
    handle_pool: Arc<FileHandlePool>,
    /// Backend used to read and write the master header; also handed to
    /// column-family handles as their data backend (wasm only).
    #[cfg(target_arch = "wasm32")]
    header_backend: Arc<dyn StorageBackend>,
    /// Name of the database file (wasm only).
    #[cfg(target_arch = "wasm32")]
    file_name: String,
    /// Lock shared with column-family handles and passed on when their
    /// databases are opened (wasm only).
    #[cfg(target_arch = "wasm32")]
    file_growth_lock: Arc<std::sync::Mutex<()>>,
    /// Live column families, keyed by name.
    column_families: Arc<RwLock<HashMap<String, Arc<ColumnFamilyState>>>>,
    /// In-memory copy of the master header.
    header: Arc<RwLock<MasterHeader>>,
    /// Write-ahead-log journal, if WAL is enabled.
    wal_journal: Option<Arc<WALJournal>>,
    /// Checkpoint manager, present only when WAL is enabled.
    checkpoint_manager: Option<Arc<CheckpointManager>>,
}
175
176impl ColumnFamilyDatabase {
/// Returns a new [`ColumnFamilyDatabaseBuilder`] for configuring and opening
/// a database (native targets only).
#[cfg(not(target_arch = "wasm32"))]
pub fn builder() -> ColumnFamilyDatabaseBuilder {
    ColumnFamilyDatabaseBuilder::new()
}
202
/// Opens the database at `path` using a builder with its default settings
/// (native targets only).
///
/// # Errors
/// Returns whatever [`DatabaseError`] the builder's `open` reports.
#[cfg(not(target_arch = "wasm32"))]
pub fn open(path: impl AsRef<Path>) -> Result<Self, DatabaseError> {
    Self::builder().open(path)
}
229
/// Opens (or initializes) a column-family database on top of an arbitrary
/// storage backend (wasm only).
///
/// An empty backend is initialized with a fresh master header that is written
/// at offset 0 and synced. Otherwise the first `PAGE_SIZE` bytes are read and
/// decoded as the master header. One `ColumnFamilyState` is then created per
/// column family listed in the header.
///
/// # Errors
/// Any backend or header (de)serialization failure is returned as
/// `DatabaseError::Storage`.
#[cfg(target_arch = "wasm32")]
pub(crate) fn open_with_backend_internal(
    file_name: String,
    backend: Arc<dyn StorageBackend>,
    wal_journal: Option<Arc<WALJournal>>,
    checkpoint_manager: Option<Arc<CheckpointManager>>,
) -> Result<Self, DatabaseError> {
    // Single place for the "anything convertible to StorageError" mapping.
    fn storage_err<E>(err: E) -> DatabaseError
    where
        StorageError: From<E>,
    {
        DatabaseError::Storage(StorageError::from(err))
    }

    let backend_len = backend.len().map_err(storage_err)?;

    let master = if backend_len == 0 {
        // Brand-new storage: persist an empty master header first.
        let fresh = MasterHeader::new();
        let encoded = fresh.to_bytes().map_err(storage_err)?;
        backend.write(0, &encoded).map_err(storage_err)?;
        backend.sync_data().map_err(storage_err)?;
        fresh
    } else {
        // Existing storage: the header occupies the first page.
        let mut raw = vec![0u8; PAGE_SIZE];
        backend.read(0, &mut raw).map_err(storage_err)?;
        MasterHeader::from_bytes(&raw).map_err(storage_err)?
    };

    let registry: HashMap<_, _> = master
        .column_families
        .iter()
        .map(|meta| {
            let state = ColumnFamilyState::new(meta.name.clone(), meta.segments.clone());
            (meta.name.clone(), Arc::new(state))
        })
        .collect();

    Ok(Self {
        file_name,
        header_backend: backend,
        file_growth_lock: Arc::new(std::sync::Mutex::new(())),
        column_families: Arc::new(RwLock::new(registry)),
        header: Arc::new(RwLock::new(master)),
        wal_journal,
        checkpoint_manager,
    })
}
311
/// Replays every entry in the WAL journal into the affected column families
/// and then truncates the journal (native only).
///
/// Steps, in order:
/// 1. Read all journal entries from sequence 0; return early if there are none.
/// 2. Group the entries by column-family name.
/// 3. Open a temporary `Database` for every known column family that has entries.
/// 4. Apply each entry's user/system roots through `apply_wal_transaction`.
/// 5. Commit each recovered database with the transaction id of its last entry.
/// 6. Truncate the journal at `last sequence + 1`.
///
/// # Errors
/// Returns `DatabaseError` if the journal cannot be read or truncated, if an
/// entry names a column family that is not in `column_families`, or if opening,
/// applying to, or committing a recovery database fails.
#[cfg(not(target_arch = "wasm32"))]
fn perform_wal_recovery(
    column_families: &HashMap<String, Arc<ColumnFamilyState>>,
    handle_pool: &FileHandlePool,
    journal: &WALJournal,
) -> Result<(), DatabaseError> {
    let entries = journal
        .read_from(0)
        .map_err(|e| DatabaseError::Storage(StorageError::from(e)))?;

    // Nothing to replay.
    if entries.is_empty() {
        return Ok(());
    }

    #[cfg(feature = "logging")]
    log::info!("Performing WAL recovery for {} entries", entries.len());

    // Group entries per column family, preserving journal order within each group.
    let mut cf_entries: HashMap<String, Vec<&super::wal::entry::WALEntry>> = HashMap::new();
    for entry in &entries {
        cf_entries
            .entry(entry.cf_name.clone())
            .or_default()
            .push(entry);
    }

    // Recovery databases are wrapped in `ManuallyDrop`, so their destructors
    // never run in this function.
    // NOTE(review): presumably this avoids `Database::drop` side effects on the
    // pooled backend -- confirm against `Database`'s Drop implementation.
    let mut recovery_dbs: HashMap<String, ManuallyDrop<Database>> = HashMap::new();

    for (cf_name, cf_state) in column_families {
        // Only open column families that actually have WAL entries.
        if !cf_entries.contains_key(cf_name) {
            continue;
        }

        let backend = handle_pool.acquire(cf_name)?;

        // Snapshot of the segments currently assigned to this column family.
        let segments = cf_state.segments.read().unwrap().clone();
        let file_growth_lock = handle_pool.file_growth_lock();

        // `None` is passed for the third argument.
        // NOTE(review): presumably the expansion callback, which would mean
        // recovery cannot grow the column family -- confirm against `with_segments`.
        let partition_backend = PartitionedStorageBackend::with_segments(
            backend,
            segments,
            None,
            file_growth_lock,
        );

        let db = ManuallyDrop::new(Database::builder().create_with_backend(partition_backend)?);

        recovery_dbs.insert(cf_name.clone(), db);
    }

    // Apply every logged transaction to its column family's database.
    for (cf_name, entries_for_cf) in &cf_entries {
        // An entry for a column family that is not registered is a hard error.
        let db = recovery_dbs.get(cf_name).ok_or_else(|| {
            DatabaseError::Storage(StorageError::from(io::Error::new(
                io::ErrorKind::NotFound,
                format!("No Database for CF '{cf_name}'"),
            )))
        })?;

        let mem = db.get_memory();

        for entry in entries_for_cf {
            // Rebuild B-tree headers from the (page, checksum, length) tuples in the payload.
            let data_root =
                entry
                    .payload
                    .user_root
                    .map(|(page_num, checksum, length)| BtreeHeader {
                        root: page_num,
                        checksum,
                        length,
                    });

            let system_root = entry
                .payload
                .system_root
                .map(|(page_num, checksum, length)| BtreeHeader {
                    root: page_num,
                    checksum,
                    length,
                });

            mem.apply_wal_transaction(
                data_root,
                system_root,
                TransactionId::new(entry.transaction_id),
            )?;
        }
    }

    // Commit each recovered database at the transaction id of its last WAL entry.
    for (cf_name, db) in &recovery_dbs {
        let last_entry = cf_entries
            .get(cf_name)
            .and_then(|entries| entries.last())
            .ok_or_else(|| {
                DatabaseError::Storage(StorageError::from(io::Error::new(
                    io::ErrorKind::NotFound,
                    format!("No entries for CF '{cf_name}'"),
                )))
            })?;

        let mem = db.get_memory();
        let data_root = mem.get_data_root();
        let system_root = mem.get_system_root();
        let txn_id = TransactionId::new(last_entry.transaction_id);

        // NOTE(review): the meaning of the `false` flag is not visible here --
        // confirm against `commit`'s signature.
        mem.commit(
            data_root,
            system_root,
            txn_id,
            false,
            crate::tree_store::ShrinkPolicy::Never,
        )
        .map_err(|e| {
            DatabaseError::Storage(StorageError::from(io::Error::other(format!(
                "recovery commit failed for '{cf_name}': {e}"
            ))))
        })?;

        #[cfg(feature = "logging")]
        log::debug!(
            "Recovered CF '{cf_name}' to transaction {}",
            txn_id.raw_id()
        );
    }

    // `entries` is non-empty (checked above), so `last()` cannot be `None`.
    let latest_seq = entries.last().unwrap().sequence;
    journal
        .truncate(latest_seq + 1)
        .map_err(|e| DatabaseError::Storage(StorageError::from(e)))?;

    #[cfg(feature = "logging")]
    log::info!("WAL recovery completed successfully");

    Ok(())
}
475
476 #[cfg(not(target_arch = "wasm32"))]
478 pub(crate) fn open_with_builder(
479 path: PathBuf,
480 pool_size: usize,
481 ) -> Result<Self, DatabaseError> {
482 let file = std::fs::OpenOptions::new()
483 .read(true)
484 .write(true)
485 .create(true)
486 .truncate(false)
487 .open(&path)
488 .map_err(|e| DatabaseError::Storage(StorageError::from(e)))?;
489
490 let header_backend = Arc::new(FileBackend::new(file)?);
491
492 let is_new = header_backend
493 .len()
494 .map_err(|e| DatabaseError::Storage(StorageError::from(e)))?
495 == 0;
496
497 let header = if is_new {
498 let header = MasterHeader::new();
499 let header_bytes = header
500 .to_bytes()
501 .map_err(|e| DatabaseError::Storage(StorageError::from(e)))?;
502
503 header_backend
504 .write(0, &header_bytes)
505 .map_err(|e| DatabaseError::Storage(StorageError::from(e)))?;
506 header_backend
507 .sync_data()
508 .map_err(|e| DatabaseError::Storage(StorageError::from(e)))?;
509
510 header
511 } else {
512 let mut header_bytes = vec![0u8; PAGE_SIZE];
513 header_backend
514 .read(0, &mut header_bytes)
515 .map_err(|e| DatabaseError::Storage(StorageError::from(e)))?;
516
517 MasterHeader::from_bytes(&header_bytes)
518 .map_err(|e| DatabaseError::Storage(StorageError::from(e)))?
519 };
520
521 let handle_pool = Arc::new(FileHandlePool::new(path.clone(), pool_size));
522 let header = Arc::new(RwLock::new(header));
523
524 let mut column_families = HashMap::new();
525 for cf_meta in &header.read().unwrap().column_families {
526 let state = ColumnFamilyState::new(cf_meta.name.clone(), cf_meta.segments.clone());
527 column_families.insert(cf_meta.name.clone(), Arc::new(state));
528 }
529
530 let wal_journal = if pool_size > 0 {
532 let wal_path = path.with_extension("wal");
533 let journal = WALJournal::open(&wal_path)
534 .map_err(|e| DatabaseError::Storage(StorageError::from(e)))?;
535
536 let entries = journal
539 .read_from(0)
540 .map_err(|e| DatabaseError::Storage(StorageError::from(e)))?;
541
542 if !entries.is_empty() {
543 Self::perform_wal_recovery(&column_families, &handle_pool, &journal)?;
544 }
545
546 Some(Arc::new(journal))
547 } else {
548 None
549 };
550
551 let checkpoint_manager = if let Some(ref journal_arc) = wal_journal {
553 let config = CheckpointConfig {
554 interval: std::time::Duration::from_secs(60),
555 max_wal_size: 64 * 1024 * 1024,
556 };
557
558 let db_arc = Arc::new(Self {
560 path: path.clone(),
561 header_backend: Arc::clone(&header_backend),
562 handle_pool: Arc::clone(&handle_pool),
563 column_families: Arc::new(RwLock::new(column_families.clone())),
564 header: Arc::clone(&header),
565 wal_journal: Some(Arc::clone(journal_arc)),
566 checkpoint_manager: None, });
568
569 let manager = CheckpointManager::start(Arc::clone(journal_arc), db_arc, config);
570
571 Some(Arc::new(manager))
572 } else {
573 None
574 };
575
576 Ok(Self {
577 path,
578 header_backend,
579 handle_pool,
580 column_families: Arc::new(RwLock::new(column_families)),
581 header,
582 wal_journal,
583 checkpoint_manager,
584 })
585 }
586
587 pub fn create_column_family(
602 &self,
603 name: impl Into<String>,
604 size: Option<u64>,
605 ) -> Result<ColumnFamily, ColumnFamilyError> {
606 let name = name.into();
607 let size = size.unwrap_or(DEFAULT_COLUMN_FAMILY_SIZE);
608
609 let mut cfs = self.column_families.write().unwrap();
610
611 if cfs.contains_key(&name) {
612 return Err(ColumnFamilyError::AlreadyExists(name));
613 }
614
615 let (segments, cf_name) = {
616 let mut header = self.header.write().unwrap();
617 let offset = header.end_of_file();
618 let metadata = ColumnFamilyMetadata::new(name.clone(), offset, size);
619
620 header.column_families.push(metadata.clone());
621
622 let header_bytes = header.to_bytes()?;
623 self.header_backend.write(0, &header_bytes)?;
624 self.header_backend.sync_data()?;
625
626 let new_file_size = offset + size;
633 let current_file_size = self.header_backend.len().map_err(ColumnFamilyError::Io)?;
634
635 if new_file_size > current_file_size {
636 self.header_backend
638 .set_len(new_file_size)
639 .map_err(ColumnFamilyError::Io)?;
640
641 }
644
645 (metadata.segments, metadata.name.clone())
646 };
647
648 let state = Arc::new(ColumnFamilyState::new(name.clone(), segments));
649 cfs.insert(name.clone(), Arc::clone(&state));
650
651 #[cfg(not(target_arch = "wasm32"))]
652 {
653 Ok(ColumnFamily {
654 name: cf_name,
655 state,
656 pool: self.handle_pool.clone(),
657 path: self.path.clone(),
658 header: self.header.clone(),
659 header_backend: self.header_backend.clone(),
660 wal_journal: self.wal_journal.clone(),
661 checkpoint_manager: self.checkpoint_manager.clone(),
662 })
663 }
664 #[cfg(target_arch = "wasm32")]
665 {
666 Ok(ColumnFamily {
667 name: cf_name,
668 state,
669 backend: self.header_backend.clone(),
670 header: self.header.clone(),
671 header_backend: self.header_backend.clone(),
672 file_growth_lock: self.file_growth_lock.clone(),
673 wal_journal: self.wal_journal.clone(),
674 checkpoint_manager: self.checkpoint_manager.clone(),
675 })
676 }
677 }
678
679 pub fn column_family(&self, name: &str) -> Result<ColumnFamily, ColumnFamilyError> {
687 let cfs = self.column_families.read().unwrap();
688
689 match cfs.get(name) {
690 Some(state) => {
691 #[cfg(not(target_arch = "wasm32"))]
692 {
693 Ok(ColumnFamily {
694 name: name.to_string(),
695 state: state.clone(),
696 pool: self.handle_pool.clone(),
697 path: self.path.clone(),
698 header: self.header.clone(),
699 header_backend: self.header_backend.clone(),
700 wal_journal: self.wal_journal.clone(),
701 checkpoint_manager: self.checkpoint_manager.clone(),
702 })
703 }
704 #[cfg(target_arch = "wasm32")]
705 {
706 Ok(ColumnFamily {
707 name: name.to_string(),
708 state: state.clone(),
709 backend: self.header_backend.clone(),
710 header: self.header.clone(),
711 header_backend: self.header_backend.clone(),
712 file_growth_lock: self.file_growth_lock.clone(),
713 wal_journal: self.wal_journal.clone(),
714 checkpoint_manager: self.checkpoint_manager.clone(),
715 })
716 }
717 }
718 None => Err(ColumnFamilyError::NotFound(name.to_string())),
719 }
720 }
721
722 pub fn column_family_or_create(&self, name: &str) -> Result<ColumnFamily, ColumnFamilyError> {
746 {
748 let cfs = self.column_families.read().unwrap();
749 if let Some(state) = cfs.get(name) {
750 #[cfg(not(target_arch = "wasm32"))]
751 {
752 return Ok(ColumnFamily {
753 name: name.to_string(),
754 state: state.clone(),
755 pool: self.handle_pool.clone(),
756 path: self.path.clone(),
757 header: self.header.clone(),
758 header_backend: self.header_backend.clone(),
759 wal_journal: self.wal_journal.clone(),
760 checkpoint_manager: self.checkpoint_manager.clone(),
761 });
762 }
763 #[cfg(target_arch = "wasm32")]
764 {
765 return Ok(ColumnFamily {
766 name: name.to_string(),
767 state: state.clone(),
768 backend: self.header_backend.clone(),
769 header: self.header.clone(),
770 header_backend: self.header_backend.clone(),
771 wal_journal: self.wal_journal.clone(),
772 checkpoint_manager: self.checkpoint_manager.clone(),
773 });
774 }
775 }
776 }
777
778 self.create_column_family(name, None)
780 }
781
782 pub fn list_column_families(&self) -> Vec<String> {
784 let header = self.header.read().unwrap();
785 header
786 .column_families
787 .iter()
788 .map(|cf| cf.name.clone())
789 .collect()
790 }
791
/// Enables write-ahead logging on top of `wal_backend` and starts a checkpoint
/// manager (wasm only).
///
/// The checkpoint manager needs an `Arc<ColumnFamilyDatabase>`. It is given a
/// separate database value that shares this database's backend, header,
/// column-family map, and growth lock through their existing `Arc`s. `self` is
/// not owned by an `Arc`, so building one from `self`'s address with
/// `Arc::from_raw` is undefined behaviour; no `unsafe` is needed here.
///
/// NOTE(review): the shared database value runs `Drop` when the manager
/// releases it, which closes the shared header backend -- confirm that the
/// manager is always shut down before this database is used again.
///
/// # Errors
/// Returns `DatabaseError::Storage` if the journal cannot be created on
/// `wal_backend`.
#[cfg(target_arch = "wasm32")]
pub fn enable_wal(
    &mut self,
    wal_backend: Arc<dyn StorageBackend>,
) -> Result<(), DatabaseError> {
    use crate::column_family::wal::checkpoint::CheckpointManager;
    use crate::column_family::wal::config::CheckpointConfig;
    use crate::column_family::wal::journal::WALJournal;

    let journal = WALJournal::new(wal_backend)
        .map_err(|e| DatabaseError::Storage(StorageError::from(e)))?;
    let journal_arc = Arc::new(journal);

    let config = CheckpointConfig {
        interval: std::time::Duration::from_secs(15),
        max_wal_size: 32 * 1024 * 1024,
    };

    self.wal_journal = Some(Arc::clone(&journal_arc));

    // A properly owned database value for the manager, sharing all state with
    // `self`; it carries no checkpoint manager of its own.
    let db_arc = Arc::new(Self {
        file_name: self.file_name.clone(),
        header_backend: Arc::clone(&self.header_backend),
        file_growth_lock: Arc::clone(&self.file_growth_lock),
        column_families: Arc::clone(&self.column_families),
        header: Arc::clone(&self.header),
        wal_journal: Some(Arc::clone(&journal_arc)),
        checkpoint_manager: None,
    });

    let checkpoint_mgr = CheckpointManager::start(Arc::clone(&journal_arc), db_arc, config);

    self.checkpoint_manager = Some(Arc::new(checkpoint_mgr));

    Ok(())
}
834
835 #[cfg(not(target_arch = "wasm32"))]
844 pub fn checkpoint(&self) -> Result<(), DatabaseError> {
845 if let Some(checkpoint_mgr) = self.checkpoint_manager.as_ref() {
846 checkpoint_mgr
847 .checkpoint_now()
848 .map_err(|e| DatabaseError::Storage(StorageError::from(e)))?;
849 }
850 Ok(())
851 }
852
/// Manual checkpoint entry point on wasm: currently a no-op that always
/// returns `Ok(())`.
#[cfg(target_arch = "wasm32")]
pub fn checkpoint(&self) -> Result<(), DatabaseError> {
    Ok(())
}
866
/// Returns the path of the database file (native only).
#[cfg(not(target_arch = "wasm32"))]
pub fn path(&self) -> &Path {
    &self.path
}
872
/// Returns the name of the database file (wasm only).
#[cfg(target_arch = "wasm32")]
pub fn file_name(&self) -> &str {
    &self.file_name
}
878
879 pub fn delete_column_family(&self, name: &str) -> Result<(), ColumnFamilyError> {
886 let mut cfs = self.column_families.write().unwrap();
887
888 if !cfs.contains_key(name) {
889 return Err(ColumnFamilyError::NotFound(name.to_string()));
890 }
891
892 cfs.remove(name);
893
894 let mut header = self.header.write().unwrap();
895
896 let cf_idx = header
897 .column_families
898 .iter()
899 .position(|cf| cf.name == name)
900 .ok_or_else(|| ColumnFamilyError::NotFound(name.to_string()))?;
901
902 let cf_meta = header.column_families.remove(cf_idx);
903 for segment in cf_meta.segments {
904 header
905 .free_segments
906 .push(FreeSegment::new(segment.offset, segment.size));
907 }
908
909 let header_bytes = header.to_bytes()?;
910 self.header_backend.write(0, &header_bytes)?;
911 self.header_backend.sync_data()?;
912
913 Ok(())
914 }
915
916 #[cfg(not(target_arch = "wasm32"))]
918 fn allocate_segment_internal(
919 cf_name: &str,
920 size: u64,
921 header: &Arc<RwLock<MasterHeader>>,
922 _header_backend: &Arc<FileBackend>,
923 state: &Arc<ColumnFamilyState>,
924 ) -> io::Result<Segment> {
925 let allocated_segment = {
927 let mut hdr = header.write().unwrap();
928
929 let mut best_fit_idx = None;
930 let mut best_fit_size = u64::MAX;
931
932 for (idx, free_seg) in hdr.free_segments.iter().enumerate() {
933 if free_seg.size >= size && free_seg.size < best_fit_size {
934 best_fit_idx = Some(idx);
935 best_fit_size = free_seg.size;
936 }
937 }
938
939 let allocated_segment = if let Some(idx) = best_fit_idx {
940 let free_seg = hdr.free_segments.remove(idx);
941
942 if free_seg.size == size {
943 Segment::new(free_seg.offset, free_seg.size)
944 } else {
945 let allocated = Segment::new(free_seg.offset, size);
946 let remaining = FreeSegment::new(free_seg.offset + size, free_seg.size - size);
947 hdr.free_segments.push(remaining);
948 allocated
949 }
950 } else {
951 let offset = hdr.end_of_file();
952 let aligned_offset = offset.div_ceil(PAGE_SIZE as u64) * PAGE_SIZE as u64;
953 Segment::new(aligned_offset, size)
954 };
955
956 if let Some(cf_meta) = hdr.column_families.iter_mut().find(|cf| cf.name == cf_name) {
957 cf_meta.segments.push(allocated_segment.clone());
958 }
959
960 allocated_segment
961 }; let mut state_segments = state.segments.write().unwrap();
965 state_segments.push(allocated_segment.clone());
966
967 Ok(allocated_segment)
972 }
973
/// Allocates a new segment of `size` bytes for column family `cf_name`
/// (wasm only).
///
/// The smallest free segment that is large enough is reused (best fit; the
/// first such segment wins on ties) and any unused tail is returned to the free
/// list. If none fits, the segment is placed at the header's end-of-file
/// offset rounded up to a multiple of `PAGE_SIZE`. The segment is recorded in
/// the in-memory header and in `state`; this function does not write the
/// header to `_header_backend`.
///
/// # Errors
/// Returns `io::ErrorKind::NotFound` if `cf_name` is not present in the master
/// header (for example because it was deleted). Nothing is allocated in that
/// case, so no segment is handed out without being recorded in the header.
///
/// # Panics
/// Panics if the header lock or the state's segment lock is poisoned.
#[cfg(target_arch = "wasm32")]
fn allocate_segment_internal(
    cf_name: &str,
    size: u64,
    header: &Arc<RwLock<MasterHeader>>,
    _header_backend: &Arc<dyn StorageBackend>,
    state: &Arc<ColumnFamilyState>,
) -> io::Result<Segment> {
    let allocated_segment = {
        let mut hdr = header.write().unwrap();

        // Resolve the owner first so nothing is consumed for an unknown family.
        let Some(cf_idx) = hdr.column_families.iter().position(|cf| cf.name == cf_name) else {
            return Err(io::Error::new(
                io::ErrorKind::NotFound,
                format!("column family '{cf_name}' not found in master header"),
            ));
        };

        // Best fit: smallest free segment that can hold `size`.
        let best_fit_idx = hdr
            .free_segments
            .iter()
            .enumerate()
            .filter(|(_, free_seg)| free_seg.size >= size)
            .min_by_key(|(_, free_seg)| free_seg.size)
            .map(|(idx, _)| idx);

        let allocated_segment = match best_fit_idx {
            Some(idx) => {
                let free_seg = hdr.free_segments.remove(idx);
                if free_seg.size > size {
                    // Keep the unused tail available for later allocations.
                    hdr.free_segments
                        .push(FreeSegment::new(free_seg.offset + size, free_seg.size - size));
                }
                Segment::new(free_seg.offset, size)
            }
            None => {
                // Append at the page-aligned end of the file.
                let offset = hdr.end_of_file();
                let aligned_offset = offset.div_ceil(PAGE_SIZE as u64) * PAGE_SIZE as u64;
                Segment::new(aligned_offset, size)
            }
        };

        hdr.column_families[cf_idx]
            .segments
            .push(allocated_segment.clone());

        allocated_segment
    };

    // The header lock is released before the state's segment list is updated.
    state
        .segments
        .write()
        .unwrap()
        .push(allocated_segment.clone());

    Ok(allocated_segment)
}
1027}
1028
/// Cloneable handle to a single column family inside a `ColumnFamilyDatabase`.
///
/// Clones share the same `ColumnFamilyState`, master header, and WAL
/// components through `Arc`s.
#[derive(Clone)]
pub struct ColumnFamily {
    /// Name of the column family this handle refers to.
    name: String,
    /// Shared per-column-family state (segments and the lazily opened database).
    state: Arc<ColumnFamilyState>,
    /// File handle pool used to open and release this family's handle (native only).
    #[cfg(not(target_arch = "wasm32"))]
    pool: Arc<FileHandlePool>,
    /// Path of the database file (native only).
    #[cfg(not(target_arch = "wasm32"))]
    path: PathBuf,
    /// Backend of the master header, forwarded to segment allocation (native only).
    #[cfg(not(target_arch = "wasm32"))]
    header_backend: Arc<FileBackend>,
    /// Backend of the master header, forwarded to segment allocation (wasm only).
    #[cfg(target_arch = "wasm32")]
    header_backend: Arc<dyn StorageBackend>,
    /// Backend the column family's database is opened on (wasm only).
    #[cfg(target_arch = "wasm32")]
    backend: Arc<dyn StorageBackend>,
    /// Lock passed along when the column family's database is opened (wasm only).
    #[cfg(target_arch = "wasm32")]
    file_growth_lock: Arc<std::sync::Mutex<()>>,
    /// Shared in-memory master header, updated when new segments are allocated.
    header: Arc<RwLock<MasterHeader>>,
    /// WAL journal attached to write transactions, if WAL is enabled.
    wal_journal: Option<Arc<WALJournal>>,
    /// Checkpoint manager attached to write transactions, if present.
    checkpoint_manager: Option<Arc<CheckpointManager>>,
}
1054
1055impl ColumnFamily {
/// Returns the name of this column family.
pub fn name(&self) -> &str {
    &self.name
}
1060
1061 pub fn begin_write(&self) -> Result<WriteTransaction, TransactionError> {
1071 let db = self.ensure_database().map_err(|e| match e {
1072 DatabaseError::Storage(s) => TransactionError::Storage(s),
1073 _ => TransactionError::Storage(StorageError::from(io::Error::other(format!(
1074 "database initialization error: {e}"
1075 )))),
1076 })?;
1077
1078 let mut txn = db.begin_write()?;
1079
1080 #[cfg(not(target_arch = "wasm32"))]
1082 if let Some(wal_journal) = &self.wal_journal {
1083 txn.set_wal_context(
1084 self.name.clone(),
1085 Arc::clone(wal_journal),
1086 self.checkpoint_manager.as_ref().map(Arc::clone),
1087 );
1088 }
1089
1090 Ok(txn)
1091 }
1092
1093 pub fn begin_read(&self) -> Result<ReadTransaction, TransactionError> {
1097 let db = self.ensure_database().map_err(|e| match e {
1098 DatabaseError::Storage(s) => TransactionError::Storage(s),
1099 _ => TransactionError::Storage(StorageError::from(io::Error::other(format!(
1100 "database initialization error: {e}"
1101 )))),
1102 })?;
1103 db.begin_read()
1104 }
1105
/// Releases this column family's entry in the file handle pool and then
/// evicts its cached database from the shared state (native only).
#[cfg(not(target_arch = "wasm32"))]
pub fn release_handle(&self) {
    self.pool.release(&self.name);
    self.state.evict_database();
}
1119
/// Evicts this column family's cached database from the shared state
/// (wasm only; there is no handle pool to release).
#[cfg(target_arch = "wasm32")]
pub fn release_handle(&self) {
    self.state.evict_database();
}
1125
1126 #[cfg(not(target_arch = "wasm32"))]
1128 pub(crate) fn ensure_database(&self) -> Result<Arc<Database>, DatabaseError> {
1129 let name = self.name.clone();
1130 let header = self.header.clone();
1131 let header_backend = self.header_backend.clone();
1132
1133 let state = self.state.clone();
1134
1135 let expansion_callback = Arc::new(move |requested_size: u64| -> io::Result<Segment> {
1136 ColumnFamilyDatabase::allocate_segment_internal(
1137 &name,
1138 requested_size,
1139 &header,
1140 &header_backend,
1141 &state,
1142 )
1143 });
1144
1145 self.state
1146 .ensure_database(&self.pool, &self.path, expansion_callback)
1147 }
1148
/// Returns this column family's `Database` by delegating to the shared
/// state's `ensure_database_wasm` (wasm only).
///
/// The state is given a callback that allocates a further segment for this
/// column family through `ColumnFamilyDatabase::allocate_segment_internal`.
///
/// # Errors
/// Propagates any `DatabaseError` from the shared state.
#[cfg(target_arch = "wasm32")]
pub(crate) fn ensure_database(&self) -> Result<Arc<Database>, DatabaseError> {
    // The callback owns its own clones of everything it needs.
    let cf_name = self.name.clone();
    let master_header = Arc::clone(&self.header);
    let master_backend = Arc::clone(&self.header_backend);
    let cf_state = Arc::clone(&self.state);

    let expansion_callback = Arc::new(move |requested_size: u64| -> io::Result<Segment> {
        ColumnFamilyDatabase::allocate_segment_internal(
            &cf_name,
            requested_size,
            &master_header,
            &master_backend,
            &cf_state,
        )
    });

    self.state.ensure_database_wasm(
        &self.backend,
        expansion_callback,
        Arc::clone(&self.file_growth_lock),
    )
}
1173}
1174
// Best-effort teardown: every error below is deliberately ignored because
// `drop` cannot report failures.
impl Drop for ColumnFamilyDatabase {
    fn drop(&mut self) {
        // Native + WAL enabled: for every column family listed in the header,
        // open its database and run a checkpoint commit on its current
        // secondary state. Families that cannot be opened are skipped.
        #[cfg(not(target_arch = "wasm32"))]
        if self.wal_journal.is_some() {
            for cf_name in self.list_column_families() {
                if let Ok(cf) = self.column_family(&cf_name)
                    && let Ok(db) = cf.ensure_database()
                {
                    let mem = db.get_memory();
                    if let Ok((data_root, system_root, txn_id)) = mem.get_current_secondary_state()
                    {
                        let _ = mem.checkpoint_commit(data_root, system_root, txn_id);
                    }
                }
            }
        }

        // Shut the checkpoint manager down, but only if this is the last
        // reference to it; `try_unwrap` fails while other `Arc` clones (e.g.
        // held by `ColumnFamily` handles) are still alive, and then no shutdown
        // is requested here.
        #[cfg(not(target_arch = "wasm32"))]
        if let Some(checkpoint_mgr) = self.checkpoint_manager.take() {
            if let Ok(manager) = Arc::try_unwrap(checkpoint_mgr) {
                let _ = manager.shutdown();
            }
        }

        // Finally close the header backend (all targets).
        let _ = self.header_backend.close();
    }
}