nano_wal/
lib.rs

1//! A simple Write-Ahead Log (WAL) implementation with per-key segment sets.
2//!
3//! This crate provides a lightweight, performant WAL implementation designed for
4//! append-only operations with automatic segment rotation and expiration.
5//!
6//! # Features
7//!
8//! - Per-key segment isolation for better performance
9//! - Automatic segment rotation based on retention policies
10//! - Optional record headers for metadata storage
11//! - Configurable durability guarantees
12//! - Batch operations for improved throughput
13//!
14//! # Examples
15//!
16//! ```no_run
17//! use nano_wal::{Wal, WalOptions};
18//! use bytes::Bytes;
19//! use std::time::Duration;
20//!
21//! # fn main() -> Result<(), nano_wal::WalError> {
22//! // Create a new WAL with custom retention
23//! let options = WalOptions::default()
24//!     .retention(Duration::from_secs(3600));
25//! let mut wal = Wal::new("./my_wal", options)?;
26//!
27//! // Append an entry
28//! let entry_ref = wal.append_entry(
29//!     "user_123",
30//!     None,
31//!     Bytes::from("event data"),
32//!     true
33//! )?;
34//!
35//! // Read the entry back
36//! let data = wal.read_entry_at(entry_ref)?;
37//! # Ok(())
38//! # }
39//! ```
40
41use bytes::Bytes;
42use chrono::Utc;
43use std::collections::HashMap;
44use std::fmt::{self, Debug, Display};
45use std::fs::{self, File, OpenOptions};
46use std::hash::{Hash, Hasher};
47use std::io::{self, Read, Seek, SeekFrom, Write};
48use std::path::{Path, PathBuf};
49use std::time::Duration;
50
/// Magic bytes (`NANO-LOG` in ASCII) that open every segment file.
///
/// They mark a file as a nano-wal segment. The text is legible in a hex dump,
/// and such a sequence is unlikely to start an unrelated data file by accident.
const NANO_LOG_SIGNATURE: [u8; 8] = *b"NANO-LOG";
58
/// Magic bytes (`NANORC` in ASCII) written in front of every record.
///
/// Record readers use them to find record boundaries and stop at a damaged
/// or incomplete tail. Six bytes keep per-record overhead small while still
/// making an accidental match unlikely.
const NANO_REC_SIGNATURE: [u8; 6] = *b"NANORC";
65
/// Largest record header accepted, in bytes (65,535, one byte short of 64 KiB).
///
/// A record stores its header length as a little-endian `u16`, so this is the
/// largest length the on-disk format can express. Larger headers are rejected
/// up front instead of being silently truncated.
const MAX_HEADER_SIZE: usize = u16::MAX as usize;
72
/// Every way a WAL operation can fail.
///
/// Each variant carries its context (an underlying I/O error, a message, or
/// the offending sizes) so the failure can be reported without extra lookups.
#[derive(Debug)]
pub enum WalError {
    /// An underlying filesystem operation returned an error.
    Io(io::Error),
    /// `WalOptions` failed validation; the message names the bad setting.
    InvalidConfig(String),
    /// The requested entry, or the segment that should hold it, was not found.
    EntryNotFound(String),
    /// Bytes on disk did not match the expected format (e.g. a bad signature).
    CorruptedData(String),
    /// A record header was larger than the format allows.
    HeaderTooLarge {
        /// Size of the rejected header, in bytes.
        size: usize,
        /// Largest header size accepted, in bytes.
        max: usize,
    },
}
89
90impl fmt::Display for WalError {
91    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
92        match self {
93            WalError::Io(e) => write!(f, "I/O error: {}", e),
94            WalError::InvalidConfig(msg) => write!(f, "Invalid configuration: {}", msg),
95            WalError::EntryNotFound(msg) => write!(f, "Entry not found: {}", msg),
96            WalError::CorruptedData(msg) => write!(f, "Data corruption: {}", msg),
97            WalError::HeaderTooLarge { size, max } => {
98                write!(f, "Header size {} exceeds maximum {}", size, max)
99            }
100        }
101    }
102}
103
104impl std::error::Error for WalError {
105    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
106        match self {
107            WalError::Io(e) => Some(e),
108            _ => None,
109        }
110    }
111}
112
113impl From<io::Error> for WalError {
114    fn from(e: io::Error) -> Self {
115        WalError::Io(e)
116    }
117}
118
119/// Custom Result type for WAL operations.
120pub type Result<T> = std::result::Result<T, WalError>;
121
/// Location of a single entry inside the WAL.
///
/// An `EntryRef` is returned by appends and can be passed back to
/// `Wal::read_entry_at` for random-access reads. It is a small `Copy` value and
/// implements `Hash` + `Eq`, so it can be stored in sets or used as a map key
/// (e.g. to index or de-duplicate entries).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct EntryRef {
    /// Hash of the key, identifying the key's segment set.
    pub key_hash: u64,
    /// Sequence number of the segment file holding the entry.
    pub sequence_number: u64,
    /// Byte offset of the entry's record, measured from the end of the
    /// segment's file header.
    pub offset: u64,
}
135
/// Retention and segment-rotation settings for a [`Wal`].
///
/// Start from the defaults and adjust with the chainable setters:
///
/// ```
/// use nano_wal::WalOptions;
/// use std::time::Duration;
///
/// let options = WalOptions::default()
///     .retention(Duration::from_secs(3600))
///     .segments_per_retention_period(5);
/// ```
#[derive(Debug, Clone)]
pub struct WalOptions {
    /// How long entries are kept before they become eligible for removal.
    pub entry_retention: Duration,
    /// How many segments the retention period is split into; roughly, each
    /// segment accepts writes for `entry_retention / segments_per_retention_period`.
    pub segments_per_retention_period: u32,
}
155
156impl Default for WalOptions {
157    fn default() -> Self {
158        Self {
159            entry_retention: Duration::from_secs(60 * 60 * 24 * 7), // 1 week
160            segments_per_retention_period: 10,
161        }
162    }
163}
164
165impl WalOptions {
166    /// Creates options with custom retention duration.
167    ///
168    /// # Examples
169    ///
170    /// ```
171    /// use nano_wal::WalOptions;
172    /// use std::time::Duration;
173    ///
174    /// let options = WalOptions::with_retention(Duration::from_secs(3600));
175    /// ```
176    pub fn with_retention(retention: Duration) -> Self {
177        Self {
178            entry_retention: retention,
179            ..Default::default()
180        }
181    }
182
183    /// Creates options with custom segment count.
184    ///
185    /// # Examples
186    ///
187    /// ```
188    /// use nano_wal::WalOptions;
189    ///
190    /// let options = WalOptions::with_segments_per_retention_period(20);
191    /// ```
192    pub fn with_segments_per_retention_period(segments: u32) -> Self {
193        Self {
194            segments_per_retention_period: segments,
195            ..Default::default()
196        }
197    }
198
199    /// Sets retention period (chainable).
200    pub fn retention(mut self, retention: Duration) -> Self {
201        self.entry_retention = retention;
202        self
203    }
204
205    /// Sets segments per retention period (chainable).
206    pub fn segments_per_retention_period(mut self, segments: u32) -> Self {
207        self.segments_per_retention_period = segments;
208        self
209    }
210
211    /// Validates the configuration.
212    ///
213    /// # Errors
214    ///
215    /// Returns `WalError::InvalidConfig` if:
216    /// - `entry_retention` is zero
217    /// - `segments_per_retention_period` is zero
218    pub fn validate(&self) -> Result<()> {
219        if self.entry_retention.as_secs() == 0 {
220            return Err(WalError::InvalidConfig(
221                "entry_retention must be greater than 0".to_string(),
222            ));
223        }
224        if self.segments_per_retention_period == 0 {
225            return Err(WalError::InvalidConfig(
226                "segments_per_retention_period must be greater than 0".to_string(),
227            ));
228        }
229        Ok(())
230    }
231}
232
/// Bookkeeping for the segment file a key is currently appending to.
#[derive(Debug)]
struct ActiveSegment {
    /// Open handle to the segment file.
    file: File,
    /// Sequence number embedded in this segment's file name.
    sequence_number: u64,
    /// Unix timestamp (seconds) at which this segment stops accepting writes
    /// and a new one is rotated in for the key.
    expiration_timestamp: u64,
}
243
244/// Write-Ahead Log with per-key segment sets.
245///
246/// The `Wal` struct provides the main interface for WAL operations,
247/// managing segment files and ensuring durability guarantees.
248#[derive(Debug)]
249pub struct Wal {
250    dir: PathBuf,
251    options: WalOptions,
252    /// Map from key hash to active segment info
253    active_segments: HashMap<u64, ActiveSegment>,
254    /// Map from key hash to next sequence number
255    next_sequence: HashMap<u64, u64>,
256}
257
258impl Wal {
259    /// Creates a new WAL instance.
260    ///
261    /// # Arguments
262    ///
263    /// * `filepath` - Directory path for WAL files
264    /// * `options` - Configuration options
265    ///
266    /// # Errors
267    ///
268    /// Returns `WalError::InvalidConfig` if options are invalid.
269    /// Returns `WalError::Io` if directory creation fails.
270    ///
271    /// # Examples
272    ///
273    /// ```no_run
274    /// use nano_wal::{Wal, WalOptions};
275    ///
276    /// let wal = Wal::new("./my_wal", WalOptions::default())?;
277    /// # Ok::<(), nano_wal::WalError>(())
278    /// ```
279    pub fn new(filepath: &str, options: WalOptions) -> Result<Self> {
280        options.validate()?;
281
282        let dir = Path::new(filepath);
283        if !dir.exists() {
284            fs::create_dir_all(dir)?;
285        }
286
287        let mut wal = Wal {
288            dir: dir.to_path_buf(),
289            options,
290            active_segments: HashMap::new(),
291            next_sequence: HashMap::new(),
292        };
293
294        wal.scan_existing_files()?;
295        Ok(wal)
296    }
297
298    /// Scans existing files to determine next sequence numbers.
299    fn scan_existing_files(&mut self) -> Result<()> {
300        if let Ok(entries) = fs::read_dir(&self.dir) {
301            for entry in entries.flatten() {
302                if let Some(filename) = entry.file_name().to_str() {
303                    if filename.ends_with(".log") {
304                        if let Some((key_hash, sequence)) = self.parse_filename(filename) {
305                            let current_max = *self.next_sequence.get(&key_hash).unwrap_or(&0);
306                            self.next_sequence
307                                .insert(key_hash, current_max.max(sequence + 1));
308                        }
309                    }
310                }
311            }
312        }
313        Ok(())
314    }
315
316    /// Parses segment filename to extract key hash and sequence.
317    fn parse_filename(&self, filename: &str) -> Option<(u64, u64)> {
318        if let Some(name_part) = filename.strip_suffix(".log") {
319            let parts: Vec<&str> = name_part.split('-').collect();
320            if parts.len() >= 3 {
321                let len = parts.len();
322                if let (Ok(sequence), Ok(key_hash)) =
323                    (parts[len - 1].parse::<u64>(), parts[len - 2].parse::<u64>())
324                {
325                    return Some((key_hash, sequence));
326                }
327            }
328        }
329        None
330    }
331
332    /// Generates a filename for a segment.
333    fn generate_filename<K: Display>(&self, key: &K, key_hash: u64, sequence: u64) -> String {
334        let key_str = format!("{}", key);
335        let sanitized_key = key_str
336            .chars()
337            .filter(|c| c.is_alphanumeric() || *c == '_' || *c == '-')
338            .take(20)
339            .collect::<String>();
340
341        format!("{}-{}-{:04}.log", sanitized_key, key_hash, sequence)
342    }
343
344    /// Gets or creates an active segment for the given key.
345    fn get_or_create_active_segment<K: Hash + AsRef<[u8]> + Display>(
346        &mut self,
347        key: &K,
348    ) -> Result<u64> {
349        let mut hasher = std::collections::hash_map::DefaultHasher::new();
350        key.as_ref().hash(&mut hasher);
351        let key_hash = hasher.finish();
352
353        let now = Utc::now().timestamp() as u64;
354
355        // Check if rotation is needed
356        if let Some(active) = self.active_segments.get(&key_hash) {
357            if now >= active.expiration_timestamp {
358                self.active_segments.remove(&key_hash);
359            }
360        }
361
362        // Create new segment if needed
363        if !self.active_segments.contains_key(&key_hash) {
364            let sequence = *self.next_sequence.get(&key_hash).unwrap_or(&1);
365            self.next_sequence.insert(key_hash, sequence + 1);
366
367            let segment_duration = self.options.entry_retention.as_secs()
368                / self.options.segments_per_retention_period as u64;
369            let expiration_timestamp = now + segment_duration;
370
371            let filename = self.generate_filename(key, key_hash, sequence);
372            let file_path = self.dir.join(&filename);
373
374            let mut file = OpenOptions::new()
375                .create(true)
376                
377                .append(true)
378                .open(&file_path)?;
379
380            self.write_file_header(&mut file, key, expiration_timestamp)?;
381
382            let active_segment = ActiveSegment {
383                file,
384                sequence_number: sequence,
385                expiration_timestamp,
386            };
387
388            self.active_segments.insert(key_hash, active_segment);
389        }
390
391        Ok(key_hash)
392    }
393
394    /// Writes file header for new segment.
395    fn write_file_header<K: AsRef<[u8]>>(
396        &self,
397        file: &mut File,
398        key: &K,
399        expiration_timestamp: u64,
400    ) -> Result<()> {
401        file.write_all(&NANO_LOG_SIGNATURE)?;
402        file.write_all(&0u64.to_le_bytes())?; // Sequence placeholder
403        file.write_all(&expiration_timestamp.to_le_bytes())?;
404
405        let key_bytes = key.as_ref();
406        let key_len = key_bytes.len() as u64;
407        file.write_all(&key_len.to_le_bytes())?;
408        file.write_all(key_bytes)?;
409
410        Ok(())
411    }
412
413    /// Appends an entry to the WAL.
414    ///
415    /// # Arguments
416    ///
417    /// * `key` - Entry key for segment selection
418    /// * `header` - Optional metadata header (max 64KB)
419    /// * `content` - Entry content
420    /// * `durable` - If true, syncs to disk before returning
421    ///
422    /// # Errors
423    ///
424    /// Returns `WalError::HeaderTooLarge` if header exceeds 64KB.
425    /// Returns `WalError::Io` for I/O failures.
426    ///
427    /// # Examples
428    ///
429    /// ```no_run
430    /// # use nano_wal::{Wal, WalOptions};
431    /// # use bytes::Bytes;
432    /// # let mut wal = Wal::new("./wal", WalOptions::default())?;
433    /// let entry_ref = wal.append_entry(
434    ///     "user_123",
435    ///     Some(Bytes::from("metadata")),
436    ///     Bytes::from("data"),
437    ///     true
438    /// )?;
439    /// # Ok::<(), nano_wal::WalError>(())
440    /// ```
441    pub fn append_entry<K: Hash + AsRef<[u8]> + Display>(
442        &mut self,
443        key: K,
444        header: Option<Bytes>,
445        content: Bytes,
446        durable: bool,
447    ) -> Result<EntryRef> {
448        // Validate header size
449        if let Some(ref h) = header {
450            if h.len() > MAX_HEADER_SIZE {
451                return Err(WalError::HeaderTooLarge {
452                    size: h.len(),
453                    max: MAX_HEADER_SIZE,
454                });
455            }
456        }
457
458        let key_hash = self.get_or_create_active_segment(&key)?;
459        let active_segment = self.active_segments.get_mut(&key_hash).unwrap();
460
461        let current_position = active_segment.file.stream_position()?;
462        let file_header_size = 8 + 8 + 8 + 8 + key.as_ref().len() as u64;
463        let entry_offset = current_position - file_header_size;
464
465        // Write record
466        active_segment.file.write_all(&NANO_REC_SIGNATURE)?;
467
468        let header_len = header.as_ref().map(|h| h.len()).unwrap_or(0);
469        active_segment
470            .file
471            .write_all(&(header_len as u16).to_le_bytes())?;
472        if let Some(header_bytes) = &header {
473            active_segment.file.write_all(header_bytes.as_ref())?;
474        }
475
476        let content_len = content.len() as u64;
477        active_segment.file.write_all(&content_len.to_le_bytes())?;
478        active_segment.file.write_all(content.as_ref())?;
479
480        if durable {
481            active_segment.file.sync_data()?;
482        } else {
483            active_segment.file.flush()?;
484        }
485
486        Ok(EntryRef {
487            key_hash,
488            sequence_number: active_segment.sequence_number,
489            offset: entry_offset,
490        })
491    }
492
493    /// Appends multiple entries in a batch.
494    ///
495    /// Batch operations provide better throughput by reducing I/O overhead.
496    ///
497    /// # Arguments
498    ///
499    /// * `entries` - Iterator of (key, header, content) tuples
500    /// * `durable` - If true, syncs after all entries are written
501    ///
502    /// # Errors
503    ///
504    /// Returns first error encountered; partial writes may occur.
505    ///
506    /// # Examples
507    ///
508    /// ```no_run
509    /// # use nano_wal::{Wal, WalOptions};
510    /// # use bytes::Bytes;
511    /// # let mut wal = Wal::new("./wal", WalOptions::default())?;
512    /// let entries = vec![
513    ///     ("key1", None, Bytes::from("data1")),
514    ///     ("key2", Some(Bytes::from("meta")), Bytes::from("data2")),
515    /// ];
516    /// let refs = wal.append_batch(entries, true)?;
517    /// # Ok::<(), nano_wal::WalError>(())
518    /// ```
519    pub fn append_batch<K, I>(&mut self, entries: I, durable: bool) -> Result<Vec<EntryRef>>
520    where
521        K: Hash + AsRef<[u8]> + Display,
522        I: IntoIterator<Item = (K, Option<Bytes>, Bytes)>,
523    {
524        let mut refs = Vec::new();
525
526        for (key, header, content) in entries {
527            refs.push(self.append_entry(key, header, content, false)?);
528        }
529
530        if durable {
531            self.sync()?;
532        }
533
534        Ok(refs)
535    }
536
537    /// Logs an entry with durability guarantee.
538    ///
539    /// Convenience method equivalent to `append_entry(key, header, content, true)`.
540    ///
541    /// # Examples
542    ///
543    /// ```no_run
544    /// # use nano_wal::{Wal, WalOptions};
545    /// # use bytes::Bytes;
546    /// # let mut wal = Wal::new("./wal", WalOptions::default())?;
547    /// wal.log_entry("key", None, Bytes::from("data"))?;
548    /// # Ok::<(), nano_wal::WalError>(())
549    /// ```
550    pub fn log_entry<K: Hash + AsRef<[u8]> + Display>(
551        &mut self,
552        key: K,
553        header: Option<Bytes>,
554        content: Bytes,
555    ) -> Result<EntryRef> {
556        self.append_entry(key, header, content, true)
557    }
558
559    /// Enumerates all keys in the WAL.
560    ///
561    /// # Errors
562    ///
563    /// Returns `WalError::Io` for filesystem errors.
564    ///
565    /// # Examples
566    ///
567    /// ```no_run
568    /// # use nano_wal::{Wal, WalOptions};
569    /// # let wal = Wal::new("./wal", WalOptions::default())?;
570    /// for key in wal.enumerate_keys()? {
571    ///     println!("Found key: {}", key);
572    /// }
573    /// # Ok::<(), nano_wal::WalError>(())
574    /// ```
575    pub fn enumerate_keys(&self) -> Result<impl Iterator<Item = String>> {
576        let mut keys = std::collections::HashSet::new();
577
578        if let Ok(entries) = fs::read_dir(&self.dir) {
579            for entry in entries.flatten() {
580                if let Some(filename) = entry.file_name().to_str() {
581                    if filename.ends_with(".log") {
582                        let segment_path = entry.path();
583                        if let Ok(key) = self.read_key_from_file(&segment_path) {
584                            keys.insert(key);
585                        }
586                    }
587                }
588            }
589        }
590
591        Ok(keys.into_iter())
592    }
593
594    /// Reads key from segment file header.
595    fn read_key_from_file(&self, file_path: &Path) -> Result<String> {
596        let mut file = File::open(file_path)?;
597
598        let mut signature_buf = [0u8; 8];
599        file.read_exact(&mut signature_buf)?;
600        if signature_buf != NANO_LOG_SIGNATURE {
601            return Err(WalError::CorruptedData(
602                "Invalid NANO-LOG signature".to_string(),
603            ));
604        }
605
606        file.seek(SeekFrom::Current(16))?; // Skip sequence and expiration
607
608        let mut key_len_bytes = [0u8; 8];
609        file.read_exact(&mut key_len_bytes)?;
610        let key_len = u64::from_le_bytes(key_len_bytes);
611
612        let mut key_bytes = vec![0u8; key_len as usize];
613        file.read_exact(&mut key_bytes)?;
614
615        Ok(String::from_utf8_lossy(&key_bytes).to_string())
616    }
617
618    /// Enumerates records for a specific key.
619    ///
620    /// # Arguments
621    ///
622    /// * `key` - Key to enumerate records for
623    ///
624    /// # Errors
625    ///
626    /// Returns `WalError::Io` for filesystem errors.
627    ///
628    /// # Examples
629    ///
630    /// ```no_run
631    /// # use nano_wal::{Wal, WalOptions};
632    /// # let wal = Wal::new("./wal", WalOptions::default())?;
633    /// for record in wal.enumerate_records("my_key")? {
634    ///     println!("Record size: {}", record.len());
635    /// }
636    /// # Ok::<(), nano_wal::WalError>(())
637    /// ```
638    pub fn enumerate_records<K: Hash + AsRef<[u8]> + Display>(
639        &self,
640        key: K,
641    ) -> Result<impl Iterator<Item = Bytes>> {
642        let mut hasher = std::collections::hash_map::DefaultHasher::new();
643        key.as_ref().hash(&mut hasher);
644        let key_hash = hasher.finish();
645
646        let mut records = Vec::new();
647
648        let key_str = format!("{}", key);
649        let sanitized_key = key_str
650            .chars()
651            .filter(|c| c.is_alphanumeric() || *c == '_' || *c == '-')
652            .take(20)
653            .collect::<String>();
654
655        if let Ok(entries) = fs::read_dir(&self.dir) {
656            let mut segment_files = Vec::new();
657
658            for entry in entries.flatten() {
659                if let Some(filename) = entry.file_name().to_str() {
660                    if filename.starts_with(&format!("{}-{}-", sanitized_key, key_hash))
661                        && filename.ends_with(".log")
662                    {
663                        if let Some((_, sequence)) = self.parse_filename(filename) {
664                            segment_files.push((sequence, entry.path()));
665                        }
666                    }
667                }
668            }
669
670            segment_files.sort_by_key(|(seq, _)| *seq);
671
672            for (_, file_path) in segment_files {
673                if let Ok(file_records) = self.read_records_from_segment(&file_path) {
674                    records.extend(file_records);
675                }
676            }
677        }
678
679        Ok(records.into_iter())
680    }
681
682    /// Reads all records from a segment file.
683    fn read_records_from_segment(&self, file_path: &Path) -> Result<Vec<Bytes>> {
684        let mut file = File::open(file_path)?;
685        let mut records = Vec::new();
686
687        self.skip_file_header(&mut file)?;
688
689        loop {
690            let mut signature_buf = [0u8; 6];
691            match file.read_exact(&mut signature_buf) {
692                Ok(_) => {
693                    if signature_buf != NANO_REC_SIGNATURE {
694                        break;
695                    }
696                }
697                Err(_) => break,
698            }
699
700            let mut header_len_bytes = [0u8; 2];
701            if file.read_exact(&mut header_len_bytes).is_err() {
702                break;
703            }
704            let header_len = u16::from_le_bytes(header_len_bytes);
705
706            if file.seek(SeekFrom::Current(header_len as i64)).is_err() {
707                break;
708            }
709
710            let mut content_len_bytes = [0u8; 8];
711            if file.read_exact(&mut content_len_bytes).is_err() {
712                break;
713            }
714            let content_len = u64::from_le_bytes(content_len_bytes);
715
716            let mut content = vec![0u8; content_len as usize];
717            if file.read_exact(&mut content).is_err() {
718                break;
719            }
720
721            records.push(Bytes::from(content));
722        }
723
724        Ok(records)
725    }
726
727    /// Skips file header to position at first record.
728    fn skip_file_header(&self, file: &mut File) -> Result<()> {
729        file.seek(SeekFrom::Current(24))?; // Skip signature, sequence, expiration
730
731        let mut key_len_bytes = [0u8; 8];
732        file.read_exact(&mut key_len_bytes)?;
733        let key_len = u64::from_le_bytes(key_len_bytes);
734        file.seek(SeekFrom::Current(key_len as i64))?;
735
736        Ok(())
737    }
738
739    /// Reads entry at specified location.
740    ///
741    /// # Arguments
742    ///
743    /// * `entry_ref` - Reference to the entry location
744    ///
745    /// # Errors
746    ///
747    /// Returns `WalError::EntryNotFound` if segment doesn't exist.
748    /// Returns `WalError::CorruptedData` if signature is invalid.
749    ///
750    /// # Examples
751    ///
752    /// ```no_run
753    /// # use nano_wal::{Wal, WalOptions};
754    /// # use bytes::Bytes;
755    /// # let mut wal = Wal::new("./wal", WalOptions::default())?;
756    /// # let entry_ref = wal.append_entry("key", None, Bytes::from("data"), true)?;
757    /// let data = wal.read_entry_at(entry_ref)?;
758    /// # Ok::<(), nano_wal::WalError>(())
759    /// ```
760    pub fn read_entry_at(&self, entry_ref: EntryRef) -> Result<Bytes> {
761        if let Ok(entries) = fs::read_dir(&self.dir) {
762            for entry in entries.flatten() {
763                if let Some(filename) = entry.file_name().to_str() {
764                    if let Some((key_hash, sequence)) = self.parse_filename(filename) {
765                        if key_hash == entry_ref.key_hash && sequence == entry_ref.sequence_number {
766                            let file_path = entry.path();
767                            return self.read_entry_from_file(&file_path, entry_ref.offset);
768                        }
769                    }
770                }
771            }
772        }
773
774        Err(WalError::EntryNotFound(format!(
775            "Segment for key_hash {} sequence {} not found",
776            entry_ref.key_hash, entry_ref.sequence_number
777        )))
778    }
779
780    /// Reads specific entry from segment file.
781    fn read_entry_from_file(&self, file_path: &Path, offset: u64) -> Result<Bytes> {
782        let mut file = File::open(file_path)?;
783
784        self.skip_file_header(&mut file)?;
785        file.seek(SeekFrom::Current(offset as i64))?;
786
787        let mut signature_buf = [0u8; 6];
788        file.read_exact(&mut signature_buf)?;
789        if signature_buf != NANO_REC_SIGNATURE {
790            return Err(WalError::CorruptedData(
791                "NANORC signature not found".to_string(),
792            ));
793        }
794
795        let mut header_len_bytes = [0u8; 2];
796        file.read_exact(&mut header_len_bytes)?;
797        let header_len = u16::from_le_bytes(header_len_bytes);
798
799        file.seek(SeekFrom::Current(header_len as i64))?;
800
801        let mut content_len_bytes = [0u8; 8];
802        file.read_exact(&mut content_len_bytes)?;
803        let content_len = u64::from_le_bytes(content_len_bytes);
804
805        let mut content = vec![0u8; content_len as usize];
806        file.read_exact(&mut content)?;
807
808        Ok(Bytes::from(content))
809    }
810
811    /// Removes expired segments from disk.
812    ///
813    /// # Errors
814    ///
815    /// Returns `WalError::Io` for filesystem errors.
816    ///
817    /// # Examples
818    ///
819    /// ```no_run
820    /// # use nano_wal::{Wal, WalOptions};
821    /// # let mut wal = Wal::new("./wal", WalOptions::default())?;
822    /// wal.compact()?;
823    /// # Ok::<(), nano_wal::WalError>(())
824    /// ```
825    pub fn compact(&mut self) -> Result<()> {
826        let now = Utc::now().timestamp() as u64;
827
828        if let Ok(entries) = fs::read_dir(&self.dir) {
829            for entry in entries.flatten() {
830                if let Some(filename) = entry.file_name().to_str() {
831                    if filename.ends_with(".log") {
832                        let file_path = entry.path();
833
834                        if let Ok(mut file) = File::open(&file_path) {
835                            let mut signature = [0u8; 8];
836                            if file.read_exact(&mut signature).is_ok()
837                                && signature == NANO_LOG_SIGNATURE
838                            {
839                                let mut sequence_bytes = [0u8; 8];
840                                let mut expiration_bytes = [0u8; 8];
841
842                                if file.read_exact(&mut sequence_bytes).is_ok()
843                                    && file.read_exact(&mut expiration_bytes).is_ok()
844                                {
845                                    let expiration_timestamp = u64::from_le_bytes(expiration_bytes);
846
847                                    if now > expiration_timestamp {
848                                        let _ = fs::remove_file(&file_path);
849                                    }
850                                }
851                            }
852                        }
853                    }
854                }
855            }
856        }
857
858        Ok(())
859    }
860
861    /// Syncs all active segments to disk.
862    ///
863    /// # Errors
864    ///
865    /// Returns `WalError::Io` if sync fails.
866    ///
867    /// # Examples
868    ///
869    /// ```no_run
870    /// # use nano_wal::{Wal, WalOptions};
871    /// # let mut wal = Wal::new("./wal", WalOptions::default())?;
872    /// wal.sync()?;
873    /// # Ok::<(), nano_wal::WalError>(())
874    /// ```
875    pub fn sync(&mut self) -> Result<()> {
876        for active_segment in self.active_segments.values_mut() {
877            active_segment.file.sync_data()?;
878        }
879        Ok(())
880    }
881
882    /// Returns count of active segments.
883    ///
884    /// # Examples
885    ///
886    /// ```no_run
887    /// # use nano_wal::{Wal, WalOptions};
888    /// # let wal = Wal::new("./wal", WalOptions::default())?;
889    /// println!("Active segments: {}", wal.active_segment_count());
890    /// # Ok::<(), nano_wal::WalError>(())
891    /// ```
892    pub fn active_segment_count(&self) -> usize {
893        self.active_segments.len()
894    }
895
896    /// Shuts down WAL and removes all storage.
897    ///
898    /// # Errors
899    ///
900    /// Returns `WalError::Io` if removal fails.
901    ///
902    /// # Examples
903    ///
904    /// ```no_run
905    /// # use nano_wal::{Wal, WalOptions};
906    /// # let mut wal = Wal::new("./wal", WalOptions::default())?;
907    /// wal.shutdown()?;
908    /// # Ok::<(), nano_wal::WalError>(())
909    /// ```
910    pub fn shutdown(&mut self) -> Result<()> {
911        self.active_segments.clear();
912        fs::remove_dir_all(&self.dir)?;
913        Ok(())
914    }
915}