Skip to main content

featherdb_storage/
file.rs

1//! File I/O abstraction
2
3use crate::compression::{
4    CompressionStats, CompressionType as StorageCompressionType, PageCompressor,
5};
6use featherdb_core::{
7    constants, CompressionConfig, CompressionType, Config, EncryptionConfig, Error, PageId, Result,
8};
9use featherdb_crypto::{Cipher, KeyDerivation, ENCRYPTION_OVERHEAD, SALT_SIZE};
10use parking_lot::RwLock;
11use std::fs::{File, OpenOptions};
12use std::io::{Read, Seek, SeekFrom, Write};
13
/// Superblock - first 512 bytes of the database file
///
/// Serialized little-endian at offset 0; the remainder of the first page is
/// zero padding (see `FileManager::init_if_needed`). Layout:
/// magic[10] | version u32 | page_size u32 | total_pages u64 |
/// free_list_head u64 | schema_root u64 | last_checkpoint_lsn u64 |
/// god_byte u8 | encryption_flags u8 | salt[SALT_SIZE].
#[derive(Debug, Clone)]
pub struct Superblock {
    /// Magic bytes (must equal `constants::MAGIC`)
    pub magic: [u8; 10],
    /// Format version (must equal `constants::FORMAT_VERSION` to open)
    pub format_version: u32,
    /// Page size in bytes (logical, pre-encryption)
    pub page_size: u32,
    /// Total number of pages
    pub total_pages: u64,
    /// Free list head page (0 = none)
    pub free_list_head: u64,
    /// Schema root page
    pub schema_root: u64,
    /// Last checkpoint LSN
    pub last_checkpoint_lsn: u64,
    /// God byte for commit slot selection (0 = A, 1 = B)
    pub god_byte: u8,
    /// Encryption flags (0 = none, 1 = encrypted)
    pub encryption_flags: u8,
    /// Salt for password-based key derivation (all zeroes when the database
    /// is unencrypted or opened with a raw key)
    pub encryption_salt: [u8; SALT_SIZE],
}
38
39impl Superblock {
40    pub fn new(page_size: u32, encrypted: bool, salt: [u8; SALT_SIZE]) -> Self {
41        let mut magic = [0u8; 10];
42        magic.copy_from_slice(constants::MAGIC);
43
44        Superblock {
45            magic,
46            format_version: constants::FORMAT_VERSION,
47            page_size,
48            total_pages: constants::FIRST_DATA_PAGE,
49            free_list_head: 0,
50            schema_root: constants::SCHEMA_ROOT_PAGE,
51            last_checkpoint_lsn: 0,
52            god_byte: 0,
53            encryption_flags: if encrypted { 1 } else { 0 },
54            encryption_salt: salt,
55        }
56    }
57
58    pub fn is_encrypted(&self) -> bool {
59        self.encryption_flags != 0
60    }
61
62    pub fn serialize(&self) -> [u8; constants::SUPERBLOCK_SIZE] {
63        let mut buf = [0u8; constants::SUPERBLOCK_SIZE];
64        let mut offset = 0;
65
66        buf[offset..offset + 10].copy_from_slice(&self.magic);
67        offset += 10;
68
69        buf[offset..offset + 4].copy_from_slice(&self.format_version.to_le_bytes());
70        offset += 4;
71
72        buf[offset..offset + 4].copy_from_slice(&self.page_size.to_le_bytes());
73        offset += 4;
74
75        buf[offset..offset + 8].copy_from_slice(&self.total_pages.to_le_bytes());
76        offset += 8;
77
78        buf[offset..offset + 8].copy_from_slice(&self.free_list_head.to_le_bytes());
79        offset += 8;
80
81        buf[offset..offset + 8].copy_from_slice(&self.schema_root.to_le_bytes());
82        offset += 8;
83
84        buf[offset..offset + 8].copy_from_slice(&self.last_checkpoint_lsn.to_le_bytes());
85        offset += 8;
86
87        buf[offset] = self.god_byte;
88        offset += 1;
89
90        buf[offset] = self.encryption_flags;
91        offset += 1;
92
93        buf[offset..offset + SALT_SIZE].copy_from_slice(&self.encryption_salt);
94
95        buf
96    }
97
98    pub fn deserialize(data: &[u8]) -> Result<Self> {
99        if data.len() < constants::SUPERBLOCK_SIZE {
100            return Err(Error::InvalidDatabaseFile {
101                message: "Superblock too small".into(),
102            });
103        }
104
105        let mut magic = [0u8; 10];
106        magic.copy_from_slice(&data[0..10]);
107
108        if &magic != constants::MAGIC {
109            return Err(Error::InvalidDatabaseFile {
110                message: "Invalid magic bytes".into(),
111            });
112        }
113
114        let format_version = u32::from_le_bytes([data[10], data[11], data[12], data[13]]);
115        if format_version != constants::FORMAT_VERSION {
116            return Err(Error::VersionMismatch {
117                file_version: format_version,
118                expected: constants::FORMAT_VERSION,
119            });
120        }
121
122        let mut encryption_salt = [0u8; SALT_SIZE];
123        encryption_salt.copy_from_slice(&data[52..52 + SALT_SIZE]);
124
125        Ok(Superblock {
126            magic,
127            format_version,
128            page_size: u32::from_le_bytes([data[14], data[15], data[16], data[17]]),
129            total_pages: u64::from_le_bytes([
130                data[18], data[19], data[20], data[21], data[22], data[23], data[24], data[25],
131            ]),
132            free_list_head: u64::from_le_bytes([
133                data[26], data[27], data[28], data[29], data[30], data[31], data[32], data[33],
134            ]),
135            schema_root: u64::from_le_bytes([
136                data[34], data[35], data[36], data[37], data[38], data[39], data[40], data[41],
137            ]),
138            last_checkpoint_lsn: u64::from_le_bytes([
139                data[42], data[43], data[44], data[45], data[46], data[47], data[48], data[49],
140            ]),
141            god_byte: data[50],
142            encryption_flags: data[51],
143            encryption_salt,
144        })
145    }
146
147    pub fn is_valid(&self) -> bool {
148        &self.magic == constants::MAGIC && self.format_version == constants::FORMAT_VERSION
149    }
150}
151
/// Manages file I/O for the database
///
/// Owns the on-disk file handle and the in-memory copy of the superblock,
/// plus the optional cipher/compressor applied to every page that passes
/// through `read_page`/`write_page`.
pub struct FileManager {
    /// Database file handle. A write lock is taken even for reads because
    /// seeking + `read_exact` require `&mut File`.
    file: RwLock<File>,
    /// In-memory superblock; persisted with `write_superblock`/`extend`.
    superblock: RwLock<Superblock>,
    /// Logical page size in bytes (pre-encryption, pre-compression).
    page_size: usize,
    /// Optional cipher for page encryption
    cipher: Option<Cipher>,
    /// Optional page compressor
    compressor: Option<PageCompressor>,
    /// Storage limits configuration
    storage_limits: featherdb_core::StorageLimitsConfig,
}
164
impl FileManager {
    /// Open or create a database file
    ///
    /// For an existing file the superblock is read and validated: the page
    /// size and the encryption setting must match `config`. For a new file
    /// a superblock is built in memory only — nothing is written to disk
    /// until `init_if_needed` runs. The cipher (from password + stored salt,
    /// or raw key) and compressor are derived from `config`.
    pub fn open(config: &Config) -> Result<Self> {
        let exists = config.path.exists();

        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(config.create_if_missing)
            .open(&config.path)?;

        let page_size = config.page_size;

        // Determine encryption salt and flags
        let (superblock, cipher) = if exists {
            // Read existing superblock (`&File` implements Read + Seek, so
            // no mutable handle is needed here)
            let mut file_ref = &file;
            file_ref.seek(SeekFrom::Start(0))?;

            let mut buf = [0u8; constants::SUPERBLOCK_SIZE];
            file_ref.read_exact(&mut buf)?;

            let sb = Superblock::deserialize(&buf)?;

            if sb.page_size as usize != page_size {
                return Err(Error::InvalidDatabaseFile {
                    message: format!(
                        "Page size mismatch: file has {}, config has {}",
                        sb.page_size, page_size
                    ),
                });
            }

            // Check encryption configuration matches file
            if sb.is_encrypted() && !config.is_encrypted() {
                return Err(Error::InvalidDatabaseFile {
                    message: "Database is encrypted but no password/key provided".into(),
                });
            }
            if !sb.is_encrypted() && config.is_encrypted() {
                return Err(Error::InvalidDatabaseFile {
                    message: "Database is not encrypted but password/key was provided".into(),
                });
            }

            // Derive cipher from config and stored salt
            let cipher = Self::create_cipher(config, &sb.encryption_salt)?;

            (sb, cipher)
        } else {
            // New database - generate salt if encrypted
            let (encrypted, salt) = match &config.encryption {
                EncryptionConfig::None => (false, [0u8; SALT_SIZE]),
                EncryptionConfig::Password(_) => (true, KeyDerivation::generate_salt()),
                EncryptionConfig::Key(_) => (true, [0u8; SALT_SIZE]), // No salt needed for raw key
            };

            let sb = Superblock::new(page_size as u32, encrypted, salt);
            let cipher = Self::create_cipher(config, &salt)?;

            (sb, cipher)
        };

        // Create compressor if compression is enabled
        let compressor = Self::create_compressor(&config.compression);

        Ok(FileManager {
            file: RwLock::new(file),
            superblock: RwLock::new(superblock),
            page_size,
            cipher,
            compressor,
            storage_limits: config.storage_limits.clone(),
        })
    }

