1#[cfg(feature = "write-support")]
26pub mod cql_to_mutation;
27#[cfg(feature = "write-support")]
28pub mod export;
29#[cfg(feature = "write-support")]
30pub mod memtable;
31#[cfg(feature = "write-support")]
32pub mod merge;
33#[cfg(feature = "write-support")]
34pub mod merge_policy;
35#[cfg(feature = "write-support")]
36pub mod mutation;
37#[cfg(feature = "write-support")]
38pub mod wal;
39
40#[cfg(feature = "write-support")]
41pub use export::{ExportOptions, ExportReport};
42#[cfg(feature = "write-support")]
43pub use memtable::Memtable;
44#[cfg(feature = "write-support")]
45pub use merge::KWayMerger;
46#[cfg(feature = "write-support")]
47pub use merge_policy::STCSPolicy;
48#[cfg(feature = "write-support")]
49pub use mutation::{
50 CellOperation, ClusteringBound, ClusteringKey, DecoratedKey, Mutation, PartitionKey,
51 PartitionTombstone, RangeTombstone, TableId,
52};
53#[cfg(feature = "write-support")]
54pub use wal::WriteAheadLog;
55
56use crate::error::{Error, Result};
57use crate::schema::TableSchema;
58use crate::storage::sstable::writer::SSTableInfo;
59use std::path::{Path, PathBuf};
60use std::sync::atomic::{AtomicBool, Ordering};
61use std::time::{Duration, Instant};
62
63#[cfg(feature = "write-support")]
65#[derive(Debug, Clone)]
66pub struct MaintenanceReport {
67 pub time_spent: Duration,
69 pub completed_merges: Vec<PathBuf>,
71 pub rows_merged: u64,
73 pub bytes_written: u64,
75 pub pending_compaction: bool,
77}
78
79#[cfg(feature = "write-support")]
84#[derive(Debug, Clone, Default)]
85pub struct CompactionStats {
86 pub compactions_completed: u64,
88 pub sstables_merged_in: u64,
90 pub sstables_produced: u64,
92 pub bytes_read: u64,
94 pub bytes_written: u64,
96 pub rows_merged: u64,
98 pub total_time: Duration,
100}
101
102#[cfg(feature = "write-support")]
108pub trait MergePolicy: Send + std::fmt::Debug {
109 fn select_merge(&self, candidates: &[PathBuf]) -> Result<Vec<PathBuf>>;
120}
121
122#[cfg(feature = "write-support")]
124#[derive(Debug)]
125struct ActiveMerge {
126 merger: KWayMerger,
128 writer: crate::storage::sstable::writer::SSTableWriter,
130 input_paths: Vec<PathBuf>,
132 tmp_dir: PathBuf,
143 sstable_dir: PathBuf,
147 rows_merged: u64,
149 bytes_read: u64,
151 started_at: Instant,
153}
154
155#[cfg(feature = "write-support")]
195#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
196pub enum Durability {
197 #[default]
203 SyncEachWrite,
204
205 Disabled,
212}
213
214#[cfg(feature = "write-support")]
216#[derive(Debug, Clone)]
217pub struct WriteEngineConfig {
218 pub data_dir: PathBuf,
220 pub wal_dir: PathBuf,
222 pub memtable_flush_threshold: usize,
224 pub memtable_hard_limit: usize,
227 pub schema: TableSchema,
229 pub durability: Durability,
231}
232
233#[cfg(feature = "write-support")]
234impl WriteEngineConfig {
235 pub const DEFAULT_FLUSH_THRESHOLD: usize = 64 * 1024 * 1024;
237 pub const DEFAULT_HARD_LIMIT: usize = 256 * 1024 * 1024;
239
240 pub fn new(data_dir: PathBuf, wal_dir: PathBuf, schema: TableSchema) -> Self {
242 Self {
243 data_dir,
244 wal_dir,
245 memtable_flush_threshold: Self::DEFAULT_FLUSH_THRESHOLD,
246 memtable_hard_limit: Self::DEFAULT_HARD_LIMIT,
247 schema,
248 durability: Durability::default(),
249 }
250 }
251
252 pub fn with_flush_threshold(mut self, threshold: usize) -> Self {
254 self.memtable_flush_threshold = threshold;
255 self
256 }
257
258 pub fn with_hard_limit(mut self, limit: usize) -> Self {
260 self.memtable_hard_limit = limit;
261 self
262 }
263
264 pub fn with_durability(mut self, durability: Durability) -> Self {
279 self.durability = durability;
280 self
281 }
282}
283
284#[cfg(feature = "write-support")]
324#[derive(Debug)]
325pub struct WriteEngine {
326 config: WriteEngineConfig,
328 wal: WriteAheadLog,
330 memtable: Memtable,
332 generation: u64,
334 closed: AtomicBool,
336 active_merge: Option<ActiveMerge>,
338 merge_policy: Option<Box<dyn MergePolicy>>,
340 cumulative_stats: CompactionStats,
342}
343
344#[cfg(feature = "write-support")]
351fn reject_counter_cells(mutation: &Mutation) -> Result<()> {
352 for op in &mutation.operations {
353 match op {
354 CellOperation::Write { value, .. } | CellOperation::WriteWithTtl { value, .. } => {
355 if matches!(value, crate::types::Value::Counter(_)) {
356 return Err(Error::invalid_operation(
357 "counter writes are not supported via the standard mutation path; \
358 counter columns require server-side distributed increment semantics",
359 ));
360 }
361 }
362 _ => {}
363 }
364 }
365 Ok(())
366}
367
368#[cfg(feature = "write-support")]
369impl WriteEngine {
370 pub fn new(config: WriteEngineConfig) -> Result<Self> {
390 std::fs::create_dir_all(&config.data_dir).map_err(|e| {
392 Error::Storage(format!(
393 "Failed to create data directory {:?}: {}",
394 config.data_dir, e
395 ))
396 })?;
397
398 std::fs::create_dir_all(&config.wal_dir).map_err(|e| {
399 Error::Storage(format!(
400 "Failed to create WAL directory {:?}: {}",
401 config.wal_dir, e
402 ))
403 })?;
404
405 Self::sweep_orphaned_compaction_tmp(&config.data_dir);
423 Self::sweep_orphaned_partial_sstables(
424 &config.data_dir,
425 &config.schema.keyspace,
426 &config.schema.table,
427 );
428
429 let wal_path = config.wal_dir.join(WriteAheadLog::WAL_FILENAME);
431 let wal = if wal_path.exists() {
432 WriteAheadLog::open_existing(&wal_path)?
434 } else {
435 WriteAheadLog::create(&config.wal_dir)?
437 };
438
439 let mut memtable = Memtable::new();
441 let mutations = wal.replay()?;
442
443 if !mutations.is_empty() {
444 log::info!("Replaying {} mutations from WAL", mutations.len());
445
446 for mutation in mutations {
447 let decorated_key = mutation.decorated_key(&config.schema)?;
449
450 memtable.insert_with_key(decorated_key, mutation)?;
452 }
453
454 log::info!(
455 "WAL replay complete: {} rows in memtable, {} bytes",
456 memtable.row_count(),
457 memtable.size_bytes()
458 );
459 }
460
461 let generation = Self::determine_next_generation(&config.data_dir)?;
463
464 Ok(Self {
465 config,
466 wal,
467 memtable,
468 generation,
469 closed: AtomicBool::new(false),
470 active_merge: None,
471 merge_policy: None,
472 cumulative_stats: CompactionStats::default(),
473 })
474 }
475
476 pub fn write(&mut self, mutation: Mutation) -> Result<()> {
501 if self.closed.load(Ordering::SeqCst) {
502 return Err(Error::InvalidInput(
503 "WriteEngine has been closed".to_string(),
504 ));
505 }
506
507 reject_counter_cells(&mutation)?;
508
509 if self.memtable.size_bytes() >= self.config.memtable_hard_limit {
511 return Err(Error::Storage(format!(
512 "Memtable at hard limit ({} bytes >= {} bytes). Flush required before accepting more writes.",
513 self.memtable.size_bytes(),
514 self.config.memtable_hard_limit
515 )));
516 }
517
518 if self.config.durability == Durability::SyncEachWrite {
520 self.wal.append(&mutation)?;
521 self.wal.sync()?;
522 }
523
524 let decorated_key = mutation.decorated_key(&self.config.schema)?;
526
527 self.memtable.insert_with_key(decorated_key, mutation)?;
529
530 if self
532 .memtable
533 .should_flush(self.config.memtable_flush_threshold)
534 {
535 log::warn!(
536 "Memtable size {} exceeds threshold {} - call flush() manually in async context",
537 self.memtable.size_bytes(),
538 self.config.memtable_flush_threshold
539 );
540
541 if tokio::runtime::Handle::try_current().is_err() {
543 log::info!("Triggering automatic flush");
544 self.flush_internal()?;
545 }
546 }
547
548 Ok(())
549 }
550
551 pub async fn write_async(&mut self, mutation: Mutation) -> Result<()> {
572 if self.closed.load(Ordering::SeqCst) {
573 return Err(Error::InvalidInput(
574 "WriteEngine has been closed".to_string(),
575 ));
576 }
577
578 reject_counter_cells(&mutation)?;
579
580 if self.memtable.size_bytes() >= self.config.memtable_hard_limit {
582 return Err(Error::Storage(format!(
583 "Memtable at hard limit ({} bytes >= {} bytes). Flush required before accepting more writes.",
584 self.memtable.size_bytes(),
585 self.config.memtable_hard_limit
586 )));
587 }
588
589 if self.config.durability == Durability::SyncEachWrite {
591 self.wal.append(&mutation)?;
592 self.wal.sync()?;
593 }
594
595 let decorated_key = mutation.decorated_key(&self.config.schema)?;
597
598 self.memtable.insert_with_key(decorated_key, mutation)?;
600
601 if self
603 .memtable
604 .should_flush(self.config.memtable_flush_threshold)
605 {
606 log::info!(
607 "Memtable size {} exceeds threshold {}, triggering flush",
608 self.memtable.size_bytes(),
609 self.config.memtable_flush_threshold
610 );
611 self.flush_internal_async().await?;
612 }
613
614 Ok(())
615 }
616
617 pub fn execute(&mut self, statement: &str) -> Result<()> {
646 if self.closed.load(Ordering::SeqCst) {
647 return Err(Error::InvalidInput(
648 "WriteEngine has been closed".to_string(),
649 ));
650 }
651
652 let trimmed = statement.trim();
653
654 if trimmed.len() >= 5 && trimmed.as_bytes()[..5].eq_ignore_ascii_case(b"BEGIN") {
656 let mutations =
657 cql_to_mutation::convert_cql_to_mutations(trimmed, &self.config.schema)?;
658 for mutation in mutations {
659 self.write(mutation)?;
660 }
661 Ok(())
662 } else {
663 let mutation = self.parse_cql_to_mutation(statement)?;
664 self.write(mutation)
665 }
666 }
667
668 pub async fn flush(&mut self) -> Result<Option<SSTableInfo>> {
685 if self.closed.load(Ordering::SeqCst) {
686 return Err(Error::InvalidInput(
687 "WriteEngine has been closed".to_string(),
688 ));
689 }
690
691 self.flush_internal_async().await
692 }
693
694 fn flush_internal(&mut self) -> Result<()> {
700 merge::block_on_async(self.flush_internal_async())?;
701 Ok(())
702 }
703
704 async fn flush_internal_async(&mut self) -> Result<Option<SSTableInfo>> {
706 if self.memtable.is_empty() {
708 return Ok(None);
709 }
710
711 log::info!(
712 "Flushing memtable: {} partitions, {} rows, {} bytes",
713 self.memtable.iter().count(),
714 self.memtable.row_count(),
715 self.memtable.size_bytes()
716 );
717
718 let partition_count_hint = self.memtable.iter().count();
720 let mut writer = crate::storage::sstable::writer::SSTableWriter::with_expected_partitions(
721 self.config.data_dir.clone(),
722 self.generation,
723 &self.config.schema,
724 partition_count_hint,
725 )?;
726
727 for (decorated_key, mutations) in self.memtable.iter() {
729 writer.write_partition(decorated_key.clone(), mutations.to_vec())?;
730 }
731
732 let info = writer.finish().await?;
734
735 log::info!(
736 "SSTable flush complete: generation {}, {} partitions, {} bytes",
737 self.generation,
738 info.partition_count,
739 info.data_size
740 );
741
742 if let Err(e) = self.wal.truncate() {
745 log::warn!(
746 "Failed to truncate WAL after successful SSTable flush: {}. \
747 Data is safe in SSTable, but WAL cleanup failed.",
748 e
749 );
750 }
752
753 self.memtable.clear();
755
756 self.generation += 1;
758
759 Ok(Some(info))
760 }
761
762 pub async fn close(&mut self) -> Result<()> {
780 if self.closed.swap(true, Ordering::SeqCst) {
782 return Ok(());
783 }
784
785 log::info!("Closing WriteEngine");
786
787 if !self.memtable.is_empty() {
789 log::info!("Flushing memtable before close");
790
791 match self.flush_internal_async().await {
793 Ok(_) => {
794 log::info!("Memtable flushed successfully");
795 }
796 Err(e) => {
797 log::error!("Failed to flush memtable during close: {}", e);
799 self.closed.store(false, Ordering::SeqCst);
801 return Err(e);
802 }
803 }
804 }
805
806 if let Err(e) = self.wal.sync() {
808 log::warn!("Failed to sync WAL during close: {}", e);
809 }
811
812 log::info!("WriteEngine closed");
813
814 Ok(())
815 }
816
817 pub fn memtable_size(&self) -> usize {
819 self.memtable.size_bytes()
820 }
821
822 pub fn memtable_row_count(&self) -> usize {
824 self.memtable.row_count()
825 }
826
827 pub fn wal_size(&self) -> u64 {
829 self.wal.size()
830 }
831
832 pub fn generation(&self) -> u64 {
834 self.generation
835 }
836
837 fn parse_cql_to_mutation(&self, statement: &str) -> Result<Mutation> {
841 cql_to_mutation::convert_cql_to_mutation(statement, &self.config.schema)
842 }
843
844 fn determine_next_generation(data_dir: &Path) -> Result<u64> {
849 let mut max_generation = 0u64;
850
851 if !data_dir.exists() {
852 return Ok(1);
853 }
854
855 Self::scan_generations(
857 data_dir,
858 &mut max_generation,
859 crate::storage::sstable::MAX_SSTABLE_SCAN_DEPTH,
860 )?;
861
862 Ok(max_generation + 1)
863 }
864
865 fn scan_generations(dir: &Path, max_generation: &mut u64, depth: usize) -> Result<()> {
867 for entry in std::fs::read_dir(dir)
868 .map_err(|e| Error::Storage(format!("Failed to read data directory: {}", e)))?
869 {
870 let entry = entry
871 .map_err(|e| Error::Storage(format!("Failed to read directory entry: {}", e)))?;
872
873 let filename = entry.file_name();
874 let filename_str = filename.to_string_lossy();
875
876 if filename_str.starts_with("nb-") && filename_str.contains("-big-") {
878 if let Some(gen_str) = filename_str
879 .strip_prefix("nb-")
880 .and_then(|s| s.split('-').next())
881 {
882 if let Ok(gen) = gen_str.parse::<u64>() {
883 *max_generation = (*max_generation).max(gen);
884 }
885 }
886 } else if depth > 0 {
887 let path = entry.path();
888 if path.is_dir() {
889 Self::scan_generations(&path, max_generation, depth - 1)?;
890 }
891 }
892 }
893 Ok(())
894 }
895
896 pub fn set_merge_policy(&mut self, policy: Box<dyn MergePolicy>) -> Result<()> {
902 self.merge_policy = Some(policy);
903 Ok(())
904 }
905
906 pub fn maintenance_stats(&self) -> CompactionStats {
925 self.cumulative_stats.clone()
926 }
927
928 pub fn maintenance_step(&mut self, budget: Duration) -> Result<MaintenanceReport> {
1000 if self.closed.load(Ordering::SeqCst) {
1001 return Err(Error::InvalidInput(
1002 "WriteEngine has been closed".to_string(),
1003 ));
1004 }
1005
1006 let start = Instant::now();
1007 let mut report = MaintenanceReport {
1008 time_spent: Duration::from_secs(0),
1009 completed_merges: Vec::new(),
1010 rows_merged: 0,
1011 bytes_written: 0,
1012 pending_compaction: false,
1013 };
1014
1015 let merge_policy = match &self.merge_policy {
1017 Some(policy) => policy,
1018 None => {
1019 report.time_spent = start.elapsed();
1020 return Ok(report);
1021 }
1022 };
1023
1024 if self.active_merge.is_none() {
1026 let candidates = self.scan_sstable_candidates()?;
1027 let selected = merge_policy.select_merge(&candidates)?;
1028
1029 if !selected.is_empty() {
1030 self.start_merge(selected)?;
1032 } else {
1033 report.time_spent = start.elapsed();
1035 report.pending_compaction = false;
1036 return Ok(report);
1037 }
1038 }
1039
1040 let budget_tolerance = budget.mul_f32(1.1); let mut partitions_processed = 0;
1043
1044 while let Some(merge) = &mut self.active_merge {
1045 if partitions_processed > 0 && start.elapsed() >= budget_tolerance {
1047 break;
1048 }
1049
1050 let step = merge.merger.step()?;
1052
1053 match step {
1054 merge::MergeStep::Partition { key, rows } => {
1055 partitions_processed += 1;
1056 let row_count = rows.len() as u64;
1057
1058 let entries_vec: Vec<_> = rows.into_iter().collect();
1061
1062 let mutations = entries_vec
1064 .into_iter()
1065 .map(|entry| self.merge_entry_to_mutation(entry))
1066 .collect::<Result<Vec<_>>>()?;
1067
1068 if let Some(merge) = &mut self.active_merge {
1071 merge.writer.write_partition(key, mutations)?;
1072 merge.rows_merged += row_count;
1073 }
1074
1075 report.rows_merged += row_count;
1077 }
1078 merge::MergeStep::Complete => {
1079 self.finalize_merge_blocking(&mut report)?;
1082 break;
1083 }
1084 }
1085 }
1086
1087 report.pending_compaction = self.active_merge.is_some();
1089 report.time_spent = start.elapsed();
1090
1091 Ok(report)
1092 }
1093
1094 fn sweep_orphaned_compaction_tmp(data_dir: &Path) {
1099 let read_dir = match std::fs::read_dir(data_dir) {
1100 Ok(rd) => rd,
1101 Err(e) => {
1102 log::debug!(
1103 "sweep_orphaned_compaction_tmp: cannot read {:?}: {}",
1104 data_dir,
1105 e
1106 );
1107 return;
1108 }
1109 };
1110
1111 for entry in read_dir.flatten() {
1112 let path = entry.path();
1113 let name = entry.file_name();
1114 let name_str = name.to_string_lossy();
1115 if name_str.starts_with(".compaction-tmp-") && path.is_dir() {
1116 log::warn!("removing orphaned compaction tmp directory: {:?}", path);
1117 if let Err(e) = std::fs::remove_dir_all(&path) {
1118 log::warn!(
1119 "failed to remove orphaned compaction tmp directory {:?}: {}",
1120 path,
1121 e
1122 );
1123 }
1124 }
1125 }
1126 }
1127
1128 fn sweep_orphaned_partial_sstables(data_dir: &Path, keyspace: &str, table: &str) {
1133 let sstable_dir = data_dir.join(keyspace).join(table);
1134
1135 let read_dir = match std::fs::read_dir(&sstable_dir) {
1136 Ok(rd) => rd,
1137 Err(_) => {
1138 return;
1140 }
1141 };
1142
1143 for entry in read_dir.flatten() {
1144 let path = entry.path();
1145 let name = entry.file_name();
1146 let name_str = name.to_string_lossy();
1147
1148 if !name_str.starts_with("nb-")
1150 || !name_str.ends_with("-big-Data.db")
1151 || !path.is_file()
1152 {
1153 continue;
1154 }
1155
1156 let base = match name_str.strip_suffix("-Data.db") {
1158 Some(b) => b.to_owned(),
1159 None => continue,
1160 };
1161
1162 let gen_str = base
1164 .strip_prefix("nb-")
1165 .and_then(|s| s.strip_suffix("-big"))
1166 .unwrap_or(&base);
1167
1168 let toc_path = sstable_dir.join(format!("{}-TOC.txt", base));
1169 if !toc_path.exists() {
1170 log::warn!(
1171 "removing orphaned partial SSTable components for generation {}: missing TOC.txt",
1172 gen_str
1173 );
1174 if let Err(e) = Self::delete_sstable_files_static(&path) {
1175 log::warn!(
1176 "failed to remove orphaned partial SSTable for generation {}: {}",
1177 gen_str,
1178 e
1179 );
1180 }
1181 }
1182 }
1183 }
1184
1185 fn scan_sstable_candidates(&self) -> Result<Vec<PathBuf>> {
1186 let mut candidates = Vec::new();
1187
1188 if !self.config.data_dir.exists() {
1189 return Ok(candidates);
1190 }
1191
1192 Self::scan_data_files(
1193 &self.config.data_dir,
1194 &mut candidates,
1195 crate::storage::sstable::MAX_SSTABLE_SCAN_DEPTH,
1196 )?;
1197 Ok(candidates)
1198 }
1199
1200 fn scan_data_files(dir: &Path, candidates: &mut Vec<PathBuf>, depth: usize) -> Result<()> {
1202 for entry in std::fs::read_dir(dir)
1203 .map_err(|e| Error::Storage(format!("Failed to read data directory: {}", e)))?
1204 {
1205 let entry = entry
1206 .map_err(|e| Error::Storage(format!("Failed to read directory entry: {}", e)))?;
1207
1208 let path = entry.path();
1209 let filename = path.file_name().unwrap_or_default().to_string_lossy();
1210
1211 if filename.starts_with("nb-") && filename.ends_with("-big-Data.db") {
1213 let base = filename.trim_end_matches("-Data.db");
1223 let toc_path = path.with_file_name(format!("{base}-TOC.txt"));
1224 if toc_path.exists() {
1225 candidates.push(path);
1226 } else {
1227 log::debug!(
1228 "scan_data_files: skipping unpublished SSTable (no TOC.txt): {:?}",
1229 path
1230 );
1231 }
1232 } else if depth > 0 && path.is_dir() {
1233 Self::scan_data_files(&path, candidates, depth - 1)?;
1234 }
1235 }
1236 Ok(())
1237 }
1238
1239 fn start_merge(&mut self, input_paths: Vec<PathBuf>) -> Result<()> {
1256 log::info!(
1257 "Starting compaction merge of {} SSTables",
1258 input_paths.len()
1259 );
1260
1261 let bytes_read: u64 = input_paths
1263 .iter()
1264 .map(|p| std::fs::metadata(p).map(|m| m.len()).unwrap_or(0))
1265 .sum();
1266
1267 let output_generation = self.generation;
1268
1269 let sstable_dir = self
1271 .config
1272 .data_dir
1273 .join(&self.config.schema.keyspace)
1274 .join(&self.config.schema.table);
1275
1276 let tmp_dir = self
1283 .config
1284 .data_dir
1285 .join(format!(".compaction-tmp-{}", output_generation));
1286
1287 std::fs::create_dir_all(&tmp_dir).map_err(|e| {
1289 Error::Storage(format!(
1290 "Failed to create compaction tmp directory {:?}: {}",
1291 tmp_dir, e
1292 ))
1293 })?;
1294
1295 let merger = KWayMerger::new(input_paths.clone(), &self.config.schema)?;
1297
1298 let writer = crate::storage::sstable::writer::SSTableWriter::new(
1301 tmp_dir.clone(),
1302 output_generation,
1303 &self.config.schema,
1304 )?;
1305
1306 self.generation += 1;
1308
1309 self.active_merge = Some(ActiveMerge {
1310 merger,
1311 writer,
1312 input_paths,
1313 tmp_dir,
1314 sstable_dir,
1315 rows_merged: 0,
1316 bytes_read,
1317 started_at: Instant::now(),
1318 });
1319
1320 Ok(())
1321 }
1322
1323 fn finalize_merge_blocking(&mut self, report: &mut MaintenanceReport) -> Result<()> {
1331 merge::block_on_async(self.finalize_merge_async(report))
1332 }
1333
1334 async fn finalize_merge_async(&mut self, report: &mut MaintenanceReport) -> Result<()> {
1348 let merge = match self.active_merge.take() {
1349 Some(m) => m,
1350 None => return Ok(()),
1351 };
1352
1353 let elapsed = merge.started_at.elapsed();
1354 log::info!(
1355 "Finalizing compaction merge: {} rows, {:?} elapsed",
1356 merge.rows_merged,
1357 elapsed
1358 );
1359
1360 let tmp_info = match merge.writer.finish().await {
1363 Ok(info) => info,
1364 Err(e) => {
1365 let _ = std::fs::remove_dir_all(&merge.tmp_dir);
1367 return Err(Error::Storage(format!(
1368 "Compaction merge write failed (inputs intact): {}",
1369 e
1370 )));
1371 }
1372 };
1373
1374 log::info!(
1375 "Compaction tmp output: {} bytes, {} partitions",
1376 tmp_info.data_size,
1377 tmp_info.partition_count
1378 );
1379
1380 let sstable_dir = &merge.sstable_dir;
1385
1386 std::fs::create_dir_all(sstable_dir).map_err(|e| {
1389 Error::Storage(format!(
1390 "Failed to create SSTable directory {:?}: {}",
1391 sstable_dir, e
1392 ))
1393 })?;
1394
1395 let make_rename = |src: &PathBuf| -> Result<(PathBuf, PathBuf)> {
1399 let filename = src
1400 .file_name()
1401 .ok_or_else(|| Error::Storage("Component path has no filename".to_string()))?;
1402 let dst = sstable_dir.join(filename);
1403 Ok((src.clone(), dst))
1404 };
1405
1406 let mut renames: Vec<(PathBuf, PathBuf)> = Vec::new();
1408
1409 for src in &[
1411 &tmp_info.data_path,
1412 &tmp_info.index_path,
1413 &tmp_info.filter_path,
1414 &tmp_info.summary_path,
1415 &tmp_info.stats_path,
1416 &tmp_info.digest_path,
1417 ] {
1418 renames.push(make_rename(src)?);
1419 }
1420 if let Some(ref ci_path) = tmp_info.compression_info_path {
1421 renames.push(make_rename(ci_path)?);
1422 }
1423 renames.push(make_rename(&tmp_info.toc_path)?);
1425
1426 let mut renamed: Vec<PathBuf> = Vec::with_capacity(renames.len());
1429 let mut rename_error: Option<Error> = None;
1430
1431 for (src, dst) in &renames {
1432 match std::fs::rename(src, dst) {
1433 Ok(()) => {
1434 log::debug!(
1435 "Renamed {:?} → {:?}",
1436 src.file_name().unwrap_or_default(),
1437 dst.file_name().unwrap_or_default()
1438 );
1439 renamed.push(dst.clone());
1440 }
1441 Err(e) => {
1442 rename_error = Some(Error::Storage(format!(
1443 "Atomic rename of {:?} to {:?} failed (rolling back, inputs intact): {}",
1444 src, dst, e
1445 )));
1446 break;
1447 }
1448 }
1449 }
1450
1451 if let Some(err) = rename_error {
1452 for dst in &renamed {
1454 let _ = std::fs::remove_file(dst);
1455 }
1456 let _ = std::fs::remove_dir_all(&merge.tmp_dir);
1458 return Err(err);
1459 }
1460
1461 for input_path in &merge.input_paths {
1474 if let Err(e) = self.delete_sstable_files(input_path) {
1475 log::warn!(
1476 "Failed to delete compaction input {:?}: {} \
1477 (merge output is valid; inputs will be re-evaluated next cycle)",
1478 input_path,
1479 e
1480 );
1481 }
1482 }
1483
1484 if let Err(e) = std::fs::remove_dir_all(&merge.tmp_dir) {
1486 log::debug!(
1487 "Failed to remove compaction tmp directory {:?}: {}",
1488 merge.tmp_dir,
1489 e
1490 );
1491 }
1492
1493 let final_data_path = sstable_dir.join(
1495 tmp_info
1496 .data_path
1497 .file_name()
1498 .ok_or_else(|| Error::Storage("Data.db path has no filename".to_string()))?,
1499 );
1500
1501 let total_bytes_written: u64 = [
1504 &tmp_info.data_path,
1505 &tmp_info.index_path,
1506 &tmp_info.filter_path,
1507 &tmp_info.summary_path,
1508 &tmp_info.stats_path,
1509 &tmp_info.digest_path,
1510 ]
1511 .iter()
1512 .map(|p| {
1513 let filename = p.file_name().unwrap_or_default();
1514 let final_path = sstable_dir.join(filename);
1515 std::fs::metadata(&final_path).map(|m| m.len()).unwrap_or(0)
1516 })
1517 .sum::<u64>()
1518 + tmp_info
1519 .compression_info_path
1520 .as_ref()
1521 .and_then(|p| {
1522 let filename = p.file_name()?;
1523 std::fs::metadata(sstable_dir.join(filename))
1524 .ok()
1525 .map(|m| m.len())
1526 })
1527 .unwrap_or(0);
1528
1529 report.completed_merges.push(final_data_path);
1531 report.bytes_written += total_bytes_written;
1532
1533 self.cumulative_stats.compactions_completed += 1;
1535 self.cumulative_stats.sstables_merged_in += merge.input_paths.len() as u64;
1536 self.cumulative_stats.sstables_produced += 1;
1537 self.cumulative_stats.bytes_read += merge.bytes_read;
1538 self.cumulative_stats.bytes_written += total_bytes_written;
1539 self.cumulative_stats.rows_merged += merge.rows_merged;
1540 self.cumulative_stats.total_time += elapsed;
1541
1542 log::info!(
1543 "Compaction complete: merged {} inputs → 1 output ({} bytes total across all components, {} rows, {:?})",
1544 merge.input_paths.len(),
1545 total_bytes_written,
1546 merge.rows_merged,
1547 elapsed
1548 );
1549
1550 Ok(())
1551 }
1552
1553 fn delete_sstable_files(&self, data_path: &Path) -> Result<()> {
1555 Self::delete_sstable_files_static(data_path)
1556 }
1557
1558 fn delete_sstable_files_static(data_path: &Path) -> Result<()> {
1582 let filename = data_path
1584 .file_name()
1585 .and_then(|s| s.to_str())
1586 .ok_or_else(|| Error::Storage("Invalid SSTable path".to_string()))?;
1587
1588 let base = filename
1589 .strip_suffix("-Data.db")
1590 .ok_or_else(|| Error::Storage("Invalid Data.db filename".to_string()))?;
1591
1592 let parent_dir = data_path.parent().ok_or_else(|| {
1593 Error::Storage(format!(
1594 "Data.db path has no parent directory: {:?}",
1595 data_path
1596 ))
1597 })?;
1598
1599 let components = [
1603 "TOC.txt",
1604 "Data.db",
1605 "Index.db",
1606 "Summary.db",
1607 "Statistics.db",
1608 "CompressionInfo.db",
1609 "Filter.db",
1610 "Digest.crc32",
1611 ];
1612
1613 let mut failures: Vec<String> = Vec::new();
1614 for component in &components {
1615 let component_path = parent_dir.join(format!("{}-{}", base, component));
1616 if component_path.exists() {
1617 match std::fs::remove_file(&component_path) {
1618 Ok(()) => log::debug!("Deleted compaction input: {:?}", component_path),
1619 Err(e) => {
1620 log::warn!(
1624 "Deferred delete of {:?}: {} (component left as orphan; \
1625 unpublished via TOC.txt removal, reclaimed on next startup)",
1626 component_path,
1627 e
1628 );
1629 failures.push(format!("{:?}: {}", component_path, e));
1630 }
1631 }
1632 }
1633 }
1634
1635 if failures.is_empty() {
1636 Ok(())
1637 } else {
1638 Err(Error::Storage(format!(
1642 "Deferred delete left {} orphaned component(s) (unpublished, reclaimed on \
1643 next startup): {}",
1644 failures.len(),
1645 failures.join("; ")
1646 )))
1647 }
1648 }
1649
1650 fn merge_entry_to_mutation(
1654 &self,
1655 entry: merge::MergeEntry,
1656 ) -> Result<crate::storage::write_engine::mutation::Mutation> {
1657 merge::KWayMerger::merge_entry_to_mutation(entry, &self.config.schema)
1658 }
1659}
1660
1661#[cfg(all(test, feature = "write-support"))]
1662mod tests {
1663 use super::*;
1664 use crate::schema::{Column, KeyColumn};
1665 use crate::storage::write_engine::mutation::{CellOperation, PartitionKey, TableId};
1666 use crate::types::Value;
1667 use std::collections::HashMap;
1668 use tempfile::TempDir;
1669
1670 fn create_test_schema() -> TableSchema {
1671 TableSchema {
1672 keyspace: "test_ks".to_string(),
1673 table: "test_table".to_string(),
1674 partition_keys: vec![KeyColumn {
1675 name: "id".to_string(),
1676 data_type: "int".to_string(),
1677 position: 0,
1678 }],
1679 clustering_keys: vec![],
1680 columns: vec![
1681 Column {
1682 name: "id".to_string(),
1683 data_type: "int".to_string(),
1684 nullable: false,
1685 default: None,
1686 is_static: false,
1687 },
1688 Column {
1689 name: "name".to_string(),
1690 data_type: "text".to_string(),
1691 nullable: true,
1692 default: None,
1693 is_static: false,
1694 },
1695 ],
1696 comments: HashMap::new(),
1697 }
1698 }
1699
1700 fn create_test_mutation(id: i32, name: &str, timestamp: i64) -> Mutation {
1701 let table_id = TableId::new("test_ks", "test_table");
1702 let pk = PartitionKey::single("id", Value::Integer(id));
1703 let ops = vec![CellOperation::Write {
1704 column: "name".to_string(),
1705 value: Value::Text(name.to_string()),
1706 }];
1707
1708 Mutation::new(table_id, pk, None, ops, timestamp, None)
1709 }
1710
1711 #[test]
1712 fn test_set_merge_policy() {
1713 let temp_dir = TempDir::new().unwrap();
1714 let schema = create_test_schema();
1715
1716 let config = WriteEngineConfig::new(
1717 temp_dir.path().join("data"),
1718 temp_dir.path().join("wal"),
1719 schema,
1720 );
1721
1722 let mut engine = WriteEngine::new(config).unwrap();
1723
1724 let policy = Box::new(crate::storage::write_engine::STCSPolicy::default());
1726 engine.set_merge_policy(policy).unwrap();
1727
1728 let report = engine
1730 .maintenance_step(std::time::Duration::from_millis(100))
1731 .unwrap();
1732 assert!(!report.pending_compaction);
1733 assert_eq!(report.rows_merged, 0);
1734 }
1735
1736 #[test]
1737 fn test_write_engine_config() {
1738 let temp_dir = TempDir::new().unwrap();
1739 let schema = create_test_schema();
1740
1741 let config = WriteEngineConfig::new(
1742 temp_dir.path().join("data"),
1743 temp_dir.path().join("wal"),
1744 schema,
1745 );
1746
1747 assert_eq!(
1748 config.memtable_flush_threshold,
1749 WriteEngineConfig::DEFAULT_FLUSH_THRESHOLD
1750 );
1751 assert_eq!(
1752 config.memtable_hard_limit,
1753 WriteEngineConfig::DEFAULT_HARD_LIMIT
1754 );
1755
1756 let config = config.with_flush_threshold(128 * 1024 * 1024);
1757 assert_eq!(config.memtable_flush_threshold, 128 * 1024 * 1024);
1758
1759 let config = config.with_hard_limit(512 * 1024 * 1024);
1760 assert_eq!(config.memtable_hard_limit, 512 * 1024 * 1024);
1761 }
1762
1763 #[test]
1764 fn test_write_engine_new() {
1765 let temp_dir = TempDir::new().unwrap();
1766 let schema = create_test_schema();
1767
1768 let config = WriteEngineConfig::new(
1769 temp_dir.path().join("data"),
1770 temp_dir.path().join("wal"),
1771 schema,
1772 );
1773
1774 let engine = WriteEngine::new(config).unwrap();
1775
1776 assert_eq!(engine.generation(), 1);
1777 assert_eq!(engine.memtable_size(), 0);
1778 assert_eq!(engine.memtable_row_count(), 0);
1779 assert!(!engine.closed.load(std::sync::atomic::Ordering::Relaxed));
1780 }
1781
1782 #[test]
1783 fn test_write_engine_write_single_mutation() {
1784 let temp_dir = TempDir::new().unwrap();
1785 let schema = create_test_schema();
1786
1787 let config = WriteEngineConfig::new(
1788 temp_dir.path().join("data"),
1789 temp_dir.path().join("wal"),
1790 schema,
1791 );
1792
1793 let mut engine = WriteEngine::new(config).unwrap();
1794
1795 let mutation = create_test_mutation(1, "Alice", 1000000);
1796 engine.write(mutation).unwrap();
1797
1798 assert_eq!(engine.memtable_row_count(), 1);
1799 assert!(engine.memtable_size() > 0);
1800 assert!(engine.wal_size() > 0);
1801 }
1802
1803 #[test]
1804 fn test_write_engine_write_multiple_mutations() {
1805 let temp_dir = TempDir::new().unwrap();
1806 let schema = create_test_schema();
1807
1808 let config = WriteEngineConfig::new(
1809 temp_dir.path().join("data"),
1810 temp_dir.path().join("wal"),
1811 schema,
1812 );
1813
1814 let mut engine = WriteEngine::new(config).unwrap();
1815
1816 for i in 0..10 {
1818 let mutation = create_test_mutation(i, &format!("User{}", i), 1000000 + i as i64);
1819 engine.write(mutation).unwrap();
1820 }
1821
1822 assert_eq!(engine.memtable_row_count(), 10);
1823 assert!(engine.memtable_size() > 0);
1824 }
1825
1826 #[tokio::test]
1827 async fn test_write_engine_flush_empty() {
1828 let temp_dir = TempDir::new().unwrap();
1829 let schema = create_test_schema();
1830
1831 let config = WriteEngineConfig::new(
1832 temp_dir.path().join("data"),
1833 temp_dir.path().join("wal"),
1834 schema,
1835 );
1836
1837 let mut engine = WriteEngine::new(config).unwrap();
1838
1839 let result = engine.flush().await.unwrap();
1841 assert!(result.is_none());
1842 }
1843
1844 #[tokio::test]
1845 async fn test_write_engine_flush_with_data() {
1846 let temp_dir = TempDir::new().unwrap();
1847 let schema = create_test_schema();
1848
1849 let config = WriteEngineConfig::new(
1850 temp_dir.path().join("data"),
1851 temp_dir.path().join("wal"),
1852 schema,
1853 );
1854
1855 let mut engine = WriteEngine::new(config).unwrap();
1856
1857 for i in 0..5 {
1859 let mutation = create_test_mutation(i, &format!("User{}", i), 1000000 + i as i64);
1860 engine.write(mutation).unwrap();
1861 }
1862
1863 let initial_generation = engine.generation();
1864
1865 let info = engine.flush().await.unwrap();
1867 assert!(info.is_some());
1868
1869 let info = info.unwrap();
1870 assert_eq!(info.partition_count, 5);
1871 assert!(info.data_size > 0);
1872 assert!(info.data_path.exists());
1873
1874 assert_eq!(engine.memtable_row_count(), 0);
1876 assert_eq!(engine.memtable_size(), 0);
1877
1878 assert_eq!(engine.wal_size(), 0);
1880
1881 assert_eq!(engine.generation(), initial_generation + 1);
1883 }
1884
#[test]
fn test_write_engine_automatic_flush() {
    // Uses a 1 KiB flush threshold and then writes 100 rows. Passes if at least
    // one automatic flush advanced the generation, or if the memtable stayed
    // under 10_000 bytes.
    // NOTE(review): the 10_000-byte fallback is far looser than the 1 KiB
    // threshold, so this assertion is permissive — consider tightening it.
    let tmp = TempDir::new().unwrap();
    let config = WriteEngineConfig::new(
        tmp.path().join("data"),
        tmp.path().join("wal"),
        create_test_schema(),
    )
    .with_flush_threshold(1024);
    let mut engine = WriteEngine::new(config).unwrap();

    for i in 0..100 {
        let name = format!("User{}", i);
        engine
            .write(create_test_mutation(i, &name, 1000000 + i as i64))
            .unwrap();
    }

    assert!(engine.generation() > 1 || engine.memtable_size() < 10000);
}
1911
1912 #[tokio::test]
1913 async fn test_write_engine_close_with_data() {
1914 let temp_dir = TempDir::new().unwrap();
1915 let schema = create_test_schema();
1916
1917 let config = WriteEngineConfig::new(
1918 temp_dir.path().join("data"),
1919 temp_dir.path().join("wal"),
1920 schema,
1921 );
1922
1923 let mut engine = WriteEngine::new(config).unwrap();
1924
1925 for i in 0..5 {
1927 let mutation = create_test_mutation(i, &format!("User{}", i), 1000000 + i as i64);
1928 engine.write(mutation).unwrap();
1929 }
1930
1931 engine.close().await.unwrap();
1933
1934 let data_dir = temp_dir.path().join("data");
1936 let entries: Vec<_> = std::fs::read_dir(&data_dir).unwrap().collect();
1937 assert!(!entries.is_empty(), "SSTable files should exist");
1938 }
1939
1940 #[tokio::test]
1941 async fn test_write_engine_close_empty() {
1942 let temp_dir = TempDir::new().unwrap();
1943 let schema = create_test_schema();
1944
1945 let config = WriteEngineConfig::new(
1946 temp_dir.path().join("data"),
1947 temp_dir.path().join("wal"),
1948 schema,
1949 );
1950
1951 let mut engine = WriteEngine::new(config).unwrap();
1952
1953 engine.close().await.unwrap();
1955 }
1956
#[test]
fn test_write_engine_write_after_close() {
    // Closes one engine instance, then opens a second instance on the same
    // data/WAL directories and checks that it accepts writes. Despite the name,
    // this exercises reopening, not writing to the closed instance.
    let tmp = TempDir::new().unwrap();
    let data_dir = tmp.path().join("data");
    let wal_dir = tmp.path().join("wal");

    let mut engine = WriteEngine::new(WriteEngineConfig::new(
        data_dir.clone(),
        wal_dir.clone(),
        create_test_schema(),
    ))
    .unwrap();
    {
        let rt = tokio::runtime::Runtime::new().unwrap();
        rt.block_on(engine.close()).unwrap();
    }

    let mut engine2 =
        WriteEngine::new(WriteEngineConfig::new(data_dir, wal_dir, create_test_schema()))
            .unwrap();

    engine2
        .write(create_test_mutation(1, "Alice", 1000000))
        .unwrap();
    assert_eq!(engine2.memtable_row_count(), 1);
}
1992
#[test]
fn test_write_engine_wal_recovery() {
    // Rows written but never flushed must reappear in the memtable when a new
    // engine is opened on the same directories (replayed from the WAL).
    let tmp = TempDir::new().unwrap();
    let schema = create_test_schema();
    let make_config = |schema| {
        WriteEngineConfig::new(tmp.path().join("data"), tmp.path().join("wal"), schema)
    };

    {
        // This engine is dropped at the end of the scope without an explicit
        // close() or flush().
        let mut engine = WriteEngine::new(make_config(schema.clone())).unwrap();
        for i in 0..5 {
            let mutation = create_test_mutation(i, &format!("User{}", i), 1000000 + i as i64);
            engine.write(mutation).unwrap();
        }
    }

    let engine = WriteEngine::new(make_config(schema)).unwrap();

    assert_eq!(engine.memtable_row_count(), 5);
    assert!(engine.memtable_size() > 0);
}
2029
#[test]
fn test_write_engine_generation_tracking() {
    // A flush advances the generation from 1 to 2. A reopened engine must
    // report generation 2, derived from what is already on disk.
    let tmp = TempDir::new().unwrap();
    let schema = create_test_schema();
    let data_dir = tmp.path().join("data");
    let wal_dir = tmp.path().join("wal");

    {
        let mut engine = WriteEngine::new(WriteEngineConfig::new(
            data_dir.clone(),
            wal_dir.clone(),
            schema.clone(),
        ))
        .unwrap();
        assert_eq!(engine.generation(), 1);

        engine
            .write(create_test_mutation(1, "Alice", 1000000))
            .unwrap();

        tokio::runtime::Runtime::new()
            .unwrap()
            .block_on(engine.flush())
            .unwrap();

        assert_eq!(engine.generation(), 2);
    }

    let reopened =
        WriteEngine::new(WriteEngineConfig::new(data_dir, wal_dir, schema)).unwrap();
    assert_eq!(reopened.generation(), 2);
}
2068
#[test]
fn test_write_engine_execute_table_mismatch() {
    // CQL aimed at a different table than the engine's schema must be rejected.
    // The error must name both the targeted table and the schema's table.
    let tmp = TempDir::new().unwrap();
    let mut engine = WriteEngine::new(WriteEngineConfig::new(
        tmp.path().join("data"),
        tmp.path().join("wal"),
        create_test_schema(),
    ))
    .unwrap();

    let err_msg = engine
        .execute("INSERT INTO users (id, name) VALUES (1, 'Alice')")
        .unwrap_err()
        .to_string();

    let names_both_tables = err_msg.contains("targets table 'users'")
        && err_msg.contains("schema is for 'test_table'");
    assert!(
        names_both_tables,
        "Expected table mismatch error, got: {}",
        err_msg
    );
}
2092
#[test]
fn test_write_engine_execute_insert_success() {
    // A CQL INSERT against the engine's own table adds exactly one memtable row.
    let tmp = TempDir::new().unwrap();
    let mut engine = WriteEngine::new(WriteEngineConfig::new(
        tmp.path().join("data"),
        tmp.path().join("wal"),
        create_test_schema(),
    ))
    .unwrap();

    assert_eq!(engine.memtable_row_count(), 0);

    let result = engine.execute("INSERT INTO test_table (id, name) VALUES (1, 'Alice')");
    if let Err(err) = result {
        panic!("execute() failed: {:?}", err);
    }

    assert_eq!(engine.memtable_row_count(), 1);
}
2118
#[test]
fn test_determine_next_generation_empty_dir() {
    // With no SSTables present, generation numbering starts at 1.
    let tmp = TempDir::new().unwrap();
    let data_dir = tmp.path().join("data");
    std::fs::create_dir_all(&data_dir).unwrap();

    assert_eq!(WriteEngine::determine_next_generation(&data_dir).unwrap(), 1);
}
2128
#[test]
fn test_determine_next_generation_with_sstables() {
    // The next generation is one past the highest on disk (5 -> 6), even when
    // there are gaps in the sequence.
    let tmp = TempDir::new().unwrap();
    let data_dir = tmp.path().join("data");
    std::fs::create_dir_all(&data_dir).unwrap();

    let existing = ["nb-1-big-Data.db", "nb-2-big-Data.db", "nb-5-big-Data.db"];
    for file_name in existing.iter() {
        std::fs::write(data_dir.join(file_name), b"").unwrap();
    }

    let generation = WriteEngine::determine_next_generation(&data_dir).unwrap();
    assert_eq!(generation, 6);
}
2143
2144 #[tokio::test]
2145 async fn test_write_engine_close_idempotent() {
2146 let temp_dir = TempDir::new().unwrap();
2147 let schema = create_test_schema();
2148
2149 let config = WriteEngineConfig::new(
2150 temp_dir.path().join("data"),
2151 temp_dir.path().join("wal"),
2152 schema,
2153 );
2154
2155 let mut engine = WriteEngine::new(config).unwrap();
2156
2157 engine.close().await.unwrap();
2159 assert!(engine.closed.load(Ordering::SeqCst));
2160
2161 engine.close().await.unwrap();
2163 assert!(engine.closed.load(Ordering::SeqCst));
2164 }
2165
2166 #[tokio::test]
2167 async fn test_write_engine_close_syncs_wal() {
2168 let temp_dir = TempDir::new().unwrap();
2169 let schema = create_test_schema();
2170
2171 let config = WriteEngineConfig::new(
2172 temp_dir.path().join("data"),
2173 temp_dir.path().join("wal"),
2174 schema,
2175 );
2176
2177 let mut engine = WriteEngine::new(config).unwrap();
2178
2179 let mutation = create_test_mutation(1, "Alice", 1000000);
2181 engine.write(mutation).unwrap();
2182
2183 engine.close().await.unwrap();
2185
2186 assert_eq!(engine.wal_size(), 0);
2188 }
2189
#[test]
fn test_write_engine_closed_flag_atomic() {
    // Exercises the raw `closed` AtomicBool directly:
    // - it starts false;
    // - a store is visible to a later load;
    // - swap returns the previous value.
    let tmp = TempDir::new().unwrap();
    let engine = WriteEngine::new(WriteEngineConfig::new(
        tmp.path().join("data"),
        tmp.path().join("wal"),
        create_test_schema(),
    ))
    .unwrap();

    assert!(!engine.closed.load(Ordering::SeqCst));

    engine.closed.store(true, Ordering::SeqCst);
    assert!(engine.closed.load(Ordering::SeqCst));

    let prev = engine.closed.swap(false, Ordering::SeqCst);
    assert!(prev);
    assert!(!engine.closed.load(Ordering::SeqCst));
}
2215
2216 #[tokio::test]
2217 async fn test_write_engine_write_after_close_fails() {
2218 let temp_dir = TempDir::new().unwrap();
2219 let schema = create_test_schema();
2220
2221 let config = WriteEngineConfig::new(
2222 temp_dir.path().join("data"),
2223 temp_dir.path().join("wal"),
2224 schema,
2225 );
2226
2227 let mut engine = WriteEngine::new(config).unwrap();
2228
2229 engine.close().await.unwrap();
2231
2232 let mutation = create_test_mutation(1, "Alice", 1000000);
2234 let result = engine.write(mutation);
2235
2236 assert!(result.is_err());
2237 match result {
2238 Err(Error::InvalidInput(_)) => {}
2239 _ => panic!("Expected InvalidInput error"),
2240 }
2241 }
2242
2243 #[tokio::test]
2244 async fn test_write_engine_flush_after_close_fails() {
2245 let temp_dir = TempDir::new().unwrap();
2246 let schema = create_test_schema();
2247
2248 let config = WriteEngineConfig::new(
2249 temp_dir.path().join("data"),
2250 temp_dir.path().join("wal"),
2251 schema,
2252 );
2253
2254 let mut engine = WriteEngine::new(config).unwrap();
2255
2256 engine.close().await.unwrap();
2258
2259 let result = engine.flush().await;
2261
2262 assert!(result.is_err());
2263 match result {
2264 Err(Error::InvalidInput(_)) => {}
2265 _ => panic!("Expected InvalidInput error"),
2266 }
2267 }
2268
#[test]
fn test_write_engine_hard_limit_enforcement() {
    // Configures a 2 KiB hard limit below a 10 KiB flush threshold. Sync writes
    // must eventually be refused with a Storage error mentioning "hard limit",
    // and only after at least one write was accepted.
    let tmp = TempDir::new().unwrap();
    let config = WriteEngineConfig::new(
        tmp.path().join("data"),
        tmp.path().join("wal"),
        create_test_schema(),
    )
    .with_flush_threshold(10 * 1024)
    .with_hard_limit(2048);
    let mut engine = WriteEngine::new(config).unwrap();

    let mut write_count = 0;
    for i in 0..1000 {
        let outcome =
            engine.write(create_test_mutation(i, &format!("User{}", i), 1000000 + i as i64));
        if let Err(err) = outcome {
            match err {
                Error::Storage(msg) => assert!(msg.contains("hard limit")),
                e => panic!("Expected Storage error, got: {:?}", e),
            }
            break;
        }
        write_count += 1;
    }

    assert!(
        write_count < 1000,
        "Should have hit hard limit before 1000 writes"
    );
    assert!(
        write_count > 0,
        "Should have accepted at least some writes before hitting limit"
    );
}
2313
2314 #[tokio::test]
2315 async fn test_write_engine_hard_limit_enforcement_async() {
2316 let temp_dir = TempDir::new().unwrap();
2317 let schema = create_test_schema();
2318
2319 let config = WriteEngineConfig::new(
2321 temp_dir.path().join("data"),
2322 temp_dir.path().join("wal"),
2323 schema,
2324 )
2325 .with_flush_threshold(10 * 1024) .with_hard_limit(2048); let mut engine = WriteEngine::new(config).unwrap();
2329
2330 let mut write_count = 0;
2332 for i in 0..1000 {
2333 let mutation = create_test_mutation(i, &format!("User{}", i), 1000000 + i as i64);
2334 let result = engine.write_async(mutation).await;
2335
2336 match result {
2337 Ok(()) => {
2338 write_count += 1;
2339 }
2340 Err(Error::Storage(msg)) => {
2341 assert!(msg.contains("hard limit"));
2342 break;
2343 }
2344 Err(e) => panic!("Expected Storage error, got: {:?}", e),
2345 }
2346 }
2347
2348 assert!(
2350 write_count < 1000,
2351 "Should have hit hard limit before 1000 writes"
2352 );
2353 assert!(
2354 write_count > 0,
2355 "Should have accepted at least some writes before hitting limit"
2356 );
2357 }
2358
2359 #[tokio::test]
2360 async fn test_write_engine_hard_limit_recovery_after_flush() {
2361 let temp_dir = TempDir::new().unwrap();
2362 let schema = create_test_schema();
2363
2364 let config = WriteEngineConfig::new(
2366 temp_dir.path().join("data"),
2367 temp_dir.path().join("wal"),
2368 schema,
2369 )
2370 .with_flush_threshold(1024)
2371 .with_hard_limit(2048);
2372
2373 let mut engine = WriteEngine::new(config).unwrap();
2374
2375 let mut first_batch_count = 0;
2377 for i in 0..1000 {
2378 let mutation = create_test_mutation(i, &format!("User{}", i), 1000000 + i as i64);
2379 let result = engine.write(mutation);
2380
2381 if result.is_err() {
2382 break;
2383 }
2384
2385 first_batch_count += 1;
2386 }
2387
2388 assert!(
2389 first_batch_count > 0,
2390 "Should have accepted some writes before limit"
2391 );
2392
2393 engine.flush().await.unwrap();
2395
2396 let mutation = create_test_mutation(9999, "After flush", 2000000);
2398 let result = engine.write(mutation);
2399 assert!(result.is_ok(), "Should accept writes after flush");
2400
2401 assert_eq!(engine.memtable_row_count(), 1);
2402 }
2403
#[test]
fn test_generation_counter_is_u64() {
    // The typed bindings fail to compile unless generation() returns u64.
    // The tail only checks the arithmetic for a value past u32::MAX.
    let tmp = TempDir::new().unwrap();
    let engine = WriteEngine::new(WriteEngineConfig::new(
        tmp.path().join("data"),
        tmp.path().join("wal"),
        create_test_schema(),
    ))
    .unwrap();

    let generation: u64 = engine.generation();
    assert_eq!(generation, 1u64);
    let _type_check: u64 = generation;

    let large_generation = u64::from(u32::MAX) + 1000;
    assert!(large_generation > u32::MAX as u64);
    assert_eq!(large_generation, 4_294_968_295u64);
}
2432
#[test]
fn test_determine_next_generation_large_numbers() {
    // Generations above u32::MAX in SSTable file names must parse without
    // truncation, and the next generation is one past them.
    let tmp = TempDir::new().unwrap();
    let data_dir = tmp.path().join("data");
    std::fs::create_dir_all(&data_dir).unwrap();

    let large_gen = u64::from(u32::MAX) + 100;
    let file_name = format!("nb-{}-big-Data.db", large_gen);
    std::fs::write(data_dir.join(file_name), b"").unwrap();

    let generation = WriteEngine::determine_next_generation(&data_dir).unwrap();
    assert_eq!(generation, large_gen + 1);
    assert!(generation > u32::MAX as u64);
}
2449
#[test]
fn test_maintenance_step_no_policy() {
    // Without a merge policy, maintenance_step() reports no work done and
    // returns in well under the 100 ms budget.
    let tmp = TempDir::new().unwrap();
    let mut engine = WriteEngine::new(WriteEngineConfig::new(
        tmp.path().join("data"),
        tmp.path().join("wal"),
        create_test_schema(),
    ))
    .unwrap();

    let report = engine.maintenance_step(Duration::from_millis(100)).unwrap();

    assert_eq!(report.rows_merged, 0);
    assert_eq!(report.bytes_written, 0);
    assert_eq!(report.completed_merges.len(), 0);
    assert!(!report.pending_compaction);
    // NOTE(review): wall-clock bound; may be sensitive to heavily loaded CI.
    assert!(report.time_spent < Duration::from_millis(50));
}
2476
#[test]
fn test_maintenance_step_with_closed_engine() {
    // maintenance_step() on a closed engine returns an InvalidInput error that
    // mentions "closed".
    let tmp = TempDir::new().unwrap();
    let mut engine = WriteEngine::new(WriteEngineConfig::new(
        tmp.path().join("data"),
        tmp.path().join("wal"),
        create_test_schema(),
    ))
    .unwrap();

    {
        let rt = tokio::runtime::Runtime::new().unwrap();
        rt.block_on(engine.close()).unwrap();
    }

    let result = engine.maintenance_step(Duration::from_millis(100));

    assert!(result.is_err());
    match result {
        Err(Error::InvalidInput(msg)) => assert!(msg.contains("closed")),
        _ => panic!("Expected InvalidInput error"),
    }
}
2506
#[test]
fn test_maintenance_report_creation() {
    // MaintenanceReport is a plain data carrier; every field reads back as set.
    let merged_output = PathBuf::from("data/nb-5-big-Data.db");
    let report = MaintenanceReport {
        completed_merges: vec![merged_output],
        time_spent: Duration::from_millis(250),
        pending_compaction: true,
        bytes_written: 1024 * 1024,
        rows_merged: 1000,
    };

    assert_eq!(report.time_spent.as_millis(), 250);
    assert_eq!(report.completed_merges.len(), 1);
    assert_eq!(report.rows_merged, 1000);
    assert_eq!(report.bytes_written, 1024 * 1024);
    assert!(report.pending_compaction);
}
2523
#[test]
fn test_scan_sstable_candidates_empty_dir() {
    // A fresh engine with nothing flushed has no compaction candidates.
    let tmp = TempDir::new().unwrap();
    let engine = WriteEngine::new(WriteEngineConfig::new(
        tmp.path().join("data"),
        tmp.path().join("wal"),
        create_test_schema(),
    ))
    .unwrap();

    assert_eq!(engine.scan_sstable_candidates().unwrap().len(), 0);
}
2540
#[test]
fn test_scan_sstable_candidates_with_sstables() {
    // Only Data.db files that have a sibling TOC.txt count as candidates.
    // Generations 1 and 2 qualify. The rest are excluded:
    // - an Index-only generation (3);
    // - an unrelated file;
    // - a Data.db without a TOC.txt (generation 4).
    let tmp = TempDir::new().unwrap();
    let data_dir = tmp.path().join("data");
    let engine = WriteEngine::new(WriteEngineConfig::new(
        data_dir.clone(),
        tmp.path().join("wal"),
        create_test_schema(),
    ))
    .unwrap();

    std::fs::create_dir_all(&data_dir).unwrap();
    let fixture_files = [
        "nb-1-big-Data.db",
        "nb-1-big-TOC.txt",
        "nb-2-big-Data.db",
        "nb-2-big-TOC.txt",
        "nb-3-big-Index.db",
        "other-file.txt",
        "nb-4-big-Data.db",
    ];
    for file_name in fixture_files.iter() {
        std::fs::write(data_dir.join(file_name), b"").unwrap();
    }

    let candidates = engine.scan_sstable_candidates().unwrap();

    assert_eq!(candidates.len(), 2);
    assert!(candidates
        .iter()
        .all(|p| p.to_string_lossy().contains("Data.db")));
    assert!(
        !candidates
            .iter()
            .any(|p| p.to_string_lossy().contains("nb-4-big")),
        "unpublished Data.db (no TOC.txt) must be excluded (Issue #591)"
    );
}
2584
#[test]
fn test_delete_sstable_files() {
    // Deleting via the Data.db path removes every listed component of that
    // generation.
    let tmp = TempDir::new().unwrap();
    let data_dir = tmp.path().join("data");
    let engine = WriteEngine::new(WriteEngineConfig::new(
        data_dir.clone(),
        tmp.path().join("wal"),
        create_test_schema(),
    ))
    .unwrap();

    std::fs::create_dir_all(&data_dir).unwrap();

    let components = [
        "nb-5-big-Data.db",
        "nb-5-big-Index.db",
        "nb-5-big-Summary.db",
        "nb-5-big-Statistics.db",
    ];

    // Create the components and confirm they are all present.
    for component in components.iter() {
        std::fs::write(data_dir.join(component), b"dummy").unwrap();
    }
    for component in components.iter() {
        assert!(data_dir.join(component).exists());
    }

    engine
        .delete_sstable_files(&data_dir.join("nb-5-big-Data.db"))
        .unwrap();

    for component in components.iter() {
        assert!(!data_dir.join(component).exists());
    }
}
2627
#[test]
fn test_delete_removes_toc_first_unpublishing_atomically() {
    // Checks three things:
    // 1. Static deletion removes both TOC.txt and Data.db.
    // 2. scan_data_files treats a Data.db without a sibling TOC.txt as
    //    unpublished.
    // 3. Adding the TOC.txt makes that Data.db discoverable.
    // NOTE(review): the relative order of deletions is not observed here;
    // only the end state is asserted.
    let tmp = TempDir::new().unwrap();
    let data_dir = tmp.path().join("data");
    std::fs::create_dir_all(&data_dir).unwrap();

    let generation_7 = [
        "nb-7-big-Data.db",
        "nb-7-big-Index.db",
        "nb-7-big-Statistics.db",
        "nb-7-big-TOC.txt",
    ];
    for comp in generation_7.iter() {
        std::fs::write(data_dir.join(comp), b"x").unwrap();
    }

    let data_path = data_dir.join("nb-7-big-Data.db");
    WriteEngine::delete_sstable_files_static(&data_path).unwrap();

    assert!(!data_dir.join("nb-7-big-TOC.txt").exists());
    assert!(!data_path.exists());

    // Unpublished: Data.db exists but TOC.txt does not yet.
    std::fs::write(data_dir.join("nb-8-big-Data.db"), b"x").unwrap();
    {
        let mut candidates = Vec::new();
        WriteEngine::scan_data_files(&data_dir, &mut candidates, 1).unwrap();
        assert!(
            candidates.is_empty(),
            "a Data.db without a sibling TOC.txt must NOT be a compaction candidate \
             (publication barrier, Issue #591); got {:?}",
            candidates
        );
    }

    // Published: TOC.txt now present.
    std::fs::write(data_dir.join("nb-8-big-TOC.txt"), b"x").unwrap();
    {
        let mut candidates = Vec::new();
        WriteEngine::scan_data_files(&data_dir, &mut candidates, 1).unwrap();
        assert_eq!(
            candidates.len(),
            1,
            "a published Data.db (TOC.txt present) must be discovered"
        );
    }
}
2679
2680 #[derive(Debug)]
2682 #[allow(dead_code)] struct TestMergePolicy {
2684 files_to_select: Vec<PathBuf>,
2685 }
2686
2687 impl MergePolicy for TestMergePolicy {
2688 fn select_merge(&self, _candidates: &[PathBuf]) -> Result<Vec<PathBuf>> {
2689 Ok(self.files_to_select.clone())
2690 }
2691 }
2692
#[test]
fn test_maintenance_step_with_policy_no_work() {
    // If the installed policy selects nothing, maintenance reports no work and
    // no pending compaction.
    let tmp = TempDir::new().unwrap();
    let mut engine = WriteEngine::new(WriteEngineConfig::new(
        tmp.path().join("data"),
        tmp.path().join("wal"),
        create_test_schema(),
    ))
    .unwrap();

    let empty_policy = TestMergePolicy {
        files_to_select: Vec::new(),
    };
    engine.set_merge_policy(Box::new(empty_policy)).unwrap();

    let report = engine.maintenance_step(Duration::from_millis(100)).unwrap();

    assert_eq!(report.rows_merged, 0);
    assert_eq!(report.bytes_written, 0);
    assert_eq!(report.completed_merges.len(), 0);
    assert!(!report.pending_compaction);
}
2722
#[test]
fn test_maintenance_step_budget_honored() {
    // With a no-op policy, a 10 ms maintenance step must not overrun its
    // budget by 50% or more.
    let tmp = TempDir::new().unwrap();
    let config = WriteEngineConfig::new(
        tmp.path().join("data"),
        tmp.path().join("wal"),
        create_test_schema(),
    );
    let mut engine = WriteEngine::new(config).unwrap();

    engine
        .set_merge_policy(Box::new(TestMergePolicy {
            files_to_select: Vec::new(),
        }))
        .unwrap();

    let budget = Duration::from_millis(10);
    let report = engine.maintenance_step(budget).unwrap();

    let ceiling = budget.mul_f32(1.5);
    assert!(
        report.time_spent < ceiling,
        "Time spent {:?} exceeded budget {:?} by >50%",
        report.time_spent,
        budget
    );
}
2755
2756 fn flush_n_sstables_sync(engine: &mut WriteEngine, n: usize) -> Vec<PathBuf> {
2765 let rt = tokio::runtime::Builder::new_current_thread()
2766 .enable_all()
2767 .build()
2768 .unwrap();
2769 let mut paths = Vec::new();
2770 for batch in 0..n {
2771 for row in 0..5 {
2772 let mutation = create_test_mutation(
2773 (batch * 100 + row) as i32,
2774 &format!("User-{}-{}", batch, row),
2775 1_000_000 + (batch * 100 + row) as i64,
2776 );
2777 engine.write(mutation).unwrap();
2778 }
2779 let info = rt.block_on(engine.flush()).unwrap().unwrap();
2780 paths.push(info.data_path);
2781 }
2782 paths
2783 }
2784
#[test]
fn test_maintenance_stats_initial_zero() {
    // Every cumulative compaction counter starts at zero for a new engine.
    let tmp = TempDir::new().unwrap();
    let engine = WriteEngine::new(WriteEngineConfig::new(
        tmp.path().join("data"),
        tmp.path().join("wal"),
        create_test_schema(),
    ))
    .unwrap();

    let stats = engine.maintenance_stats();

    assert_eq!(stats.compactions_completed, 0);
    assert_eq!(stats.sstables_merged_in, 0);
    assert_eq!(stats.sstables_produced, 0);
    assert_eq!(stats.bytes_read, 0);
    assert_eq!(stats.bytes_written, 0);
    assert_eq!(stats.rows_merged, 0);
    assert_eq!(stats.total_time, Duration::ZERO);
}
2808
#[test]
fn test_stcs_selects_expected_group_by_size() {
    // Four equally sized (60 MiB, allocated via set_len) Data.db files should
    // be selected together as one STCS compaction group.
    let policy = crate::storage::write_engine::STCSPolicy::default();
    let tmp = TempDir::new().unwrap();
    let size_bytes: u64 = 60 * 1024 * 1024;

    let paths: Vec<PathBuf> = (1..=4)
        .map(|i| {
            let path = tmp.path().join(format!("nb-{}-big-Data.db", i));
            std::fs::File::create(&path)
                .unwrap()
                .set_len(size_bytes)
                .unwrap();
            path
        })
        .collect();

    let selected = policy.select_merge(&paths).unwrap();

    assert_eq!(
        selected.len(),
        4,
        "STCS should select all 4 same-sized SSTables as one compaction group"
    );
    for sel in selected.iter() {
        assert!(
            paths.contains(sel),
            "Selected path {:?} not in input set",
            sel
        );
    }
}
2844
#[test]
fn test_stcs_does_not_select_below_threshold() {
    // Three same-sized SSTables are presumably below the default STCS
    // min_threshold, so nothing should be selected.
    let policy = crate::storage::write_engine::STCSPolicy::default();
    let tmp = TempDir::new().unwrap();

    let mut paths = Vec::with_capacity(3);
    for i in 1..=3 {
        let path = tmp.path().join(format!("nb-{}-big-Data.db", i));
        std::fs::File::create(&path)
            .unwrap()
            .set_len(60 * 1024 * 1024)
            .unwrap();
        paths.push(path);
    }

    let selected = policy.select_merge(&paths).unwrap();
    assert!(
        selected.is_empty(),
        "STCS should NOT select when fewer than min_threshold SSTables exist"
    );
}
2865
#[test]
fn test_maintenance_step_compacts_sstables_atomically() {
    // End-to-end STCS compaction of four freshly flushed SSTables:
    // - they merge into exactly one output that exists on disk;
    // - every input is removed;
    // - cumulative stats record a single 4-into-1 compaction.
    let temp_dir = TempDir::new().unwrap();

    // Positional STCS arguments — presumably (min_threshold, max_threshold,
    // bucket_low, bucket_high, min_sstable_size); confirm against
    // STCSPolicy::new.
    let policy = crate::storage::write_engine::STCSPolicy::new(4, 32, 0.5, 1.5, 0).unwrap();

    let mut engine = WriteEngine::new(WriteEngineConfig::new(
        temp_dir.path().join("data"),
        temp_dir.path().join("wal"),
        create_test_schema(),
    ))
    .unwrap();

    let input_paths = flush_n_sstables_sync(&mut engine, 4);
    assert_eq!(input_paths.len(), 4, "Expected 4 flushed SSTables");
    input_paths.iter().for_each(|p| {
        assert!(
            p.exists(),
            "Input file {:?} should exist before compaction",
            p
        );
    });

    engine.set_merge_policy(Box::new(policy)).unwrap();
    let report = engine.maintenance_step(Duration::from_secs(60)).unwrap();

    assert_eq!(
        report.completed_merges.len(),
        1,
        "Expected exactly 1 completed merge, got: {:?}",
        report.completed_merges
    );
    let merged_path = &report.completed_merges[0];
    assert!(
        merged_path.exists(),
        "Merged output file {:?} must exist after compaction",
        merged_path
    );

    input_paths.iter().for_each(|p| {
        assert!(
            !p.exists(),
            "Input file {:?} should have been deleted after compaction",
            p
        );
    });

    let stats = engine.maintenance_stats();
    assert_eq!(
        stats.compactions_completed, 1,
        "compactions_completed must be 1"
    );
    assert_eq!(
        stats.sstables_merged_in, 4,
        "Should have consumed 4 input SSTables"
    );
    assert_eq!(stats.sstables_produced, 1, "sstables_produced must be 1");
    assert!(stats.total_time > Duration::ZERO, "total_time must be > 0");
}
2953
#[test]
fn test_maintenance_stats_accumulate_across_cycles() {
    // Two full flush-then-compact cycles must add up in the cumulative stats.
    // total_time must never decrease.
    let temp_dir = TempDir::new().unwrap();
    let policy = crate::storage::write_engine::STCSPolicy::new(4, 32, 0.5, 1.5, 0).unwrap();

    let mut engine = WriteEngine::new(WriteEngineConfig::new(
        temp_dir.path().join("data"),
        temp_dir.path().join("wal"),
        create_test_schema(),
    ))
    .unwrap();
    engine.set_merge_policy(Box::new(policy)).unwrap();

    // Cycle 1.
    flush_n_sstables_sync(&mut engine, 4);
    engine.maintenance_step(Duration::from_secs(60)).unwrap();
    let stats_after_first = engine.maintenance_stats();
    assert_eq!(stats_after_first.compactions_completed, 1);

    // Cycle 2.
    flush_n_sstables_sync(&mut engine, 4);
    engine.maintenance_step(Duration::from_secs(60)).unwrap();
    let stats_after_second = engine.maintenance_stats();

    assert_eq!(
        stats_after_second.compactions_completed, 2,
        "Stats must accumulate across compaction cycles"
    );
    assert_eq!(
        stats_after_second.sstables_merged_in, 8,
        "Should have consumed 8 total input SSTables (2 cycles × 4 each)"
    );
    assert_eq!(
        stats_after_second.sstables_produced, 2,
        "Should have produced 2 output SSTables"
    );
    assert!(
        stats_after_second.total_time >= stats_after_first.total_time,
        "Cumulative total_time must only increase"
    );
}
3007
/// A compaction that cannot create its temporary output directory must fail
/// cleanly.
///
/// Expected outcome: `maintenance_step` returns an error, every input SSTable
/// is left in place, and `compactions_completed` is not incremented.
///
/// The failure is induced by making the data directory read-only (0o555).
/// That relies on Unix permission bits, so the test is gated on `cfg(unix)`.
/// On other platforms the permissions were never changed, so the "must fail"
/// assertion could not hold there.
///
/// Privileged processes (e.g. root) can still write into a 0o555 directory.
/// Instead of inferring privilege from the uid, or by writing into `/etc`,
/// the test probes the read-only directory directly and skips when the probe
/// succeeds.
#[test]
#[cfg(unix)]
fn test_maintenance_step_inputs_intact_on_unwriteable_tmp_dir() {
    use std::os::unix::fs::PermissionsExt;

    // Restores owner write access when dropped. TempDir can then clean up even
    // if the test returns early or panics while the directory is read-only.
    struct RestoreWritable<'a>(&'a std::path::Path);

    impl Drop for RestoreWritable<'_> {
        fn drop(&mut self) {
            // Best effort: a failure here must not mask the real test outcome.
            let _ = std::fs::set_permissions(self.0, std::fs::Permissions::from_mode(0o755));
        }
    }

    let temp_dir = TempDir::new().unwrap();
    let data_dir = temp_dir.path().join("data");

    let config = WriteEngineConfig::new(
        data_dir.clone(),
        temp_dir.path().join("wal"),
        create_test_schema(),
    );
    let mut engine = WriteEngine::new(config).unwrap();

    let input_paths = flush_n_sstables_sync(&mut engine, 4);
    for p in &input_paths {
        assert!(
            p.exists(),
            "Input file {:?} should exist before failure test",
            p
        );
    }

    // Make the data dir read-only so the compaction tmp dir cannot be created.
    std::fs::set_permissions(&data_dir, std::fs::Permissions::from_mode(0o555)).unwrap();
    let restore = RestoreWritable(&data_dir);

    // If a directory can still be created inside the read-only data dir, this
    // process bypasses permission checks and the failure cannot be induced.
    let probe = data_dir.join(".cqlite-permission-probe");
    if std::fs::create_dir(&probe).is_ok() {
        let _ = std::fs::remove_dir(&probe);
        return; // `restore` re-enables writes on drop.
    }

    let policy = crate::storage::write_engine::STCSPolicy::new(4, 32, 0.5, 1.5, 0).unwrap();
    engine.set_merge_policy(Box::new(policy)).unwrap();

    let result = engine.maintenance_step(Duration::from_secs(60));

    // Make the directory writable again before asserting.
    drop(restore);

    assert!(
        result.is_err(),
        "maintenance_step should return an error when the tmp dir cannot be created"
    );

    for p in &input_paths {
        assert!(
            p.exists(),
            "Input file {:?} must remain intact after failed compaction",
            p
        );
    }

    let stats = engine.maintenance_stats();
    assert_eq!(
        stats.compactions_completed, 0,
        "compactions_completed must not increment on failure"
    );
}
3107
#[test]
fn test_no_tmp_dir_remains_after_successful_merge() {
    // A successful compaction must not leave any `.compaction-tmp-*` directory
    // behind in the data directory.
    let temp_dir = TempDir::new().unwrap();
    let data_dir = temp_dir.path().join("data");
    let policy = crate::storage::write_engine::STCSPolicy::new(4, 32, 0.5, 1.5, 0).unwrap();

    let mut engine = WriteEngine::new(WriteEngineConfig::new(
        data_dir.clone(),
        temp_dir.path().join("wal"),
        create_test_schema(),
    ))
    .unwrap();
    flush_n_sstables_sync(&mut engine, 4);

    engine.set_merge_policy(Box::new(policy)).unwrap();
    engine.maintenance_step(Duration::from_secs(60)).unwrap();

    let mut leftover_tmp = Vec::new();
    for entry in std::fs::read_dir(&data_dir).unwrap().flatten() {
        if entry
            .file_name()
            .to_string_lossy()
            .starts_with(".compaction-tmp-")
        {
            leftover_tmp.push(entry);
        }
    }

    assert!(
        leftover_tmp.is_empty(),
        "No .compaction-tmp-* directories should remain after successful compaction, \
         found: {:?}",
        leftover_tmp.iter().map(|e| e.path()).collect::<Vec<_>>()
    );
}
3147
3148 fn config_for(temp_dir: &TempDir) -> WriteEngineConfig {
3154 WriteEngineConfig::new(
3155 temp_dir.path().join("data"),
3156 temp_dir.path().join("wal"),
3157 create_test_schema(),
3158 )
3159 }
3160
#[test]
fn test_startup_sweep_removes_orphaned_compaction_tmp_dir() {
    let temp_dir = TempDir::new().unwrap();

    // Simulate a leftover compaction staging directory holding a partial file.
    // `create_dir_all` also creates the parent `data/` directory.
    let orphan_dir = temp_dir.path().join("data").join(".compaction-tmp-99");
    std::fs::create_dir_all(&orphan_dir).unwrap();
    std::fs::write(orphan_dir.join("partial.db"), b"partial content").unwrap();

    assert!(
        orphan_dir.exists(),
        "Orphan dir should exist before engine creation"
    );

    // Keep the engine alive until the end of the test.
    let _engine = WriteEngine::new(config_for(&temp_dir)).unwrap();

    assert!(
        !orphan_dir.exists(),
        "Startup sweep must remove orphaned .compaction-tmp-99/ directory"
    );
}
3187
#[test]
fn test_startup_sweep_removes_orphaned_partial_sstable() {
    let temp_dir = TempDir::new().unwrap();
    let sstable_dir = temp_dir
        .path()
        .join("data")
        .join("test_ks")
        .join("test_table");
    std::fs::create_dir_all(&sstable_dir).unwrap();

    // Generation 99 has no TOC.txt; generation 1 does.
    let orphaned: [&str; 3] = [
        "nb-99-big-Data.db",
        "nb-99-big-Index.db",
        "nb-99-big-Statistics.db",
    ];
    let complete: [&str; 3] = ["nb-1-big-Data.db", "nb-1-big-Index.db", "nb-1-big-TOC.txt"];

    orphaned
        .iter()
        .for_each(|file| std::fs::write(sstable_dir.join(file), b"orphaned").unwrap());
    complete
        .iter()
        .for_each(|file| std::fs::write(sstable_dir.join(file), b"good").unwrap());

    // Keep the engine alive until the end of the test.
    let _engine = WriteEngine::new(config_for(&temp_dir)).unwrap();

    for file in orphaned.iter() {
        assert!(
            !sstable_dir.join(file).exists(),
            "Startup sweep must remove orphaned component {:?}",
            file
        );
    }
    for file in complete.iter() {
        assert!(
            sstable_dir.join(file).exists(),
            "Startup sweep must NOT remove complete SSTable component {:?}",
            file
        );
    }
}
3234
#[test]
fn test_startup_sweep_leaves_complete_sstable_intact() {
    // Every component suffix written for generation 3, TOC.txt included.
    const SUFFIXES: [&str; 7] = [
        "Data.db",
        "Index.db",
        "Summary.db",
        "Statistics.db",
        "Filter.db",
        "Digest.crc32",
        "TOC.txt",
    ];

    let temp_dir = TempDir::new().unwrap();
    let sstable_dir = temp_dir
        .path()
        .join("data")
        .join("test_ks")
        .join("test_table");
    std::fs::create_dir_all(&sstable_dir).unwrap();

    let components: Vec<String> = SUFFIXES
        .iter()
        .map(|suffix| format!("nb-3-big-{}", suffix))
        .collect();
    for component in &components {
        std::fs::write(sstable_dir.join(component), b"complete").unwrap();
    }

    // Keep the engine alive until the end of the test.
    let _engine = WriteEngine::new(config_for(&temp_dir)).unwrap();

    for component in &components {
        assert!(
            sstable_dir.join(component).exists(),
            "Complete SSTable component {:?} must not be removed by startup sweep",
            component
        );
    }
}
3267
#[test]
fn test_bytes_written_includes_all_components() {
    /// Recursively collects every `*-Data.db` SSTable component found below `dir`.
    fn collect_data_components(dir: &std::path::Path, out: &mut Vec<std::path::PathBuf>) {
        for entry in std::fs::read_dir(dir).unwrap().flatten() {
            let path = entry.path();
            if path.is_dir() {
                collect_data_components(&path, out);
            } else if path
                .file_name()
                .map_or(false, |name| name.to_string_lossy().ends_with("-Data.db"))
            {
                out.push(path);
            }
        }
    }

    let temp_dir = TempDir::new().unwrap();
    let data_dir = temp_dir.path().join("data");
    let schema = create_test_schema();

    let policy = crate::storage::write_engine::STCSPolicy::new(4, 32, 0.5, 1.5, 0).unwrap();

    let config = WriteEngineConfig::new(data_dir.clone(), temp_dir.path().join("wal"), schema);

    let mut engine = WriteEngine::new(config).unwrap();
    let input_paths = flush_n_sstables_sync(&mut engine, 4);

    engine.set_merge_policy(Box::new(policy)).unwrap();
    engine.maintenance_step(Duration::from_secs(60)).unwrap();

    let stats = engine.maintenance_stats();
    assert_eq!(stats.compactions_completed, 1, "compaction must have run");
    assert_eq!(
        stats.sstables_produced, 1,
        "one output SSTable must have been produced"
    );

    let stats2 = engine.maintenance_stats();
    assert_eq!(
        stats.bytes_written, stats2.bytes_written,
        "maintenance_stats() must be consistent across calls"
    );

    // The compaction output is whatever Data.db component was not one of the
    // inputs. The inputs are filtered explicitly rather than relying on whether
    // the engine deleted them after the merge.
    let mut data_components = Vec::new();
    collect_data_components(&data_dir, &mut data_components);
    let output_data: Vec<_> = data_components
        .into_iter()
        .filter(|path| !input_paths.iter().any(|input| input == path))
        .collect();
    assert!(
        !output_data.is_empty(),
        "compaction output Data.db component must exist under {:?}",
        data_dir
    );

    let output_data_bytes: u64 = output_data
        .iter()
        .map(|path| std::fs::metadata(path).unwrap().len())
        .sum();

    // A finished SSTable is Data.db plus further components (Index, Statistics,
    // TOC, ...). If `bytes_written` accounts for all of them, it must strictly
    // exceed the size of the Data.db component on its own.
    assert!(
        stats.bytes_written > output_data_bytes,
        "bytes_written ({}) must include every component, not just Data.db ({} bytes)",
        stats.bytes_written,
        output_data_bytes
    );
}
3325
#[test]
fn test_durability_default_is_sync_each_write() {
    // The out-of-the-box durability mode is fsync-per-write.
    let default_mode: Durability = Default::default();
    assert_eq!(default_mode, Durability::SyncEachWrite);
}
3335
#[test]
fn test_config_default_durability() {
    // A config built without `with_durability` falls back to SyncEachWrite.
    let temp_dir = TempDir::new().unwrap();
    let config = config_for(&temp_dir);
    assert_eq!(config.durability, Durability::SyncEachWrite);
}
3348
#[test]
fn test_config_with_durability_builder() {
    // `with_durability` overrides the default mode on the returned config.
    let temp_dir = TempDir::new().unwrap();
    let config = config_for(&temp_dir).with_durability(Durability::Disabled);
    assert_eq!(config.durability, Durability::Disabled);
}
3362
#[test]
fn test_wal_on_produces_wal_growth() {
    let temp_dir = TempDir::new().unwrap();
    let mut engine =
        WriteEngine::new(config_for(&temp_dir).with_durability(Durability::SyncEachWrite))
            .unwrap();
    assert_eq!(engine.wal_size(), 0, "WAL must start empty");

    // A single write must be appended to the log.
    engine
        .write(create_test_mutation(1, "Alice", 1_000_000))
        .unwrap();

    let wal_grew = engine.wal_size() > 0;
    assert!(wal_grew, "WAL must grow after write with SyncEachWrite");
}
3386
#[test]
fn test_wal_off_produces_no_wal_growth() {
    let temp_dir = TempDir::new().unwrap();
    let mut engine =
        WriteEngine::new(config_for(&temp_dir).with_durability(Durability::Disabled)).unwrap();
    assert_eq!(engine.wal_size(), 0, "WAL must start empty");

    // Ten distinct rows, each with its own timestamp.
    for idx in 0..10 {
        let user = format!("User{}", idx);
        let mutation = create_test_mutation(idx, &user, 1_000_000 + idx as i64);
        engine.write(mutation).unwrap();
    }

    assert_eq!(
        engine.wal_size(),
        0,
        "WAL must remain empty with Durability::Disabled"
    );
    assert_eq!(
        engine.memtable_row_count(),
        10,
        "Mutations must reach the memtable even without WAL"
    );
}
3419
3420 #[tokio::test]
3422 async fn test_wal_off_write_async_produces_no_wal_growth() {
3423 let temp_dir = TempDir::new().unwrap();
3424 let schema = create_test_schema();
3425 let config = WriteEngineConfig::new(
3426 temp_dir.path().join("data"),
3427 temp_dir.path().join("wal"),
3428 schema,
3429 )
3430 .with_durability(Durability::Disabled);
3431
3432 let mut engine = WriteEngine::new(config).unwrap();
3433
3434 for i in 0..5 {
3435 let mutation = create_test_mutation(i, &format!("User{}", i), 1_000_000 + i as i64);
3436 engine.write_async(mutation).await.unwrap();
3437 }
3438
3439 assert_eq!(
3440 engine.wal_size(),
3441 0,
3442 "WAL must remain empty with Durability::Disabled (async path)"
3443 );
3444 assert_eq!(engine.memtable_row_count(), 5);
3445 }
3446
#[test]
fn test_wal_off_no_replay_on_restart() {
    let temp_dir = TempDir::new().unwrap();
    let schema = create_test_schema();
    let data_path = temp_dir.path().join("data");
    let wal_path = temp_dir.path().join("wal");

    // First run: write five rows with the WAL disabled, then shut the engine down.
    let mut first_run = WriteEngine::new(
        WriteEngineConfig::new(data_path.clone(), wal_path.clone(), schema.clone())
            .with_durability(Durability::Disabled),
    )
    .unwrap();
    for idx in 0..5 {
        let user = format!("User{}", idx);
        first_run
            .write(create_test_mutation(idx, &user, 1_000_000 + idx as i64))
            .unwrap();
    }
    drop(first_run);

    // Second run: reopen the same directories with the config's default durability.
    let reopened = WriteEngine::new(WriteEngineConfig::new(data_path, wal_path, schema)).unwrap();

    assert_eq!(
        reopened.memtable_row_count(),
        0,
        "No WAL entries were written with Disabled, so no replay is possible"
    );
}
3487
#[test]
fn test_wal_on_replays_on_restart() {
    let temp_dir = TempDir::new().unwrap();
    let schema = create_test_schema();
    let data_path = temp_dir.path().join("data");
    let wal_path = temp_dir.path().join("wal");

    // First run: write five rows with per-write fsync, then shut the engine down.
    let mut first_run = WriteEngine::new(
        WriteEngineConfig::new(data_path.clone(), wal_path.clone(), schema.clone())
            .with_durability(Durability::SyncEachWrite),
    )
    .unwrap();
    for idx in 0..5 {
        let user = format!("User{}", idx);
        first_run
            .write(create_test_mutation(idx, &user, 1_000_000 + idx as i64))
            .unwrap();
    }
    drop(first_run);

    // Second run: reopening the same directories should replay the logged rows.
    let reopened = WriteEngine::new(
        WriteEngineConfig::new(data_path, wal_path, schema)
            .with_durability(Durability::SyncEachWrite),
    )
    .unwrap();

    assert_eq!(
        reopened.memtable_row_count(),
        5,
        "SyncEachWrite must replay mutations durably on restart"
    );
}
3528}