nano_wal/lib.rs
1//! A simple Write-Ahead Log (WAL) implementation with per-key segment sets.
2//!
3//! This crate provides a lightweight, performant WAL implementation designed for
4//! append-only operations with automatic segment rotation and expiration.
5//!
6//! # Features
7//!
8//! - Per-key segment isolation for better performance
9//! - Automatic segment rotation based on retention policies
10//! - Optional record headers for metadata storage
11//! - Configurable durability guarantees
12//! - Batch operations for improved throughput
13//!
14//! # Examples
15//!
16//! ```no_run
17//! use nano_wal::{Wal, WalOptions};
18//! use bytes::Bytes;
19//! use std::time::Duration;
20//!
21//! # fn main() -> Result<(), nano_wal::WalError> {
22//! // Create a new WAL with custom retention
23//! let options = WalOptions::default()
24//! .retention(Duration::from_secs(3600));
25//! let mut wal = Wal::new("./my_wal", options)?;
26//!
27//! // Append an entry
28//! let entry_ref = wal.append_entry(
29//! "user_123",
30//! None,
31//! Bytes::from("event data"),
32//! true
33//! )?;
34//!
35//! // Read the entry back
36//! let data = wal.read_entry_at(entry_ref)?;
37//! # Ok(())
38//! # }
39//! ```
40
41use bytes::Bytes;
42use chrono::Utc;
43use std::collections::HashMap;
44use std::fmt::{self, Debug, Display};
45use std::fs::{self, File, OpenOptions};
46use std::hash::{Hash, Hasher};
47use std::io::{self, Read, Seek, SeekFrom, Write};
48use std::path::{Path, PathBuf};
49use std::time::Duration;
50
/// Magic bytes that open every segment file: the ASCII text `NANO-LOG`.
///
/// Readers compare the first eight bytes of a file against this value to
/// decide whether it is a nano-wal segment; being plain ASCII, it is easy to
/// spot in a hex dump.
const NANO_LOG_SIGNATURE: [u8; 8] = *b"NANO-LOG";
58
/// Magic bytes that open every record inside a segment: the ASCII text `NANORC`.
///
/// Readers check for this marker before decoding a record, which lets them
/// detect record boundaries and stop at data that is not a valid record.
const NANO_REC_SIGNATURE: [u8; 6] = *b"NANORC";
65
/// Maximum size of a record header, in bytes (65,535).
///
/// Each record stores its header length in a little-endian `u16`, so this is
/// the largest header the on-disk format can describe. Larger headers are
/// rejected before anything is written, which also bounds the memory spent
/// on per-record metadata.
const MAX_HEADER_SIZE: usize = u16::MAX as usize;
72
/// Errors produced by WAL operations.
///
/// I/O failures keep the underlying [`io::Error`] reachable through
/// [`std::error::Error::source`]; every other variant carries a description.
#[derive(Debug)]
pub enum WalError {
    /// An underlying filesystem operation failed.
    Io(io::Error),
    /// The supplied configuration was rejected.
    InvalidConfig(String),
    /// No entry exists at the requested location.
    EntryNotFound(String),
    /// On-disk data did not have the expected layout.
    CorruptedData(String),
    /// A record header exceeded the permitted size.
    HeaderTooLarge {
        /// Size of the rejected header, in bytes.
        size: usize,
        /// Largest accepted header size, in bytes.
        max: usize,
    },
}

impl fmt::Display for WalError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Io(source) => write!(f, "I/O error: {}", source),
            Self::InvalidConfig(detail) => write!(f, "Invalid configuration: {}", detail),
            Self::EntryNotFound(detail) => write!(f, "Entry not found: {}", detail),
            Self::CorruptedData(detail) => write!(f, "Data corruption: {}", detail),
            Self::HeaderTooLarge { size, max } => {
                write!(f, "Header size {} exceeds maximum {}", size, max)
            }
        }
    }
}