    /// Create compressor from compression config
    ///
    /// Returns `None` when compression is disabled; otherwise maps the
    /// core-crate compression type onto the storage-crate one.
    fn create_compressor(config: &CompressionConfig) -> Option<PageCompressor> {
        if !config.is_enabled() {
            return None;
        }

        let storage_type = match config.compression_type {
            CompressionType::None => StorageCompressionType::None,
            CompressionType::Lz4 => StorageCompressionType::Lz4,
            CompressionType::Zstd { level } => StorageCompressionType::Zstd { level },
        };

        let compressor = PageCompressor::new(storage_type, config.threshold);
        Some(compressor)
    }

    /// Create cipher from encryption config and salt
    ///
    /// Password mode derives the key from password + salt; raw-key mode uses
    /// the key directly (salt ignored). Returns `Ok(None)` when unencrypted.
    fn create_cipher(config: &Config, salt: &[u8; SALT_SIZE]) -> Result<Option<Cipher>> {
        match &config.encryption {
            EncryptionConfig::None => Ok(None),
            EncryptionConfig::Password(password) => {
                let key = KeyDerivation::derive_key(password.as_bytes(), salt)?;
                Ok(Some(Cipher::new(&key)))
            }
            EncryptionConfig::Key(key) => Ok(Some(Cipher::new(key))),
        }
    }

    /// Initialize the database file if it's new
    ///
    /// A zero-length file gets: superblock + zero padding up to one on-disk
    /// page, then a schema catalog page and a freelist page (both through
    /// `write_page`, so they are compressed/encrypted like any other page),
    /// followed by an fsync. Non-empty files are left untouched.
    pub fn init_if_needed(&self, _config: &Config) -> Result<()> {
        let superblock = self.superblock.read();

        // Check if file is already initialized
        let mut file = self.file.write();
        let file_len = file.seek(SeekFrom::End(0))?;

        if file_len == 0 {
            // New file - write superblock and initial pages
            file.seek(SeekFrom::Start(0))?;
            file.write_all(&superblock.serialize())?;

            // Pad to first page boundary (using disk_page_size for encrypted DBs)
            let disk_page = self.disk_page_size();
            let padding = vec![0u8; disk_page - constants::SUPERBLOCK_SIZE];
            file.write_all(&padding)?;
            drop(superblock); // Release lock before calling write_page

            // Write empty schema catalog page (page 1) with valid checksum
            let mut schema_page = crate::Page::new(
                PageId(constants::SCHEMA_ROOT_PAGE),
                crate::PageType::Leaf,
                self.page_size,
            );
            schema_page.compute_checksum();
            // write_page re-acquires self.file; holding the guard here would
            // deadlock, so release it first
            drop(file); // Release lock before calling write_page
            self.write_page(PageId(constants::SCHEMA_ROOT_PAGE), schema_page.as_bytes())?;

            // Write empty freelist page (page 2) with valid checksum
            let mut freelist_page = crate::Page::new(
                PageId(constants::FREELIST_ROOT_PAGE),
                crate::PageType::Free,
                self.page_size,
            );
            freelist_page.compute_checksum();
            self.write_page(
                PageId(constants::FREELIST_ROOT_PAGE),
                freelist_page.as_bytes(),
            )?;

            self.file.write().sync_all()?;
        }

        Ok(())
    }