impl std::error::Error for WalError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        if let Self::Io(inner) = self {
            Some(inner)
        } else {
            None
        }
    }
}

impl From<io::Error> for WalError {
    fn from(source: io::Error) -> Self {
        Self::Io(source)
    }
}

/// Shorthand for results whose error type is [`WalError`].
pub type Result<T> = std::result::Result<T, WalError>;
121
/// Location of a single entry in the WAL.
///
/// Returned by the append methods and accepted by `Wal::read_entry_at` for
/// random-access reads. The triple (key hash, segment sequence, offset)
/// identifies exactly one record.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EntryRef {
    /// Hash of the entry's key; selects the key's set of segment files.
    pub key_hash: u64,
    /// Sequence number of the segment file holding the entry.
    pub sequence_number: u64,
    /// Byte offset of the record, measured from the end of the segment's
    /// file header (not from the start of the file).
    pub offset: u64,
}
135
136/// Configuration options for WAL behavior.
137///
138/// # Examples
139///
140/// ```
141/// use nano_wal::WalOptions;
142/// use std::time::Duration;
143///
144/// let options = WalOptions::default()
145/// .retention(Duration::from_secs(3600))
146/// .segments_per_retention_period(5);
147/// ```
148#[derive(Debug, Clone)]
149pub struct WalOptions {
150 /// Duration for which entries are retained before expiration
151 pub entry_retention: Duration,
152 /// Number of segments per retention period for rotation
153 pub segments_per_retention_period: u32,
154}
155
156impl Default for WalOptions {
157 fn default() -> Self {
158 Self {
159 entry_retention: Duration::from_secs(60 * 60 * 24 * 7), // 1 week
160 segments_per_retention_period: 10,
161 }
162 }
163}
164
165impl WalOptions {
166 /// Creates options with custom retention duration.
167 ///
168 /// # Examples
169 ///
170 /// ```
171 /// use nano_wal::WalOptions;
172 /// use std::time::Duration;
173 ///
174 /// let options = WalOptions::with_retention(Duration::from_secs(3600));
175 /// ```
176 pub fn with_retention(retention: Duration) -> Self {
177 Self {
178 entry_retention: retention,
179 ..Default::default()
180 }
181 }
182
183 /// Creates options with custom segment count.
184 ///
185 /// # Examples
186 ///
187 /// ```
188 /// use nano_wal::WalOptions;
189 ///
190 /// let options = WalOptions::with_segments_per_retention_period(20);
191 /// ```
192 pub fn with_segments_per_retention_period(segments: u32) -> Self {
193 Self {
194 segments_per_retention_period: segments,
195 ..Default::default()
196 }
197 }
198
199 /// Sets retention period (chainable).
200 pub fn retention(mut self, retention: Duration) -> Self {
201 self.entry_retention = retention;
202 self
203 }
204
205 /// Sets segments per retention period (chainable).
206 pub fn segments_per_retention_period(mut self, segments: u32) -> Self {
207 self.segments_per_retention_period = segments;
208 self
209 }
210
211 /// Validates the configuration.
212 ///
213 /// # Errors
214 ///
215 /// Returns `WalError::InvalidConfig` if:
216 /// - `entry_retention` is zero
217 /// - `segments_per_retention_period` is zero
218 pub fn validate(&self) -> Result<()> {
219 if self.entry_retention.as_secs() == 0 {
220 return Err(WalError::InvalidConfig(
221 "entry_retention must be greater than 0".to_string(),
222 ));
223 }
224 if self.segments_per_retention_period == 0 {
225 return Err(WalError::InvalidConfig(
226 "segments_per_retention_period must be greater than 0".to_string(),
227 ));
228 }
229 Ok(())
230 }
231}
232
/// Bookkeeping for the segment currently receiving appends for one key.
#[derive(Debug)]
struct ActiveSegment {
    /// Open handle to the segment file.
    file: File,
    /// Sequence number encoded in the segment's filename.
    sequence_number: u64,
    /// Unix time (seconds) from which the segment stops accepting appends.
    expiration_timestamp: u64,
}
243
244/// Write-Ahead Log with per-key segment sets.
245///
246/// The `Wal` struct provides the main interface for WAL operations,
247/// managing segment files and ensuring durability guarantees.
248#[derive(Debug)]
249pub struct Wal {
250 dir: PathBuf,
251 options: WalOptions,
252 /// Map from key hash to active segment info
253 active_segments: HashMap<u64, ActiveSegment>,
254 /// Map from key hash to next sequence number
255 next_sequence: HashMap<u64, u64>,
256}
257
258impl Wal {
259 /// Creates a new WAL instance.
260 ///
261 /// # Arguments
262 ///
263 /// * `filepath` - Directory path for WAL files
264 /// * `options` - Configuration options
265 ///
266 /// # Errors
267 ///
268 /// Returns `WalError::InvalidConfig` if options are invalid.
269 /// Returns `WalError::Io` if directory creation fails.
270 ///
271 /// # Examples
272 ///
273 /// ```no_run
274 /// use nano_wal::{Wal, WalOptions};
275 ///
276 /// let wal = Wal::new("./my_wal", WalOptions::default())?;
277 /// # Ok::<(), nano_wal::WalError>(())
278 /// ```
279 pub fn new(filepath: &str, options: WalOptions) -> Result<Self> {
280 options.validate()?;
281
282 let dir = Path::new(filepath);
283 if !dir.exists() {
284 fs::create_dir_all(dir)?;
285 }
286
287 let mut wal = Wal {
288 dir: dir.to_path_buf(),
289 options,
290 active_segments: HashMap::new(),
291 next_sequence: HashMap::new(),
292 };
293
294 wal.scan_existing_files()?;
295 Ok(wal)
296 }
297
298 /// Scans existing files to determine next sequence numbers.
299 fn scan_existing_files(&mut self) -> Result<()> {
300 if let Ok(entries) = fs::read_dir(&self.dir) {
301 for entry in entries.flatten() {
302 if let Some(filename) = entry.file_name().to_str() {
303 if filename.ends_with(".log") {
304 if let Some((key_hash, sequence)) = self.parse_filename(filename) {
305 let current_max = *self.next_sequence.get(&key_hash).unwrap_or(&0);
306 self.next_sequence
307 .insert(key_hash, current_max.max(sequence + 1));
308 }
309 }
310 }
311 }
312 }
313 Ok(())
314 }
315
316 /// Parses segment filename to extract key hash and sequence.
317 fn parse_filename(&self, filename: &str) -> Option<(u64, u64)> {
318 if let Some(name_part) = filename.strip_suffix(".log") {
319 let parts: Vec<&str> = name_part.split('-').collect();
320 if parts.len() >= 3 {
321 let len = parts.len();
322 if let (Ok(sequence), Ok(key_hash)) =
323 (parts[len - 1].parse::<u64>(), parts[len - 2].parse::<u64>())
324 {
325 return Some((key_hash, sequence));
326 }
327 }
328 }
329 None
330 }
331
332 /// Generates a filename for a segment.
333 fn generate_filename<K: Display>(&self, key: &K, key_hash: u64, sequence: u64) -> String {
334 let key_str = format!("{}", key);
335 let sanitized_key = key_str
336 .chars()
337 .filter(|c| c.is_alphanumeric() || *c == '_' || *c == '-')
338 .take(20)
339 .collect::<String>();
340
341 format!("{}-{}-{:04}.log", sanitized_key, key_hash, sequence)
342 }
343
344 /// Gets or creates an active segment for the given key.
345 fn get_or_create_active_segment<K: Hash + AsRef<[u8]> + Display>(
346 &mut self,
347 key: &K,
348 ) -> Result<u64> {
349 let mut hasher = std::collections::hash_map::DefaultHasher::new();
350 key.as_ref().hash(&mut hasher);
351 let key_hash = hasher.finish();
352
353 let now = Utc::now().timestamp() as u64;
354
355 // Check if rotation is needed
356 if let Some(active) = self.active_segments.get(&key_hash) {
357 if now >= active.expiration_timestamp {
358 self.active_segments.remove(&key_hash);
359 }
360 }
361
362 // Create new segment if needed
363 if !self.active_segments.contains_key(&key_hash) {
364 let sequence = *self.next_sequence.get(&key_hash).unwrap_or(&1);
365 self.next_sequence.insert(key_hash, sequence + 1);
366
367 let segment_duration = self.options.entry_retention.as_secs()
368 / self.options.segments_per_retention_period as u64;
369 let expiration_timestamp = now + segment_duration;
370
371 let filename = self.generate_filename(key, key_hash, sequence);
372 let file_path = self.dir.join(&filename);
373
374 let mut file = OpenOptions::new()
375 .create(true)
376
377 .append(true)
378 .open(&file_path)?;
379
380 self.write_file_header(&mut file, key, expiration_timestamp)?;
381
382 let active_segment = ActiveSegment {
383 file,
384 sequence_number: sequence,
385 expiration_timestamp,
386 };
387
388 self.active_segments.insert(key_hash, active_segment);
389 }
390
391 Ok(key_hash)
392 }
393
394 /// Writes file header for new segment.
395 fn write_file_header<K: AsRef<[u8]>>(
396 &self,
397 file: &mut File,
398 key: &K,
399 expiration_timestamp: u64,
400 ) -> Result<()> {
401 file.write_all(&NANO_LOG_SIGNATURE)?;
402 file.write_all(&0u64.to_le_bytes())?; // Sequence placeholder
403 file.write_all(&expiration_timestamp.to_le_bytes())?;
404
405 let key_bytes = key.as_ref();
406 let key_len = key_bytes.len() as u64;
407 file.write_all(&key_len.to_le_bytes())?;
408 file.write_all(key_bytes)?;
409
410 Ok(())
411 }
412
413 /// Appends an entry to the WAL.
414 ///
415 /// # Arguments
416 ///
417 /// * `key` - Entry key for segment selection
418 /// * `header` - Optional metadata header (max 64KB)
419 /// * `content` - Entry content
420 /// * `durable` - If true, syncs to disk before returning
421 ///
422 /// # Errors
423 ///
424 /// Returns `WalError::HeaderTooLarge` if header exceeds 64KB.
425 /// Returns `WalError::Io` for I/O failures.
426 ///
427 /// # Examples
428 ///
429 /// ```no_run
430 /// # use nano_wal::{Wal, WalOptions};
431 /// # use bytes::Bytes;
432 /// # let mut wal = Wal::new("./wal", WalOptions::default())?;
433 /// let entry_ref = wal.append_entry(
434 /// "user_123",
435 /// Some(Bytes::from("metadata")),
436 /// Bytes::from("data"),
437 /// true
438 /// )?;
439 /// # Ok::<(), nano_wal::WalError>(())
440 /// ```
441 pub fn append_entry<K: Hash + AsRef<[u8]> + Display>(
442 &mut self,
443 key: K,
444 header: Option<Bytes>,
445 content: Bytes,
446 durable: bool,
447 ) -> Result<EntryRef> {
448 // Validate header size
449 if let Some(ref h) = header {
450 if h.len() > MAX_HEADER_SIZE {
451 return Err(WalError::HeaderTooLarge {
452 size: h.len(),
453 max: MAX_HEADER_SIZE,
454 });
455 }
456 }
457
458 let key_hash = self.get_or_create_active_segment(&key)?;
459 let active_segment = self.active_segments.get_mut(&key_hash).unwrap();
460
461 let current_position = active_segment.file.stream_position()?;
462 let file_header_size = 8 + 8 + 8 + 8 + key.as_ref().len() as u64;
463 let entry_offset = current_position - file_header_size;
464
465 // Write record
466 active_segment.file.write_all(&NANO_REC_SIGNATURE)?;
467
468 let header_len = header.as_ref().map(|h| h.len()).unwrap_or(0);
469 active_segment
470 .file
471 .write_all(&(header_len as u16).to_le_bytes())?;
472 if let Some(header_bytes) = &header {
473 active_segment.file.write_all(header_bytes.as_ref())?;
474 }
475
476 let content_len = content.len() as u64;
477 active_segment.file.write_all(&content_len.to_le_bytes())?;
478 active_segment.file.write_all(content.as_ref())?;
479
480 if durable {
481 active_segment.file.sync_data()?;
482 } else {
483 active_segment.file.flush()?;
484 }
485
486 Ok(EntryRef {
487 key_hash,
488 sequence_number: active_segment.sequence_number,
489 offset: entry_offset,
490 })
491 }
492
493 /// Appends multiple entries in a batch.
494 ///
495 /// Batch operations provide better throughput by reducing I/O overhead.
496 ///
497 /// # Arguments
498 ///
499 /// * `entries` - Iterator of (key, header, content) tuples
500 /// * `durable` - If true, syncs after all entries are written
501 ///
502 /// # Errors
503 ///
504 /// Returns first error encountered; partial writes may occur.
505 ///
506 /// # Examples
507 ///
508 /// ```no_run
509 /// # use nano_wal::{Wal, WalOptions};
510 /// # use bytes::Bytes;
511 /// # let mut wal = Wal::new("./wal", WalOptions::default())?;
512 /// let entries = vec![
513 /// ("key1", None, Bytes::from("data1")),
514 /// ("key2", Some(Bytes::from("meta")), Bytes::from("data2")),
515 /// ];
516 /// let refs = wal.append_batch(entries, true)?;
517 /// # Ok::<(), nano_wal::WalError>(())
518 /// ```
519 pub fn append_batch<K, I>(&mut self, entries: I, durable: bool) -> Result<Vec<EntryRef>>
520 where
521 K: Hash + AsRef<[u8]> + Display,
522 I: IntoIterator<Item = (K, Option<Bytes>, Bytes)>,
523 {
524 let mut refs = Vec::new();
525
526 for (key, header, content) in entries {
527 refs.push(self.append_entry(key, header, content, false)?);
528 }
529
530 if durable {
531 self.sync()?;
532 }
533
534 Ok(refs)
535 }
536
537 /// Logs an entry with durability guarantee.
538 ///
539 /// Convenience method equivalent to `append_entry(key, header, content, true)`.
540 ///
541 /// # Examples
542 ///
543 /// ```no_run
544 /// # use nano_wal::{Wal, WalOptions};
545 /// # use bytes::Bytes;
546 /// # let mut wal = Wal::new("./wal", WalOptions::default())?;
547 /// wal.log_entry("key", None, Bytes::from("data"))?;
548 /// # Ok::<(), nano_wal::WalError>(())
549 /// ```
550 pub fn log_entry<K: Hash + AsRef<[u8]> + Display>(
551 &mut self,
552 key: K,
553 header: Option<Bytes>,
554 content: Bytes,
555 ) -> Result<EntryRef> {
556 self.append_entry(key, header, content, true)
557 }
558
559 /// Enumerates all keys in the WAL.
560 ///
561 /// # Errors
562 ///
563 /// Returns `WalError::Io` for filesystem errors.
564 ///
565 /// # Examples
566 ///
567 /// ```no_run
568 /// # use nano_wal::{Wal, WalOptions};
569 /// # let wal = Wal::new("./wal", WalOptions::default())?;
570 /// for key in wal.enumerate_keys()? {
571 /// println!("Found key: {}", key);
572 /// }
573 /// # Ok::<(), nano_wal::WalError>(())
574 /// ```
575 pub fn enumerate_keys(&self) -> Result<impl Iterator<Item = String>> {
576 let mut keys = std::collections::HashSet::new();
577
578 if let Ok(entries) = fs::read_dir(&self.dir) {
579 for entry in entries.flatten() {
580 if let Some(filename) = entry.file_name().to_str() {
581 if filename.ends_with(".log") {
582 let segment_path = entry.path();
583 if let Ok(key) = self.read_key_from_file(&segment_path) {
584 keys.insert(key);
585 }
586 }
587 }
588 }
589 }
590
591 Ok(keys.into_iter())
592 }
593
594 /// Reads key from segment file header.
595 fn read_key_from_file(&self, file_path: &Path) -> Result<String> {
596 let mut file = File::open(file_path)?;
597
598 let mut signature_buf = [0u8; 8];
599 file.read_exact(&mut signature_buf)?;
600 if signature_buf != NANO_LOG_SIGNATURE {
601 return Err(WalError::CorruptedData(
602 "Invalid NANO-LOG signature".to_string(),
603 ));
604 }
605
606 file.seek(SeekFrom::Current(16))?; // Skip sequence and expiration
607
608 let mut key_len_bytes = [0u8; 8];
609 file.read_exact(&mut key_len_bytes)?;
610 let key_len = u64::from_le_bytes(key_len_bytes);
611
612 let mut key_bytes = vec![0u8; key_len as usize];
613 file.read_exact(&mut key_bytes)?;
614
615 Ok(String::from_utf8_lossy(&key_bytes).to_string())
616 }
617
618 /// Enumerates records for a specific key.
619 ///
620 /// # Arguments
621 ///
622 /// * `key` - Key to enumerate records for
623 ///
624 /// # Errors
625 ///
626 /// Returns `WalError::Io` for filesystem errors.
627 ///
628 /// # Examples
629 ///
630 /// ```no_run
631 /// # use nano_wal::{Wal, WalOptions};
632 /// # let wal = Wal::new("./wal", WalOptions::default())?;
633 /// for record in wal.enumerate_records("my_key")? {
634 /// println!("Record size: {}", record.len());
635 /// }
636 /// # Ok::<(), nano_wal::WalError>(())
637 /// ```
638 pub fn enumerate_records<K: Hash + AsRef<[u8]> + Display>(
639 &self,
640 key: K,
641 ) -> Result<impl Iterator<Item = Bytes>> {
642 let mut hasher = std::collections::hash_map::DefaultHasher::new();
643 key.as_ref().hash(&mut hasher);
644 let key_hash = hasher.finish();
645
646 let mut records = Vec::new();
647
648 let key_str = format!("{}", key);
649 let sanitized_key = key_str
650 .chars()
651 .filter(|c| c.is_alphanumeric() || *c == '_' || *c == '-')
652 .take(20)
653 .collect::<String>();
654
655 if let Ok(entries) = fs::read_dir(&self.dir) {
656 let mut segment_files = Vec::new();
657
658 for entry in entries.flatten() {
659 if let Some(filename) = entry.file_name().to_str() {
660 if filename.starts_with(&format!("{}-{}-", sanitized_key, key_hash))
661 && filename.ends_with(".log")
662 {
663 if let Some((_, sequence)) = self.parse_filename(filename) {
664 segment_files.push((sequence, entry.path()));
665 }
666 }
667 }
668 }
669
670 segment_files.sort_by_key(|(seq, _)| *seq);
671
672 for (_, file_path) in segment_files {
673 if let Ok(file_records) = self.read_records_from_segment(&file_path) {
674 records.extend(file_records);
675 }
676 }
677 }
678
679 Ok(records.into_iter())
680 }
681
682 /// Reads all records from a segment file.
683 fn read_records_from_segment(&self, file_path: &Path) -> Result<Vec<Bytes>> {
684 let mut file = File::open(file_path)?;
685 let mut records = Vec::new();
686
687 self.skip_file_header(&mut file)?;
688
689 loop {
690 let mut signature_buf = [0u8; 6];
691 match file.read_exact(&mut signature_buf) {
692 Ok(_) => {
693 if signature_buf != NANO_REC_SIGNATURE {
694 break;
695 }
696 }
697 Err(_) => break,
698 }
699
700 let mut header_len_bytes = [0u8; 2];
701 if file.read_exact(&mut header_len_bytes).is_err() {
702 break;
703 }
704 let header_len = u16::from_le_bytes(header_len_bytes);
705
706 if file.seek(SeekFrom::Current(header_len as i64)).is_err() {
707 break;
708 }
709
710 let mut content_len_bytes = [0u8; 8];
711 if file.read_exact(&mut content_len_bytes).is_err() {
712 break;
713 }
714 let content_len = u64::from_le_bytes(content_len_bytes);
715
716 let mut content = vec![0u8; content_len as usize];
717 if file.read_exact(&mut content).is_err() {
718 break;
719 }
720
721 records.push(Bytes::from(content));
722 }
723
724 Ok(records)
725 }
726
727 /// Skips file header to position at first record.
728 fn skip_file_header(&self, file: &mut File) -> Result<()> {
729 file.seek(SeekFrom::Current(24))?; // Skip signature, sequence, expiration
730
731 let mut key_len_bytes = [0u8; 8];
732 file.read_exact(&mut key_len_bytes)?;
733 let key_len = u64::from_le_bytes(key_len_bytes);
734 file.seek(SeekFrom::Current(key_len as i64))?;
735
736 Ok(())
737 }
738
739 /// Reads entry at specified location.
740 ///
741 /// # Arguments
742 ///
743 /// * `entry_ref` - Reference to the entry location
744 ///
745 /// # Errors
746 ///
747 /// Returns `WalError::EntryNotFound` if segment doesn't exist.
748 /// Returns `WalError::CorruptedData` if signature is invalid.
749 ///
750 /// # Examples
751 ///
752 /// ```no_run
753 /// # use nano_wal::{Wal, WalOptions};
754 /// # use bytes::Bytes;
755 /// # let mut wal = Wal::new("./wal", WalOptions::default())?;
756 /// # let entry_ref = wal.append_entry("key", None, Bytes::from("data"), true)?;
757 /// let data = wal.read_entry_at(entry_ref)?;
758 /// # Ok::<(), nano_wal::WalError>(())
759 /// ```
760 pub fn read_entry_at(&self, entry_ref: EntryRef) -> Result<Bytes> {
761 if let Ok(entries) = fs::read_dir(&self.dir) {
762 for entry in entries.flatten() {
763 if let Some(filename) = entry.file_name().to_str() {
764 if let Some((key_hash, sequence)) = self.parse_filename(filename) {
765 if key_hash == entry_ref.key_hash && sequence == entry_ref.sequence_number {
766 let file_path = entry.path();
767 return self.read_entry_from_file(&file_path, entry_ref.offset);
768 }
769 }
770 }
771 }
772 }
773
774 Err(WalError::EntryNotFound(format!(
775 "Segment for key_hash {} sequence {} not found",
776 entry_ref.key_hash, entry_ref.sequence_number
777 )))
778 }
779
780 /// Reads specific entry from segment file.
781 fn read_entry_from_file(&self, file_path: &Path, offset: u64) -> Result<Bytes> {
782 let mut file = File::open(file_path)?;
783
784 self.skip_file_header(&mut file)?;
785 file.seek(SeekFrom::Current(offset as i64))?;
786
787 let mut signature_buf = [0u8; 6];
788 file.read_exact(&mut signature_buf)?;
789 if signature_buf != NANO_REC_SIGNATURE {
790 return Err(WalError::CorruptedData(
791 "NANORC signature not found".to_string(),
792 ));
793 }
794
795 let mut header_len_bytes = [0u8; 2];
796 file.read_exact(&mut header_len_bytes)?;
797 let header_len = u16::from_le_bytes(header_len_bytes);
798
799 file.seek(SeekFrom::Current(header_len as i64))?;
800
801 let mut content_len_bytes = [0u8; 8];
802 file.read_exact(&mut content_len_bytes)?;
803 let content_len = u64::from_le_bytes(content_len_bytes);
804
805 let mut content = vec![0u8; content_len as usize];
806 file.read_exact(&mut content)?;
807
808 Ok(Bytes::from(content))
809 }
810
811 /// Removes expired segments from disk.
812 ///
813 /// # Errors
814 ///
815 /// Returns `WalError::Io` for filesystem errors.
816 ///
817 /// # Examples
818 ///
819 /// ```no_run
820 /// # use nano_wal::{Wal, WalOptions};
821 /// # let mut wal = Wal::new("./wal", WalOptions::default())?;
822 /// wal.compact()?;
823 /// # Ok::<(), nano_wal::WalError>(())
824 /// ```
825 pub fn compact(&mut self) -> Result<()> {
826 let now = Utc::now().timestamp() as u64;
827
828 if let Ok(entries) = fs::read_dir(&self.dir) {
829 for entry in entries.flatten() {
830 if let Some(filename) = entry.file_name().to_str() {
831 if filename.ends_with(".log") {
832 let file_path = entry.path();
833
834 if let Ok(mut file) = File::open(&file_path) {
835 let mut signature = [0u8; 8];
836 if file.read_exact(&mut signature).is_ok()
837 && signature == NANO_LOG_SIGNATURE
838 {
839 let mut sequence_bytes = [0u8; 8];
840 let mut expiration_bytes = [0u8; 8];
841
842 if file.read_exact(&mut sequence_bytes).is_ok()
843 && file.read_exact(&mut expiration_bytes).is_ok()
844 {
845 let expiration_timestamp = u64::from_le_bytes(expiration_bytes);
846
847 if now > expiration_timestamp {
848 let _ = fs::remove_file(&file_path);
849 }
850 }
851 }
852 }
853 }
854 }
855 }
856 }
857
858 Ok(())
859 }
860
861 /// Syncs all active segments to disk.
862 ///
863 /// # Errors
864 ///
865 /// Returns `WalError::Io` if sync fails.
866 ///
867 /// # Examples
868 ///
869 /// ```no_run
870 /// # use nano_wal::{Wal, WalOptions};
871 /// # let mut wal = Wal::new("./wal", WalOptions::default())?;
872 /// wal.sync()?;
873 /// # Ok::<(), nano_wal::WalError>(())
874 /// ```
875 pub fn sync(&mut self) -> Result<()> {
876 for active_segment in self.active_segments.values_mut() {
877 active_segment.file.sync_data()?;
878 }
879 Ok(())
880 }
881
882 /// Returns count of active segments.
883 ///
884 /// # Examples
885 ///
886 /// ```no_run
887 /// # use nano_wal::{Wal, WalOptions};
888 /// # let wal = Wal::new("./wal", WalOptions::default())?;
889 /// println!("Active segments: {}", wal.active_segment_count());
890 /// # Ok::<(), nano_wal::WalError>(())
891 /// ```
892 pub fn active_segment_count(&self) -> usize {
893 self.active_segments.len()
894 }
895
896 /// Shuts down WAL and removes all storage.
897 ///
898 /// # Errors
899 ///
900 /// Returns `WalError::Io` if removal fails.
901 ///
902 /// # Examples
903 ///
904 /// ```no_run
905 /// # use nano_wal::{Wal, WalOptions};
906 /// # let mut wal = Wal::new("./wal", WalOptions::default())?;
907 /// wal.shutdown()?;
908 /// # Ok::<(), nano_wal::WalError>(())
909 /// ```
910 pub fn shutdown(&mut self) -> Result<()> {
911 self.active_segments.clear();
912 fs::remove_dir_all(&self.dir)?;
913 Ok(())
914 }
915}