    /// Read a page from disk
    ///
    /// The read order is: read from disk -> decrypt (if encrypted) -> decompress (if compressed)
    ///
    /// Returns exactly `page_size` bytes of logical page data. When
    /// compression is on, the stored data may be zero-padded up to page size
    /// (see `write_page`); presumably `decompress_page` tolerates that
    /// trailing padding — confirm against `PageCompressor`.
    pub fn read_page(&self, page_id: PageId) -> Result<Vec<u8>> {
        let offset = self.page_offset(page_id);

        // Step 1: Read raw data from disk
        let raw_data = if let Some(ref cipher) = self.cipher {
            // Read encrypted data (always full page + overhead when encrypted)
            let encrypted_size = self.page_size + ENCRYPTION_OVERHEAD;
            let mut buf = vec![0u8; encrypted_size];

            // Write lock: seek + read_exact need &mut File
            let mut file = self.file.write();
            file.seek(SeekFrom::Start(offset))?;
            file.read_exact(&mut buf)?;

            // Decrypt (page id is bound into the ciphertext, so a wrong key
            // or a relocated page fails here)
            cipher.decrypt(&buf, page_id.0)?
        } else {
            // No encryption - read raw page
            let mut buf = vec![0u8; self.page_size];

            let mut file = self.file.write();
            file.seek(SeekFrom::Start(offset))?;
            file.read_exact(&mut buf)?;

            buf
        };

        // Step 2: Decompress if we have a compressor
        if let Some(ref compressor) = self.compressor {
            compressor.decompress_page(&raw_data, self.page_size)
        } else {
            Ok(raw_data)
        }
    }

    /// Write a page to disk
    ///
    /// The write order is: compress (if enabled) -> encrypt (if enabled) -> write to disk
    ///
    /// `data` must be exactly `page_size` bytes.
    pub fn write_page(&self, page_id: PageId, data: &[u8]) -> Result<()> {
        if data.len() != self.page_size {
            return Err(Error::Internal(format!(
                "Page data size mismatch: expected {}, got {}",
                self.page_size,
                data.len()
            )));
        }

        // Step 1: Compress if we have a compressor
        let processed_data = if let Some(ref compressor) = self.compressor {
            let compressed = compressor.compress_page(data)?;
            // Pad to page size if compressed is smaller (needed for fixed-size page storage)
            // NOTE(review): if compress_page ever returns MORE than page_size
            // bytes, the write below spills into the next page slot;
            // presumably PageCompressor caps its output at page_size
            // (stored/uncompressed fallback) — confirm.
            if compressed.len() < self.page_size {
                let mut padded = compressed;
                padded.resize(self.page_size, 0);
                padded
            } else {
                compressed
            }
        } else {
            data.to_vec()
        };

        let offset = self.page_offset(page_id);
        let mut file = self.file.write();
        file.seek(SeekFrom::Start(offset))?;

        // Step 2: Encrypt if we have a cipher
        if let Some(ref cipher) = self.cipher {
            // Encrypt and write (page id is mixed in, see read_page)
            let encrypted = cipher.encrypt(&processed_data, page_id.0)?;
            file.write_all(&encrypted)?;
        } else {
            // No encryption
            file.write_all(&processed_data)?;
        }

        Ok(())
    }

    /// Sync all changes to disk
    pub fn sync(&self) -> Result<()> {
        self.file.write().sync_all()?;
        Ok(())
    }

    /// Get the current total number of pages
    pub fn total_pages(&self) -> u64 {
        self.superblock.read().total_pages
    }

    /// Extend the file by one page, return the new page ID
    ///
    /// Enforces the configured storage limit (and a best-effort disk-space
    /// check) before growing the file, then persists the updated superblock.
    /// The superblock write lock is held throughout, so concurrent extends
    /// are serialized.
    pub fn extend(&self) -> Result<PageId> {
        let mut superblock = self.superblock.write();
        let new_page_id = PageId(superblock.total_pages);

        // Calculate how many bytes the extension will add
        let new_total = superblock.total_pages + 1;
        // NOTE(review): pages occupy [id*dps, (id+1)*dps), so the highest
        // page ends at new_total*dps; `new_total + 1` sizes the file one
        // full page beyond that — confirm whether the extra slack page is
        // intentional.
        let new_file_size = self.disk_page_size() as u64 * (new_total + 1);
        let current_size = {
            let file = self.file.read();
            file.metadata()?.len()
        };
        let additional_bytes = new_file_size.saturating_sub(current_size);

        // Check storage limit before extending
        if self.would_exceed_limit(additional_bytes) {
            let limit = self.storage_limits.effective_database_limit().unwrap_or(0);
            return Err(Error::StorageLimitExceeded {
                current_bytes: current_size,
                limit_bytes: limit,
                operation_bytes: additional_bytes,
            });
        }

        // Check disk space availability
        self.check_disk_space(additional_bytes)?;

        // Extend file - calculate new file size based on disk page size
        let mut file = self.file.write();
        file.set_len(new_file_size)?;

        superblock.total_pages = new_total;

        // Write updated superblock
        file.seek(SeekFrom::Start(0))?;
        file.write_all(&superblock.serialize())?;

        Ok(new_page_id)
    }

    /// Update superblock on disk
    pub fn write_superblock(&self) -> Result<()> {
        let superblock = self.superblock.read();
        let mut file = self.file.write();

        file.seek(SeekFrom::Start(0))?;
        file.write_all(&superblock.serialize())?;

        Ok(())
    }

    /// Get superblock
    pub fn superblock(&self) -> Superblock {
        self.superblock.read().clone()
    }

    /// Update superblock
    ///
    /// Applies `f` to the in-memory superblock under its write lock, then
    /// persists it. The lock is dropped before `write_superblock`, so
    /// another mutation could interleave between update and persist.
    pub fn update_superblock<F>(&self, f: F) -> Result<()>
    where
        F: FnOnce(&mut Superblock),
    {
        let mut superblock = self.superblock.write();
        f(&mut superblock);
        drop(superblock);
        self.write_superblock()
    }

    /// Get page size
    pub fn page_size(&self) -> usize {
        self.page_size
    }

    /// Check if encryption is enabled
    pub fn is_encrypted(&self) -> bool {
        self.cipher.is_some()
    }

    /// Check if compression is enabled
    pub fn is_compressed(&self) -> bool {
        self.compressor.is_some()
    }

    /// Get compression statistics
    /// Returns the stats from the compressor if compression is enabled,
    /// otherwise returns the empty stats.
    pub fn compression_stats(&self) -> &CompressionStats {
        if let Some(ref compressor) = self.compressor {
            compressor.stats()
        } else {
            // No compressor configured: hand back a reference to a
            // lazily-initialized, process-wide default value so the return
            // type can stay a plain `&CompressionStats`.
            static EMPTY_STATS: std::sync::OnceLock<CompressionStats> = std::sync::OnceLock::new();
            EMPTY_STATS.get_or_init(CompressionStats::default)
        }
    }

    /// Get the compression type (if any)
    pub fn compression_type(&self) -> Option<StorageCompressionType> {
        self.compressor.as_ref().map(|c| c.compression_type())
    }

    /// Get current database file size in bytes
    ///
    /// Best-effort: returns 0 if the metadata query fails.
    pub fn current_size(&self) -> u64 {
        let file = self.file.read();
        file.metadata().map(|m| m.len()).unwrap_or(0)
    }

    /// Get configured maximum size (None if unlimited)
    pub fn max_size(&self) -> Option<u64> {
        self.storage_limits.max_database_size
    }

    /// Get remaining capacity before hitting limit (None if unlimited)
    pub fn remaining_capacity(&self) -> Option<u64> {
        if let Some(limit) = self.storage_limits.effective_database_limit() {
            let current = self.current_size();
            Some(limit.saturating_sub(current))
        } else {
            None
        }
    }

    /// Check if an operation of given size would exceed the limit
    ///
    /// Always false when no limit is configured.
    pub fn would_exceed_limit(&self, additional_bytes: u64) -> bool {
        if let Some(limit) = self.storage_limits.effective_database_limit() {
            let current = self.current_size();
            current.saturating_add(additional_bytes) > limit
        } else {
            false
        }
    }

    /// Check if sufficient disk space is available for an operation
    ///
    /// Returns Ok(()) if enough space, or Err(InsufficientDiskSpace) if not.
    /// This performs a best-effort check. On platforms where we can't determine
    /// available space, we optimistically allow the operation (it will fail with
    /// I/O error if there's actually insufficient space).
    pub fn check_disk_space(&self, _required_bytes: u64) -> Result<()> {
        // Note: A proper implementation would use platform-specific APIs to check
        // available disk space (statvfs on Unix, GetDiskFreeSpaceEx on Windows).
        // For now, we do a best-effort check by attempting the operation and
        // letting the OS return an error if there's insufficient space.
        //
        // Future improvement: Use the fs2 crate or platform-specific syscalls
        // to check available space before attempting the write.
        Ok(())
    }

    /// Get the on-disk size of a page (including encryption overhead if any)
    fn disk_page_size(&self) -> usize {
        if self.cipher.is_some() {
            self.page_size + ENCRYPTION_OVERHEAD
        } else {
            self.page_size
        }
    }

    /// Calculate file offset for a page
    fn page_offset(&self, page_id: PageId) -> u64 {
        // Page 0 starts at offset page_size (after superblock + padding)
        // But the superblock takes up less than a page, so:
        // Offset 0-511: Superblock
        // Offset 512 to page_size-1: Padding
        // Offset page_size: Page 1 (schema root)
        // etc.
        // When encrypted, pages are larger on disk
        // NOTE(review): page id 0 maps to offset 0, which is the superblock
        // region — callers are expected to only use ids >= 1 (data pages
        // start at FIRST_DATA_PAGE); confirm nothing ever writes page 0.
        self.disk_page_size() as u64 * page_id.0
    }
}
579
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    // Round-trip: serialize then deserialize preserves every field we
    // compare on.
    #[test]
    fn test_superblock_serialization() {
        let sb = Superblock::new(4096, false, [0u8; SALT_SIZE]);
        let serialized = sb.serialize();
        let deserialized = Superblock::deserialize(&serialized).unwrap();

        assert_eq!(sb.format_version, deserialized.format_version);
        assert_eq!(sb.page_size, deserialized.page_size);
        assert_eq!(sb.total_pages, deserialized.total_pages);
        assert_eq!(sb.encryption_flags, deserialized.encryption_flags);
    }

    // Encryption flag and salt survive a serialize/deserialize round trip.
    #[test]
    fn test_superblock_encryption_flags() {
        let salt = [0x42u8; SALT_SIZE];
        let sb = Superblock::new(4096, true, salt);
        let serialized = sb.serialize();
        let deserialized = Superblock::deserialize(&serialized).unwrap();

        assert!(deserialized.is_encrypted());
        assert_eq!(deserialized.encryption_salt, salt);
    }

    // Basic write/read round trip on a plain (unencrypted, uncompressed) DB.
    #[test]
    fn test_file_manager() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("test.db");

        let config = Config::new(&path);
        let fm = FileManager::open(&config).unwrap();
        fm.init_if_needed(&config).unwrap();

        // Write a page
        let page_data = vec![0xAB; 4096];
        fm.write_page(PageId(3), &page_data).unwrap();

        // Read it back
        let read_data = fm.read_page(PageId(3)).unwrap();
        assert_eq!(page_data, read_data);
    }

    // Same round trip with password-based encryption enabled.
    #[test]
    fn test_file_manager_encrypted() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("encrypted.db");

        let config = Config::new(&path).with_password("test_password");
        let fm = FileManager::open(&config).unwrap();
        fm.init_if_needed(&config).unwrap();

        assert!(fm.is_encrypted());

        // Write a page
        let page_data = vec![0xAB; 4096];
        fm.write_page(PageId(3), &page_data).unwrap();

        // Read it back
        let read_data = fm.read_page(PageId(3)).unwrap();
        assert_eq!(page_data, read_data);
    }

    // The salt stored in the superblock must allow re-deriving the same key
    // on reopen.
    #[test]
    fn test_encrypted_db_reopen() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("encrypted_reopen.db");

        // Create encrypted DB
        {
            let config = Config::new(&path).with_password("my_secret");
            let fm = FileManager::open(&config).unwrap();
            fm.init_if_needed(&config).unwrap();

            let page_data = vec![0xCD; 4096];
            fm.write_page(PageId(3), &page_data).unwrap();
        }

        // Reopen with correct password
        {
            let config = Config::new(&path).with_password("my_secret");
            let fm = FileManager::open(&config).unwrap();

            let read_data = fm.read_page(PageId(3)).unwrap();
            assert_eq!(read_data, vec![0xCD; 4096]);
        }
    }

    // A wrong password opens the file (header is plaintext) but fails at
    // page decryption time.
    #[test]
    fn test_encrypted_db_wrong_password() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("encrypted_wrong.db");

        // Create encrypted DB
        {
            let config = Config::new(&path).with_password("correct_password");
            let fm = FileManager::open(&config).unwrap();
            fm.init_if_needed(&config).unwrap();

            let page_data = vec![0xEF; 4096];
            fm.write_page(PageId(3), &page_data).unwrap();
        }

        // Try with wrong password - should fail on decrypt
        {
            let config = Config::new(&path).with_password("wrong_password");
            let fm = FileManager::open(&config).unwrap();

            let result = fm.read_page(PageId(3));
            assert!(result.is_err());
        }
    }

    // LZ4 round trip; also checks the stats report an actual size reduction.
    #[test]
    fn test_file_manager_lz4_compression() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("compressed_lz4.db");

        let config = Config::new(&path).with_lz4_compression();
        let fm = FileManager::open(&config).unwrap();
        fm.init_if_needed(&config).unwrap();

        assert!(fm.is_compressed());

        // Write a compressible page (repeated data compresses well)
        let page_data = vec![0xAB; 4096];
        fm.write_page(PageId(3), &page_data).unwrap();

        // Read it back
        let read_data = fm.read_page(PageId(3)).unwrap();
        assert_eq!(page_data, read_data);

        // Check compression stats
        let stats = fm.compression_stats();
        assert!(stats.compression_ratio() < 1.0, "Data should be compressed");
    }

    // Zstd round trip.
    #[test]
    fn test_file_manager_zstd_compression() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("compressed_zstd.db");

        let config = Config::new(&path).with_zstd_compression();
        let fm = FileManager::open(&config).unwrap();
        fm.init_if_needed(&config).unwrap();

        assert!(fm.is_compressed());

        // Write a compressible page
        let page_data = vec![0xCD; 4096];
        fm.write_page(PageId(3), &page_data).unwrap();

        // Read it back
        let read_data = fm.read_page(PageId(3)).unwrap();
        assert_eq!(page_data, read_data);
    }

    // Compression and encryption stacked together (compress -> encrypt on
    // write, decrypt -> decompress on read).
    #[test]
    fn test_file_manager_compression_with_encryption() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("compressed_encrypted.db");

        let config = Config::new(&path)
            .with_lz4_compression()
            .with_password("test_password");
        let fm = FileManager::open(&config).unwrap();
        fm.init_if_needed(&config).unwrap();

        assert!(fm.is_compressed());
        assert!(fm.is_encrypted());

        // Write a compressible page
        let page_data = vec![0xEF; 4096];
        fm.write_page(PageId(3), &page_data).unwrap();

        // Read it back
        let read_data = fm.read_page(PageId(3)).unwrap();
        assert_eq!(page_data, read_data);
    }

    // Low-redundancy data must still round-trip losslessly.
    #[test]
    fn test_compression_with_varied_data() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("varied_data.db");

        let config = Config::new(&path).with_lz4_compression();
        let fm = FileManager::open(&config).unwrap();
        fm.init_if_needed(&config).unwrap();

        // Write varied data (semi-random pattern)
        let page_data: Vec<u8> = (0..4096).map(|i| (i * 17 + i / 3) as u8).collect();
        fm.write_page(PageId(3), &page_data).unwrap();

        // Read it back
        let read_data = fm.read_page(PageId(3)).unwrap();
        assert_eq!(page_data, read_data);
    }

    // Size-tracking helpers report sane values under a configured limit.
    #[test]
    fn test_storage_limit_enforcement() {
        use featherdb_core::StorageLimitsConfig;

        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("limited.db");

        // Create a database with a very small limit (5MB)
        let storage_limits = StorageLimitsConfig::new()
            .with_max_database_size_mb(5)
            .with_safety_margin_percent(10);

        let config = Config::new(&path).with_storage_limits(storage_limits);
        let fm = FileManager::open(&config).unwrap();
        fm.init_if_needed(&config).unwrap();

        // Check size tracking methods
        let current = fm.current_size();
        assert!(current > 0); // Should have superblock + initial pages

        let max = fm.max_size();
        assert_eq!(max, Some(5 * 1024 * 1024));

        let remaining = fm.remaining_capacity();
        assert!(remaining.is_some());
        assert!(remaining.unwrap() < max.unwrap());

        // Try to extend beyond limit - this should eventually fail
        // Note: We can't easily test hitting the limit without filling the entire 5MB,
        // so we just verify the methods work correctly
        let would_exceed = fm.would_exceed_limit(10 * 1024 * 1024); // 10MB
        assert!(would_exceed);

        let would_fit = fm.would_exceed_limit(100); // 100 bytes
        assert!(!would_fit || fm.remaining_capacity().unwrap() < 100);
    }

    // extend() must eventually surface StorageLimitExceeded once the file
    // grows past the effective limit.
    #[test]
    fn test_storage_limit_exceeded_error() {
        use featherdb_core::StorageLimitsConfig;

        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("tiny_limit.db");

        // Create a database with a tiny limit that we can easily exceed
        // Set to 1MB with 5% safety margin = ~950KB effective limit
        let storage_limits = StorageLimitsConfig::new()
            .with_max_database_size_mb(1)
            .with_safety_margin_percent(5);

        let config = Config::new(&path).with_storage_limits(storage_limits);
        let fm = FileManager::open(&config).unwrap();
        fm.init_if_needed(&config).unwrap();

        // Try to extend many pages until we hit the limit
        // Each page is 4KB, so we should hit 1MB (~250 pages) eventually
        let mut extended = 0;
        loop {
            match fm.extend() {
                Ok(_) => {
                    extended += 1;
                    // Safety check to avoid infinite loop
                    if extended > 300 {
                        panic!("Should have hit storage limit by now");
                    }
                }
                Err(Error::StorageLimitExceeded { .. }) => {
                    // Expected error
                    break;
                }
                Err(e) => {
                    panic!("Unexpected error: {}", e);
                }
            }
        }

        assert!(extended > 0, "Should have extended at least some pages");
        assert!(extended < 300, "Should have hit limit before 300 pages");
    }

    // With no limits configured, all limit checks are no-ops.
    #[test]
    fn test_unlimited_storage() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("unlimited.db");

        let config = Config::new(&path); // No limits
        let fm = FileManager::open(&config).unwrap();
        fm.init_if_needed(&config).unwrap();

        assert_eq!(fm.max_size(), None);
        assert_eq!(fm.remaining_capacity(), None);
        assert!(!fm.would_exceed_limit(u64::MAX));

        // Should be able to extend without limit checks
        fm.extend().unwrap();
    }
}