Skip to main content

bones_core/
shard.rs

1//! Time-sharded event file management.
2//!
3//! Events are stored in monthly shard files under `.bones/events/YYYY-MM.events`.
4//! This module manages the directory layout, shard rotation, atomic append
5//! operations, the `current.events` symlink, and replay (reading all shards
6//! in chronological order).
7//!
8//! # Directory Layout
9//!
10//! ```text
11//! .bones/
12//!   events/
13//!     2026-01.events      # sealed shard
14//!     2026-01.manifest    # manifest for sealed shard
15//!     2026-02.events      # active shard
16//!     current.events -> 2026-02.events   # symlink to active
17//!   cache/
18//!     clock               # monotonic wall-clock file (microseconds)
19//!   lock                  # repo-wide advisory lock
20//! ```
21//!
22//! # Invariants
23//!
24//! - Sealed (frozen) shards are never modified after rotation.
25//! - The active shard is the only one that receives appends.
26//! - `current.events` always points to the active shard.
27//! - Each append uses `O_APPEND` + `write_all` + `flush` for crash consistency.
28//! - Torn-write recovery truncates incomplete trailing lines on startup.
29//! - Monotonic timestamps: `wall_ts_us = max(system_time_us, last + 1)`.
30
31use std::fs::{self, OpenOptions};
32use std::io::{self, Write as IoWrite};
33use std::path::{Path, PathBuf};
34use std::time::{Duration, SystemTime, UNIX_EPOCH};
35
36use chrono::Datelike;
37
38use crate::event::writer::shard_header;
39use crate::lock::ShardLock;
40
41// ---------------------------------------------------------------------------
42// Error types
43// ---------------------------------------------------------------------------
44
45/// Errors that can occur during shard operations.
46#[derive(Debug, thiserror::Error)]
47pub enum ShardError {
48    /// I/O error during shard operations.
49    #[error("shard I/O error: {0}")]
50    Io(#[from] io::Error),
51
52    /// Lock acquisition failed.
53    #[error("lock error: {0}")]
54    Lock(#[from] crate::lock::LockError),
55
56    /// The `.bones` directory does not exist and could not be created.
57    #[error("failed to initialize .bones directory: {0}")]
58    InitFailed(io::Error),
59
60    /// Shard file name does not match expected `YYYY-MM.events` pattern.
61    #[error("invalid shard filename: {0}")]
62    InvalidShardName(String),
63}
64
65// ---------------------------------------------------------------------------
66// Manifest
67// ---------------------------------------------------------------------------
68
/// Integrity record written next to a sealed shard.
///
/// Serialized as one `key: value` pair per line (see
/// [`to_string_repr`](Self::to_string_repr)).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ShardManifest {
    /// File name of the shard this manifest describes (e.g. `"2026-01.events"`).
    pub shard_name: String,
    /// Number of event lines, i.e. lines that are neither blank nor comments.
    pub event_count: u64,
    /// Size of the shard file in bytes.
    pub byte_len: u64,
    /// BLAKE3 digest of the complete shard contents.
    pub file_hash: String,
}

impl ShardManifest {
    /// Render the manifest as four `key: value` lines, each ending in `\n`,
    /// in the order `shard`, `event_count`, `byte_len`, `file_hash`.
    #[must_use]
    pub fn to_string_repr(&self) -> String {
        let Self {
            shard_name,
            event_count,
            byte_len,
            file_hash,
        } = self;
        format!(
            "shard: {shard_name}\nevent_count: {event_count}\nbyte_len: {byte_len}\nfile_hash: {file_hash}\n"
        )
    }

    /// Parse text produced by [`to_string_repr`](Self::to_string_repr).
    ///
    /// Lines without a recognised `key: ` prefix are ignored, and a repeated
    /// key overrides earlier occurrences. Returns `None` when any of the four
    /// fields is missing, or when the last occurrence of a numeric field does
    /// not parse as `u64`.
    #[must_use]
    pub fn from_string_repr(s: &str) -> Option<Self> {
        let mut shard: Option<String> = None;
        let mut events: Option<u64> = None;
        let mut bytes: Option<u64> = None;
        let mut hash: Option<String> = None;

        for line in s.lines() {
            // The key is everything before the first ": "; none of the known
            // keys contain ": ", so this matches exactly the "key: " prefixes.
            let Some((key, value)) = line.split_once(": ") else {
                continue;
            };
            match key {
                "shard" => shard = Some(value.to_owned()),
                "event_count" => events = value.parse().ok(),
                "byte_len" => bytes = value.parse().ok(),
                "file_hash" => hash = Some(value.to_owned()),
                _ => {}
            }
        }

        Some(Self {
            shard_name: shard?,
            event_count: events?,
            byte_len: bytes?,
            file_hash: hash?,
        })
    }
}
122
123// ---------------------------------------------------------------------------
124// ShardManager
125// ---------------------------------------------------------------------------
126
/// Manages time-sharded event files in a `.bones` repository.
///
/// The shard manager handles:
/// - Directory initialization
/// - Shard rotation on month boundaries
/// - Atomic append with advisory locking
/// - Monotonic clock maintenance
/// - Torn-write recovery
/// - Replay (reading all shards chronologically)
/// - Sealed shard manifest generation
///
/// The manager itself only holds the repository root path, so it is cheap to
/// clone and its `Debug` output is just that path.
#[derive(Debug, Clone)]
pub struct ShardManager {
    /// Root of the `.bones` directory.
    bones_dir: PathBuf,
}
141
142impl ShardManager {
143    /// Create a new `ShardManager` for the given `.bones` directory.
144    ///
145    /// Does not create directories on construction; call [`init`](Self::init)
146    /// or [`ensure_dirs`](Self::ensure_dirs) first if needed.
147    #[must_use]
148    pub fn new(bones_dir: impl Into<PathBuf>) -> Self {
149        Self {
150            bones_dir: bones_dir.into(),
151        }
152    }
153
154    /// Path to the events directory.
155    #[must_use]
156    pub fn events_dir(&self) -> PathBuf {
157        self.bones_dir.join("events")
158    }
159
160    /// Path to the advisory lock file.
161    #[must_use]
162    pub fn lock_path(&self) -> PathBuf {
163        self.bones_dir.join("lock")
164    }
165
166    /// Path to the monotonic clock file.
167    #[must_use]
168    pub fn clock_path(&self) -> PathBuf {
169        self.bones_dir.join("cache").join("clock")
170    }
171
172    /// Path to the `current.events` symlink.
173    #[must_use]
174    pub fn current_symlink(&self) -> PathBuf {
175        self.events_dir().join("current.events")
176    }
177
178    /// Generate the shard filename for a given year and month.
179    #[must_use]
180    pub fn shard_filename(year: i32, month: u32) -> String {
181        format!("{year:04}-{month:02}.events")
182    }
183
184    /// Path to a specific shard file.
185    #[must_use]
186    pub fn shard_path(&self, year: i32, month: u32) -> PathBuf {
187        self.events_dir().join(Self::shard_filename(year, month))
188    }
189
190    /// Path to a manifest file for a given shard.
191    #[must_use]
192    pub fn manifest_path(&self, year: i32, month: u32) -> PathBuf {
193        self.events_dir()
194            .join(format!("{year:04}-{month:02}.manifest"))
195    }
196
197    // -----------------------------------------------------------------------
198    // Initialization
199    // -----------------------------------------------------------------------
200
201    /// Create the `.bones/events/` and `.bones/cache/` directories if they
202    /// don't exist. Idempotent.
203    ///
204    /// # Errors
205    ///
206    /// Returns [`ShardError::InitFailed`] if directory creation fails.
207    pub fn ensure_dirs(&self) -> Result<(), ShardError> {
208        fs::create_dir_all(self.events_dir()).map_err(ShardError::InitFailed)?;
209        fs::create_dir_all(self.bones_dir.join("cache")).map_err(ShardError::InitFailed)?;
210        Ok(())
211    }
212
213    /// Initialize the shard directory and create the first shard file
214    /// with the standard header if no shards exist.
215    ///
216    /// Returns the (year, month) of the active shard.
217    ///
218    /// # Errors
219    ///
220    /// Returns [`ShardError`] on I/O failure or if directories cannot be
221    /// created.
222    pub fn init(&self) -> Result<(i32, u32), ShardError> {
223        self.ensure_dirs()?;
224
225        let shards = self.list_shards()?;
226        if shards.is_empty() {
227            let (year, month) = current_year_month();
228            self.create_shard(year, month)?;
229            self.update_symlink(year, month)?;
230            Ok((year, month))
231        } else if let Some(&(year, month)) = shards.last() {
232            self.update_symlink(year, month)?;
233            Ok((year, month))
234        } else {
235            unreachable!("shards is non-empty")
236        }
237    }
238
239    // -----------------------------------------------------------------------
240    // Shard listing
241    // -----------------------------------------------------------------------
242
243    /// List all shard files in chronological order as (year, month) pairs.
244    ///
245    /// Shard filenames must match `YYYY-MM.events`. Invalid filenames are
246    /// silently skipped.
247    ///
248    /// # Errors
249    ///
250    /// Returns [`ShardError::Io`] if the directory cannot be read.
251    pub fn list_shards(&self) -> Result<Vec<(i32, u32)>, ShardError> {
252        let events_dir = self.events_dir();
253        if !events_dir.exists() {
254            return Ok(Vec::new());
255        }
256
257        let mut shards = Vec::new();
258        for entry in fs::read_dir(&events_dir)? {
259            let entry = entry?;
260            let name = entry.file_name();
261            let name_str = name.to_string_lossy();
262            if let Some(ym) = parse_shard_filename(&name_str) {
263                shards.push(ym);
264            }
265        }
266        shards.sort_unstable();
267        Ok(shards)
268    }
269
270    /// Get the active (most recent) shard, if any.
271    ///
272    /// # Errors
273    ///
274    /// Returns [`ShardError::Io`] if the directory cannot be read.
275    pub fn active_shard(&self) -> Result<Option<(i32, u32)>, ShardError> {
276        let shards = self.list_shards()?;
277        Ok(shards.last().copied())
278    }
279
280    // -----------------------------------------------------------------------
281    // Shard creation and rotation
282    // -----------------------------------------------------------------------
283
284    /// Create a new shard file with the standard header.
285    ///
286    /// Returns the path of the created file. Does nothing if the file
287    /// already exists.
288    ///
289    /// # Errors
290    ///
291    /// Returns [`ShardError::Io`] if the file cannot be written.
292    pub fn create_shard(&self, year: i32, month: u32) -> Result<PathBuf, ShardError> {
293        let path = self.shard_path(year, month);
294        if path.exists() {
295            return Ok(path);
296        }
297
298        let header = shard_header();
299        fs::write(&path, header)?;
300        Ok(path)
301    }
302
303    /// Update the `current.events` symlink to point to the given shard.
304    ///
305    /// # Errors
306    ///
307    /// Returns [`ShardError::Io`] if the symlink cannot be created.
308    pub fn update_symlink(&self, year: i32, month: u32) -> Result<(), ShardError> {
309        let symlink = self.current_symlink();
310        let target = Self::shard_filename(year, month);
311
312        // Remove existing symlink (or file) if present
313        if symlink.exists() || symlink.symlink_metadata().is_ok() {
314            fs::remove_file(&symlink)?;
315        }
316
317        #[cfg(unix)]
318        std::os::unix::fs::symlink(&target, &symlink)?;
319
320        #[cfg(not(unix))]
321        fs::write(&symlink, &target)?;
322
323        Ok(())
324    }
325
326    /// Check if the current month differs from the active shard's month.
327    /// If so, seal the old shard (generate manifest) and create a new one.
328    ///
329    /// Returns the (year, month) of the now-active shard.
330    ///
331    /// # Errors
332    ///
333    /// Returns [`ShardError`] on I/O failure during rotation.
334    pub fn rotate_if_needed(&self) -> Result<(i32, u32), ShardError> {
335        let (current_year, current_month) = current_year_month();
336        let active = self.active_shard()?;
337
338        match active {
339            Some((y, m)) if y == current_year && m == current_month => Ok((y, m)),
340            Some((y, m)) => {
341                // Seal the old shard with a manifest
342                self.write_manifest(y, m)?;
343                // Create new shard
344                self.create_shard(current_year, current_month)?;
345                self.update_symlink(current_year, current_month)?;
346                Ok((current_year, current_month))
347            }
348            None => {
349                // No shards exist yet, create first one
350                self.create_shard(current_year, current_month)?;
351                self.update_symlink(current_year, current_month)?;
352                Ok((current_year, current_month))
353            }
354        }
355    }
356
357    // -----------------------------------------------------------------------
358    // Manifest generation
359    // -----------------------------------------------------------------------
360
361    /// Generate and write a manifest file for a sealed shard.
362    ///
363    /// # Errors
364    ///
365    /// Returns [`ShardError::Io`] if the shard file cannot be read or the
366    /// manifest cannot be written.
367    pub fn write_manifest(&self, year: i32, month: u32) -> Result<ShardManifest, ShardError> {
368        let shard_path = self.shard_path(year, month);
369        let content = fs::read(&shard_path)?;
370        let content_str = String::from_utf8_lossy(&content);
371
372        // Count event lines (non-comment, non-blank)
373        let event_count = content_str
374            .lines()
375            .filter(|line| !line.is_empty() && !line.starts_with('#') && !line.trim().is_empty())
376            .count() as u64;
377
378        let byte_len = content.len() as u64;
379        let file_hash = format!("blake3:{}", blake3::hash(&content).to_hex());
380
381        let manifest = ShardManifest {
382            shard_name: Self::shard_filename(year, month),
383            event_count,
384            byte_len,
385            file_hash,
386        };
387
388        let manifest_path = self.manifest_path(year, month);
389        fs::write(&manifest_path, manifest.to_string_repr())?;
390
391        Ok(manifest)
392    }
393
394    /// Read a manifest file if it exists.
395    ///
396    /// # Errors
397    ///
398    /// Returns [`ShardError::Io`] if the manifest file exists but cannot
399    /// be read.
400    pub fn read_manifest(
401        &self,
402        year: i32,
403        month: u32,
404    ) -> Result<Option<ShardManifest>, ShardError> {
405        let manifest_path = self.manifest_path(year, month);
406        if !manifest_path.exists() {
407            return Ok(None);
408        }
409        let content = fs::read_to_string(&manifest_path)?;
410        Ok(ShardManifest::from_string_repr(&content))
411    }
412
413    // -----------------------------------------------------------------------
414    // Append
415    // -----------------------------------------------------------------------
416
417    /// Append an event line to the active shard.
418    ///
419    /// This method:
420    /// 1. Acquires the repo-wide advisory lock.
421    /// 2. Rotates shards if the month has changed.
422    /// 3. Reads and updates the monotonic clock.
423    /// 4. Appends the line using `O_APPEND` + `write_all` + `flush`.
424    /// 5. Optionally calls `sync_data` if `durable` is true.
425    /// 6. Releases the lock.
426    ///
427    /// The `line` must be a complete TSJSON line ending with `\n`.
428    ///
429    /// Returns the monotonic timestamp used.
430    ///
431    /// # Errors
432    ///
433    /// Returns [`ShardError::Lock`] if the lock cannot be acquired within
434    /// `lock_timeout`, or [`ShardError::Io`] on write failure.
435    pub fn append(
436        &self,
437        line: &str,
438        durable: bool,
439        lock_timeout: Duration,
440    ) -> Result<i64, ShardError> {
441        self.ensure_dirs()?;
442
443        let _lock = ShardLock::acquire(&self.lock_path(), lock_timeout)?;
444
445        // Rotate if month changed
446        let (year, month) = self.rotate_if_needed()?;
447        let shard_path = self.shard_path(year, month);
448
449        // Update monotonic clock
450        let ts = self.next_timestamp()?;
451
452        // Append with O_APPEND
453        let mut file = OpenOptions::new()
454            .create(true)
455            .append(true)
456            .open(&shard_path)?;
457
458        file.write_all(line.as_bytes())?;
459        file.flush()?;
460
461        if durable {
462            file.sync_data()?;
463        }
464
465        Ok(ts)
466    }
467
468    /// Append a raw line without locking or clock update.
469    ///
470    /// Used internally and in tests. The caller is responsible for
471    /// holding the lock and managing the clock.
472    ///
473    /// # Errors
474    ///
475    /// Returns [`ShardError::Io`] on write failure.
476    pub fn append_raw(&self, year: i32, month: u32, line: &str) -> Result<(), ShardError> {
477        let shard_path = self.shard_path(year, month);
478
479        let mut file = OpenOptions::new()
480            .create(true)
481            .append(true)
482            .open(&shard_path)?;
483
484        file.write_all(line.as_bytes())?;
485        file.flush()?;
486        Ok(())
487    }
488
489    // -----------------------------------------------------------------------
490    // Monotonic clock
491    // -----------------------------------------------------------------------
492
493    /// Read the current monotonic clock value.
494    ///
495    /// Returns 0 if the clock file doesn't exist.
496    ///
497    /// # Errors
498    ///
499    /// Returns [`ShardError::Io`] if the clock file exists but cannot be
500    /// read.
501    pub fn read_clock(&self) -> Result<i64, ShardError> {
502        let path = self.clock_path();
503        if !path.exists() {
504            return Ok(0);
505        }
506        let content = fs::read_to_string(&path)?;
507        Ok(content.trim().parse::<i64>().unwrap_or(0))
508    }
509
510    /// Compute the next monotonic timestamp and write it to the clock file.
511    ///
512    /// `next = max(system_time_us, last + 1)`
513    ///
514    /// The caller must hold the repo lock.
515    ///
516    /// # Errors
517    ///
518    /// Returns [`ShardError::Io`] if the clock file cannot be read or
519    /// written.
520    pub fn next_timestamp(&self) -> Result<i64, ShardError> {
521        let last = self.read_clock()?;
522        let now = system_time_us();
523        let next = std::cmp::max(now, last + 1);
524        self.write_clock(next)?;
525        Ok(next)
526    }
527
528    /// Write a clock value to the clock file.
529    fn write_clock(&self, value: i64) -> Result<(), ShardError> {
530        let path = self.clock_path();
531        if let Some(parent) = path.parent() {
532            fs::create_dir_all(parent)?;
533        }
534        fs::write(&path, value.to_string())?;
535        Ok(())
536    }
537
538    // -----------------------------------------------------------------------
539    // Torn-write recovery
540    // -----------------------------------------------------------------------
541
542    /// Scan the active shard for torn writes and truncate incomplete
543    /// trailing lines.
544    ///
545    /// A torn write leaves a partial line (no terminating `\n`) at the end
546    /// of the file. This method finds the last complete newline and
547    /// truncates everything after it.
548    ///
549    /// Returns `Ok(Some(bytes_truncated))` if a torn write was repaired,
550    /// or `Ok(None)` if the file was clean.
551    ///
552    /// # Errors
553    ///
554    /// Returns [`ShardError::Io`] if the shard file cannot be read or
555    /// truncated.
556    pub fn recover_torn_writes(&self) -> Result<Option<u64>, ShardError> {
557        let Some(active) = self.active_shard()? else {
558            return Ok(None);
559        };
560
561        let shard_path = self.shard_path(active.0, active.1);
562        recover_shard_torn_write(&shard_path)
563    }
564
565    // -----------------------------------------------------------------------
566    // Replay
567    // -----------------------------------------------------------------------
568
569    /// Read all event lines from all shards in chronological order.
570    ///
571    /// Shards are read in lexicographic order (`YYYY-MM` sorts correctly).
572    /// Returns the concatenated content of all shard files.
573    ///
574    /// # Errors
575    ///
576    /// Returns [`ShardError::Io`] if any shard file cannot be read.
577    pub fn replay(&self) -> Result<String, ShardError> {
578        let shards = self.list_shards()?;
579        let mut content = String::new();
580
581        for (year, month) in shards {
582            let path = self.shard_path(year, month);
583            let shard_content = fs::read_to_string(&path)?;
584            content.push_str(&shard_content);
585        }
586
587        Ok(content)
588    }
589
590    /// Read event lines from a specific shard.
591    ///
592    /// # Errors
593    ///
594    /// Returns [`ShardError::Io`] if the shard file cannot be read.
595    pub fn read_shard(&self, year: i32, month: u32) -> Result<String, ShardError> {
596        let path = self.shard_path(year, month);
597        Ok(fs::read_to_string(&path)?)
598    }
599
600    /// Compute the total concatenated byte size of all shards without reading
601    /// their full contents.
602    ///
603    /// This is used for advancing the projection cursor without paying the
604    /// cost of a full replay.
605    ///
606    /// # Errors
607    ///
608    /// Returns [`ShardError::Io`] if any shard file metadata cannot be read.
609    pub fn total_content_len(&self) -> Result<usize, ShardError> {
610        let shards = self.list_shards()?;
611        let mut total = 0usize;
612        for (year, month) in shards {
613            let path = self.shard_path(year, month);
614            let meta = fs::metadata(&path)?;
615            total = total.saturating_add(usize::try_from(meta.len()).unwrap_or(usize::MAX));
616        }
617        Ok(total)
618    }
619
620    /// Read shard content starting from the given absolute byte offset in the
621    /// concatenated shard sequence.
622    ///
623    /// Sealed shards that end entirely before `offset` are skipped without
624    /// reading their contents — only their file sizes are stat(2)'d.
625    /// Only content from `offset` onward is returned, bounding memory use to
626    /// new/unseen events rather than the full log.
627    ///
628    /// Returns `(new_content, total_len)` where:
629    /// - `new_content` is the bytes from `offset` to the end of all shards.
630    /// - `total_len` is the total byte size of all shards concatenated (usable
631    ///   as the new cursor offset after processing `new_content`).
632    ///
633    /// # Errors
634    ///
635    /// Returns [`ShardError::Io`] if shard metadata or file reads fail.
636    pub fn replay_from_offset(&self, offset: usize) -> Result<(String, usize), ShardError> {
637        let shards = self.list_shards()?;
638        let mut cumulative: usize = 0;
639        let mut result = String::new();
640        let mut found_start = false;
641
642        for (year, month) in shards {
643            let path = self.shard_path(year, month);
644            let shard_len = usize::try_from(fs::metadata(&path)?.len()).unwrap_or(usize::MAX);
645
646            let shard_end = cumulative.saturating_add(shard_len);
647
648            if shard_end <= offset {
649                // This shard ends at or before the cursor — skip entirely.
650                cumulative = shard_end;
651                continue;
652            }
653
654            // This shard overlaps with or is entirely after the cursor.
655            let shard_content = fs::read_to_string(&path)?;
656
657            if found_start {
658                result.push_str(&shard_content);
659            } else {
660                // Calculate the within-shard byte offset.
661                let within = offset.saturating_sub(cumulative);
662                // Guard: within must not exceed shard content length.
663                let within = within.min(shard_content.len());
664                result.push_str(&shard_content[within..]);
665                found_start = true;
666            }
667
668            cumulative = shard_end;
669        }
670
671        Ok((result, cumulative))
672    }
673
674    /// Read bytes from the concatenated shard sequence in `[start_offset, end_offset)`.
675    ///
676    /// Only shards that overlap with the requested range are read.
677    /// Shards entirely outside the range are stat(2)'d but not read.
678    ///
679    /// This is used to read a small window around the projection cursor for
680    /// hash validation without loading the full shard content.
681    ///
682    /// # Errors
683    ///
684    /// Returns [`ShardError::Io`] if any shard file cannot be read.
685    pub fn read_content_range(
686        &self,
687        start_offset: usize,
688        end_offset: usize,
689    ) -> Result<String, ShardError> {
690        if start_offset >= end_offset {
691            return Ok(String::new());
692        }
693
694        let shards = self.list_shards()?;
695        let mut cumulative: usize = 0;
696        let mut result = String::new();
697
698        for (year, month) in shards {
699            let path = self.shard_path(year, month);
700            let shard_len = usize::try_from(fs::metadata(&path)?.len()).unwrap_or(usize::MAX);
701            let shard_end = cumulative.saturating_add(shard_len);
702
703            if shard_end <= start_offset {
704                // Shard ends before our range — skip without reading.
705                cumulative = shard_end;
706                continue;
707            }
708
709            if cumulative >= end_offset {
710                // Shard starts after our range — done.
711                break;
712            }
713
714            let shard_content = fs::read_to_string(&path)?;
715
716            // Clip to the slice of this shard that overlaps with [start, end).
717            let within_start = if cumulative < start_offset {
718                (start_offset - cumulative).min(shard_content.len())
719            } else {
720                0
721            };
722            let within_end = if shard_end > end_offset {
723                (end_offset - cumulative).min(shard_content.len())
724            } else {
725                shard_content.len()
726            };
727
728            result.push_str(&shard_content[within_start..within_end]);
729            cumulative = shard_end;
730        }
731
732        Ok(result)
733    }
734
735    /// Count event lines across all shards (excluding comments and blanks).
736    ///
737    /// # Errors
738    ///
739    /// Returns [`ShardError::Io`] if any shard file cannot be read.
740    pub fn event_count(&self) -> Result<u64, ShardError> {
741        let content = self.replay()?;
742        let count = content
743            .lines()
744            .filter(|line| !line.is_empty() && !line.starts_with('#') && !line.trim().is_empty())
745            .count();
746        Ok(count as u64)
747    }
748
749    /// Iterate over all event lines across all shards.
750    ///
751    /// Yields `(absolute_offset, line_content)` pairs.
752    ///
753    /// # Errors
754    ///
755    /// Returns [`ShardError::Io`] if directory or shard reading fails.
756    pub fn replay_lines(
757        &self,
758    ) -> Result<impl Iterator<Item = io::Result<(usize, String)>>, ShardError> {
759        self.replay_lines_from_offset(0)
760    }
761
762    /// Iterate over event lines starting from a given absolute byte offset.
763    ///
764    /// Yields `(absolute_offset, line_content)` pairs.
765    ///
766    /// # Errors
767    ///
768    /// Returns [`ShardError::Io`] if directory or shard reading fails.
769    pub fn replay_lines_from_offset(
770        &self,
771        offset: usize,
772    ) -> Result<impl Iterator<Item = io::Result<(usize, String)>>, ShardError> {
773        let shards = self.list_shards()?;
774        let bones_dir = self.bones_dir.clone();
775
776        Ok(ShardLineIterator {
777            shards,
778            current_shard_idx: 0,
779            current_reader: None,
780            cumulative_offset: 0,
781            bones_dir,
782        }
783        .skip_to_offset(offset))
784    }
785
786    /// Check if the repository has any event shards.
787    ///
788    /// # Errors
789    ///
790    /// Returns [`ShardError::Io`] if the events directory cannot be read.
791    pub fn is_empty(&self) -> Result<bool, ShardError> {
792        let shards = self.list_shards()?;
793        Ok(shards.is_empty())
794    }
795}
796
797// ---------------------------------------------------------------------------
798// ShardLineIterator
799// ---------------------------------------------------------------------------
800
/// Streaming iterator over the lines of every shard, in chronological order.
///
/// Built by `ShardManager::replay_lines_from_offset`; yields
/// `(absolute_offset, line)` pairs, where offsets are measured in the byte
/// stream formed by concatenating all shards.
struct ShardLineIterator {
    /// Root of the `.bones` directory; shards live under `<bones_dir>/events`.
    bones_dir: PathBuf,
    /// Shards to visit, as `(year, month)`, in reading order.
    shards: Vec<(i32, u32)>,
    /// Index into `shards` of the shard being read (or to be opened next).
    current_shard_idx: usize,
    /// Reader over the current shard; `None` until the shard is opened.
    current_reader: Option<io::BufReader<fs::File>>,
    /// Absolute offset of the next byte to be yielded.
    cumulative_offset: usize,
}
808
809impl ShardLineIterator {
810    fn skip_to_offset(mut self, offset: usize) -> Self {
811        // Fast-forward shards using metadata
812        while self.current_shard_idx < self.shards.len() {
813            let (year, month) = self.shards[self.current_shard_idx];
814            let shard_path = self
815                .bones_dir
816                .join("events")
817                .join(ShardManager::shard_filename(year, month));
818            if let Ok(meta) = fs::metadata(shard_path) {
819                let shard_len = usize::try_from(meta.len()).unwrap_or(usize::MAX);
820                if self.cumulative_offset + shard_len <= offset {
821                    self.cumulative_offset += shard_len;
822                    self.current_shard_idx += 1;
823                    continue;
824                }
825            }
826            break;
827        }
828
829        // Now we are at the shard that contains the offset.
830        // The first call to next() will open it and we'll need to skip
831        // the 'within-shard' offset.
832        // We can handle this by storing the skip amount.
833        self.cumulative_offset = offset;
834        self
835    }
836}
837
838impl Iterator for ShardLineIterator {
839    type Item = io::Result<(usize, String)>;
840
841    fn next(&mut self) -> Option<Self::Item> {
842        use std::io::{BufRead, Seek, SeekFrom};
843
844        loop {
845            if self.current_reader.is_none() {
846                if self.current_shard_idx >= self.shards.len() {
847                    return None;
848                }
849
850                let (year, month) = self.shards[self.current_shard_idx];
851                let shard_path = self
852                    .bones_dir
853                    .join("events")
854                    .join(ShardManager::shard_filename(year, month));
855                let mut file = match fs::File::open(shard_path) {
856                    Ok(f) => f,
857                    Err(e) => return Some(Err(e)),
858                };
859
860                // Calculate cumulative offset before this shard
861                let mut cumulative_before = 0;
862                for i in 0..self.current_shard_idx {
863                    let (y, m) = self.shards[i];
864                    let p = self
865                        .bones_dir
866                        .join("events")
867                        .join(ShardManager::shard_filename(y, m));
868                    if let Ok(meta) = fs::metadata(p) {
869                        cumulative_before += usize::try_from(meta.len()).unwrap_or(usize::MAX);
870                    }
871                }
872
873                if self.cumulative_offset > cumulative_before {
874                    let within = self.cumulative_offset - cumulative_before;
875                    if let Err(e) = file.seek(SeekFrom::Start(within as u64)) {
876                        return Some(Err(e));
877                    }
878                }
879
880                self.current_reader = Some(io::BufReader::new(file));
881            }
882
883            let reader = self
884                .current_reader
885                .as_mut()
886                .expect("reader was just set above");
887            let mut line = String::new();
888            let offset = self.cumulative_offset;
889
890            match reader.read_line(&mut line) {
891                Ok(0) => {
892                    // EOF for this shard
893                    self.current_reader = None;
894                    self.current_shard_idx += 1;
895                }
896                Ok(n) => {
897                    self.cumulative_offset += n;
898                    return Some(Ok((offset, line)));
899                }
900                Err(e) => return Some(Err(e)),
901            }
902        }
903    }
904}
905
906// ---------------------------------------------------------------------------
907// Helpers
908// ---------------------------------------------------------------------------
909
910/// Get the current year and month from system time.
911#[must_use]
912fn current_year_month() -> (i32, u32) {
913    let now = chrono::Utc::now();
914    (now.year(), now.month())
915}
916
/// Get the current system time in microseconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch, and
/// saturates at `i64::MAX` (instead of silently wrapping) if the microsecond
/// count does not fit in an `i64`.
#[must_use]
fn system_time_us() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| {
            i64::try_from(elapsed.as_micros()).unwrap_or(i64::MAX)
        })
}
926
/// Parse a shard filename like `"2026-02.events"` into `(year, month)`.
///
/// Only the canonical shape is accepted: an all-digit year, a `-`, exactly
/// two month digits in `01..=12`, and the `.events` suffix. Everything else
/// yields `None`, including `current.events`, unpadded months such as
/// `"2026-2.events"`, and signed numbers like `"2026-+1.events"` (which
/// `str::parse` alone would accept). This keeps stray files from aliasing a
/// real shard such as `2026-02.events`.
fn parse_shard_filename(name: &str) -> Option<(i32, u32)> {
    let stem = name.strip_suffix(".events")?;
    // `current.events` has no `-`, so it is rejected here.
    let (year_str, month_str) = stem.split_once('-')?;

    let all_digits = |s: &str| !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit());
    if !all_digits(year_str) || month_str.len() != 2 || !all_digits(month_str) {
        return None;
    }

    let year: i32 = year_str.parse().ok()?;
    let month: u32 = month_str.parse().ok()?;
    (1..=12).contains(&month).then_some((year, month))
}
942
943/// Recover torn writes for a specific shard file.
944///
945/// Returns `Ok(Some(bytes_truncated))` if repair was needed,
946/// or `Ok(None)` if the file was clean.
947fn recover_shard_torn_write(path: &Path) -> Result<Option<u64>, ShardError> {
948    let metadata = fs::metadata(path)?;
949    let file_len = metadata.len();
950    if file_len == 0 {
951        return Ok(None);
952    }
953
954    let content = fs::read(path)?;
955
956    // Find the last newline
957    let last_newline = content.iter().rposition(|&b| b == b'\n');
958
959    if let Some(pos) = last_newline {
960        let expected_len = (pos + 1) as u64;
961        if expected_len < file_len {
962            // There are bytes after the last newline — torn write
963            let truncated = file_len - expected_len;
964            let file = OpenOptions::new().write(true).open(path)?;
965            file.set_len(expected_len)?;
966            Ok(Some(truncated))
967        } else {
968            // File ends with newline — clean
969            Ok(None)
970        }
971    } else {
972        // No newline at all — entire content is a torn write
973        // (or a corrupt file). Truncate to zero.
974        let file = OpenOptions::new().write(true).open(path)?;
975        file.set_len(0)?;
976        Ok(Some(file_len))
977    }
978}
979
980// ---------------------------------------------------------------------------
981// Tests
982// ---------------------------------------------------------------------------
983
984#[cfg(test)]
985mod tests {
986    use super::*;
987    use tempfile::TempDir;
988
989    fn setup() -> (TempDir, ShardManager) {
990        let tmp = TempDir::new().expect("tempdir");
991        let bones_dir = tmp.path().join(".bones");
992        let mgr = ShardManager::new(&bones_dir);
993        (tmp, mgr)
994    }
995
996    // -----------------------------------------------------------------------
997    // parse_shard_filename
998    // -----------------------------------------------------------------------
999
1000    #[test]
1001    fn parse_valid_shard_filenames() {
1002        assert_eq!(parse_shard_filename("2026-01.events"), Some((2026, 1)));
1003        assert_eq!(parse_shard_filename("2026-12.events"), Some((2026, 12)));
1004        assert_eq!(parse_shard_filename("1999-06.events"), Some((1999, 6)));
1005    }
1006
1007    #[test]
1008    fn parse_invalid_shard_filenames() {
1009        assert_eq!(parse_shard_filename("current.events"), None);
1010        assert_eq!(parse_shard_filename("2026-13.events"), None); // month > 12
1011        assert_eq!(parse_shard_filename("2026-00.events"), None); // month 0
1012        assert_eq!(parse_shard_filename("not-a-shard.txt"), None);
1013        assert_eq!(parse_shard_filename("2026-01.manifest"), None);
1014        assert_eq!(parse_shard_filename(""), None);
1015    }
1016
1017    // -----------------------------------------------------------------------
1018    // ShardManager::new, paths
1019    // -----------------------------------------------------------------------
1020
1021    #[test]
1022    fn shard_manager_paths() {
1023        let mgr = ShardManager::new("/repo/.bones");
1024        assert_eq!(mgr.events_dir(), PathBuf::from("/repo/.bones/events"));
1025        assert_eq!(mgr.lock_path(), PathBuf::from("/repo/.bones/lock"));
1026        assert_eq!(mgr.clock_path(), PathBuf::from("/repo/.bones/cache/clock"));
1027        assert_eq!(
1028            mgr.current_symlink(),
1029            PathBuf::from("/repo/.bones/events/current.events")
1030        );
1031        assert_eq!(
1032            mgr.shard_path(2026, 2),
1033            PathBuf::from("/repo/.bones/events/2026-02.events")
1034        );
1035        assert_eq!(
1036            mgr.manifest_path(2026, 1),
1037            PathBuf::from("/repo/.bones/events/2026-01.manifest")
1038        );
1039    }
1040
1041    #[test]
1042    fn shard_filename_format() {
1043        assert_eq!(ShardManager::shard_filename(2026, 1), "2026-01.events");
1044        assert_eq!(ShardManager::shard_filename(2026, 12), "2026-12.events");
1045        assert_eq!(ShardManager::shard_filename(1999, 6), "1999-06.events");
1046    }
1047
1048    // -----------------------------------------------------------------------
1049    // ensure_dirs / init
1050    // -----------------------------------------------------------------------
1051
1052    #[test]
1053    fn ensure_dirs_creates_directories() {
1054        let (_tmp, mgr) = setup();
1055        mgr.ensure_dirs().expect("should create dirs");
1056        assert!(mgr.events_dir().exists());
1057        assert!(mgr.bones_dir.join("cache").exists());
1058    }
1059
1060    #[test]
1061    fn ensure_dirs_is_idempotent() {
1062        let (_tmp, mgr) = setup();
1063        mgr.ensure_dirs().expect("first");
1064        mgr.ensure_dirs().expect("second");
1065        assert!(mgr.events_dir().exists());
1066    }
1067
1068    #[test]
1069    fn init_creates_first_shard() {
1070        let (_tmp, mgr) = setup();
1071        let (year, month) = mgr.init().expect("init");
1072
1073        let (expected_year, expected_month) = current_year_month();
1074        assert_eq!(year, expected_year);
1075        assert_eq!(month, expected_month);
1076
1077        // Shard file exists with header
1078        let shard_path = mgr.shard_path(year, month);
1079        assert!(shard_path.exists());
1080        let content = fs::read_to_string(&shard_path).expect("read");
1081        assert!(content.starts_with("# bones event log v1"));
1082
1083        // Symlink exists
1084        let symlink = mgr.current_symlink();
1085        assert!(symlink.exists() || symlink.symlink_metadata().is_ok());
1086    }
1087
1088    #[test]
1089    fn init_is_idempotent() {
1090        let (_tmp, mgr) = setup();
1091        let first = mgr.init().expect("first");
1092        let second = mgr.init().expect("second");
1093        assert_eq!(first, second);
1094    }
1095
1096    // -----------------------------------------------------------------------
1097    // Shard listing
1098    // -----------------------------------------------------------------------
1099
1100    #[test]
1101    fn list_shards_empty() {
1102        let (_tmp, mgr) = setup();
1103        mgr.ensure_dirs().expect("dirs");
1104        let shards = mgr.list_shards().expect("list");
1105        assert!(shards.is_empty());
1106    }
1107
1108    #[test]
1109    fn list_shards_returns_sorted() {
1110        let (_tmp, mgr) = setup();
1111        mgr.ensure_dirs().expect("dirs");
1112
1113        // Create shards in reverse order
1114        mgr.create_shard(2026, 3).expect("create");
1115        mgr.create_shard(2026, 1).expect("create");
1116        mgr.create_shard(2026, 2).expect("create");
1117
1118        let shards = mgr.list_shards().expect("list");
1119        assert_eq!(shards, vec![(2026, 1), (2026, 2), (2026, 3)]);
1120    }
1121
1122    #[test]
1123    fn list_shards_skips_non_shard_files() {
1124        let (_tmp, mgr) = setup();
1125        mgr.ensure_dirs().expect("dirs");
1126        mgr.create_shard(2026, 1).expect("create");
1127
1128        // Create non-shard files
1129        fs::write(mgr.events_dir().join("readme.txt"), "hi").expect("write");
1130        fs::write(mgr.events_dir().join("2026-01.manifest"), "manifest").expect("write");
1131
1132        let shards = mgr.list_shards().expect("list");
1133        assert_eq!(shards, vec![(2026, 1)]);
1134    }
1135
1136    #[test]
1137    fn list_shards_no_events_dir() {
1138        let (_tmp, mgr) = setup();
1139        // Don't create any dirs
1140        let shards = mgr.list_shards().expect("list");
1141        assert!(shards.is_empty());
1142    }
1143
1144    // -----------------------------------------------------------------------
1145    // Shard creation
1146    // -----------------------------------------------------------------------
1147
1148    #[test]
1149    fn create_shard_writes_header() {
1150        let (_tmp, mgr) = setup();
1151        mgr.ensure_dirs().expect("dirs");
1152        let path = mgr.create_shard(2026, 2).expect("create");
1153
1154        let content = fs::read_to_string(&path).expect("read");
1155        assert!(content.starts_with("# bones event log v1"));
1156        assert!(content.contains("# fields:"));
1157        assert_eq!(content.lines().count(), 2);
1158    }
1159
1160    #[test]
1161    fn create_shard_idempotent() {
1162        let (_tmp, mgr) = setup();
1163        mgr.ensure_dirs().expect("dirs");
1164        let p1 = mgr.create_shard(2026, 2).expect("first");
1165        // Write something extra
1166        fs::write(&p1, "modified").expect("write");
1167        // Second create should NOT overwrite
1168        let p2 = mgr.create_shard(2026, 2).expect("second");
1169        assert_eq!(p1, p2);
1170        let content = fs::read_to_string(&p2).expect("read");
1171        assert_eq!(content, "modified");
1172    }
1173
1174    // -----------------------------------------------------------------------
1175    // Symlink
1176    // -----------------------------------------------------------------------
1177
1178    #[test]
1179    fn update_symlink_creates_link() {
1180        let (_tmp, mgr) = setup();
1181        mgr.ensure_dirs().expect("dirs");
1182        mgr.create_shard(2026, 2).expect("create");
1183        mgr.update_symlink(2026, 2).expect("symlink");
1184
1185        let symlink = mgr.current_symlink();
1186        assert!(symlink.symlink_metadata().is_ok());
1187
1188        #[cfg(unix)]
1189        {
1190            let target = fs::read_link(&symlink).expect("readlink");
1191            assert_eq!(target, PathBuf::from("2026-02.events"));
1192        }
1193    }
1194
1195    #[test]
1196    fn update_symlink_replaces_existing() {
1197        let (_tmp, mgr) = setup();
1198        mgr.ensure_dirs().expect("dirs");
1199        mgr.create_shard(2026, 1).expect("create");
1200        mgr.create_shard(2026, 2).expect("create");
1201
1202        mgr.update_symlink(2026, 1).expect("first");
1203        mgr.update_symlink(2026, 2).expect("second");
1204
1205        #[cfg(unix)]
1206        {
1207            let target = fs::read_link(&mgr.current_symlink()).expect("readlink");
1208            assert_eq!(target, PathBuf::from("2026-02.events"));
1209        }
1210    }
1211
1212    // -----------------------------------------------------------------------
1213    // Monotonic clock
1214    // -----------------------------------------------------------------------
1215
1216    #[test]
1217    fn clock_starts_at_zero() {
1218        let (_tmp, mgr) = setup();
1219        mgr.ensure_dirs().expect("dirs");
1220        let ts = mgr.read_clock().expect("read");
1221        assert_eq!(ts, 0);
1222    }
1223
1224    #[test]
1225    fn clock_is_monotonic() {
1226        let (_tmp, mgr) = setup();
1227        mgr.ensure_dirs().expect("dirs");
1228        let t1 = mgr.next_timestamp().expect("t1");
1229        let t2 = mgr.next_timestamp().expect("t2");
1230        let t3 = mgr.next_timestamp().expect("t3");
1231        assert!(t2 > t1);
1232        assert!(t3 > t2);
1233    }
1234
1235    #[test]
1236    fn clock_reads_back_written_value() {
1237        let (_tmp, mgr) = setup();
1238        mgr.ensure_dirs().expect("dirs");
1239        mgr.write_clock(42_000_000).expect("write");
1240        let ts = mgr.read_clock().expect("read");
1241        assert_eq!(ts, 42_000_000);
1242    }
1243
1244    #[test]
1245    fn clock_never_goes_backward() {
1246        let (_tmp, mgr) = setup();
1247        mgr.ensure_dirs().expect("dirs");
1248
1249        // Set clock far in the future
1250        let future = system_time_us() + 10_000_000;
1251        mgr.write_clock(future).expect("write");
1252
1253        let next = mgr.next_timestamp().expect("next");
1254        assert!(next > future, "clock should advance past future value");
1255    }
1256
1257    // -----------------------------------------------------------------------
1258    // Append
1259    // -----------------------------------------------------------------------
1260
1261    #[test]
1262    fn append_raw_adds_line() {
1263        let (_tmp, mgr) = setup();
1264        mgr.ensure_dirs().expect("dirs");
1265        mgr.create_shard(2026, 2).expect("create");
1266
1267        mgr.append_raw(2026, 2, "event line 1\n").expect("append");
1268        mgr.append_raw(2026, 2, "event line 2\n").expect("append");
1269
1270        let content = mgr.read_shard(2026, 2).expect("read");
1271        assert!(content.contains("event line 1"));
1272        assert!(content.contains("event line 2"));
1273    }
1274
1275    #[test]
1276    fn append_with_lock() {
1277        let (_tmp, mgr) = setup();
1278        mgr.init().expect("init");
1279
1280        let _ts = mgr
1281            .append("test event line\n", false, Duration::from_secs(1))
1282            .expect("append");
1283
1284        let content = mgr.replay().expect("replay");
1285        assert!(content.contains("test event line"));
1286    }
1287
1288    #[test]
1289    fn append_returns_monotonic_timestamps() {
1290        let (_tmp, mgr) = setup();
1291        mgr.init().expect("init");
1292
1293        let t1 = mgr
1294            .append("line1\n", false, Duration::from_secs(1))
1295            .expect("t1");
1296        let t2 = mgr
1297            .append("line2\n", false, Duration::from_secs(1))
1298            .expect("t2");
1299
1300        assert!(t2 > t1);
1301    }
1302
1303    // -----------------------------------------------------------------------
1304    // Torn-write recovery
1305    // -----------------------------------------------------------------------
1306
1307    #[test]
1308    fn recover_clean_file() {
1309        let (_tmp, mgr) = setup();
1310        mgr.init().expect("init");
1311
1312        let (y, m) = current_year_month();
1313        mgr.append_raw(y, m, "complete line\n").expect("append");
1314
1315        let recovered = mgr.recover_torn_writes().expect("recover");
1316        assert_eq!(recovered, None);
1317    }
1318
1319    #[test]
1320    fn recover_torn_write_truncates() {
1321        let (_tmp, mgr) = setup();
1322        let (y, m) = mgr.init().expect("init");
1323        let shard_path = mgr.shard_path(y, m);
1324
1325        // Write a complete line followed by a partial line
1326        {
1327            let mut f = OpenOptions::new()
1328                .append(true)
1329                .open(&shard_path)
1330                .expect("open");
1331            f.write_all(b"complete line\npartial line without newline")
1332                .expect("write");
1333            f.flush().expect("flush");
1334        }
1335
1336        let recovered = mgr.recover_torn_writes().expect("recover");
1337        assert!(recovered.is_some());
1338
1339        let truncated = recovered.expect("checked is_some");
1340        assert_eq!(truncated, "partial line without newline".len() as u64);
1341
1342        // Verify file now ends with newline
1343        let content = fs::read_to_string(&shard_path).expect("read");
1344        assert!(content.ends_with('\n'));
1345        assert!(content.contains("complete line"));
1346        assert!(!content.contains("partial line without newline"));
1347    }
1348
1349    #[test]
1350    fn recover_no_newline_at_all() {
1351        let (_tmp, mgr) = setup();
1352        let (y, m) = mgr.init().expect("init");
1353        let shard_path = mgr.shard_path(y, m);
1354
1355        // Overwrite entire file with no newlines
1356        fs::write(&shard_path, "no newlines here").expect("write");
1357
1358        let recovered = mgr.recover_torn_writes().expect("recover");
1359        assert_eq!(recovered, Some("no newlines here".len() as u64));
1360
1361        // File should be empty
1362        let content = fs::read_to_string(&shard_path).expect("read");
1363        assert!(content.is_empty());
1364    }
1365
1366    #[test]
1367    fn recover_empty_file() {
1368        let (_tmp, mgr) = setup();
1369        let (y, m) = mgr.init().expect("init");
1370        let shard_path = mgr.shard_path(y, m);
1371
1372        // Empty the file
1373        fs::write(&shard_path, "").expect("write");
1374
1375        let recovered = mgr.recover_torn_writes().expect("recover");
1376        assert_eq!(recovered, None);
1377    }
1378
1379    #[test]
1380    fn recover_no_active_shard() {
1381        let (_tmp, mgr) = setup();
1382        mgr.ensure_dirs().expect("dirs");
1383
1384        let recovered = mgr.recover_torn_writes().expect("recover");
1385        assert_eq!(recovered, None);
1386    }
1387
1388    // -----------------------------------------------------------------------
1389    // Replay
1390    // -----------------------------------------------------------------------
1391
1392    #[test]
1393    fn replay_empty_repo() {
1394        let (_tmp, mgr) = setup();
1395        mgr.ensure_dirs().expect("dirs");
1396        let content = mgr.replay().expect("replay");
1397        assert!(content.is_empty());
1398    }
1399
1400    #[test]
1401    fn replay_single_shard() {
1402        let (_tmp, mgr) = setup();
1403        mgr.ensure_dirs().expect("dirs");
1404        mgr.create_shard(2026, 1).expect("create");
1405        mgr.append_raw(2026, 1, "event-a\n").expect("append");
1406
1407        let content = mgr.replay().expect("replay");
1408        assert!(content.contains("event-a"));
1409    }
1410
1411    #[test]
1412    fn replay_multiple_shards_in_order() {
1413        let (_tmp, mgr) = setup();
1414        mgr.ensure_dirs().expect("dirs");
1415
1416        mgr.create_shard(2026, 1).expect("create");
1417        mgr.create_shard(2026, 2).expect("create");
1418        mgr.create_shard(2026, 3).expect("create");
1419
1420        mgr.append_raw(2026, 1, "event-jan\n").expect("append");
1421        mgr.append_raw(2026, 2, "event-feb\n").expect("append");
1422        mgr.append_raw(2026, 3, "event-mar\n").expect("append");
1423
1424        let content = mgr.replay().expect("replay");
1425
1426        // Events should appear in chronological order
1427        let jan_pos = content.find("event-jan").expect("jan");
1428        let feb_pos = content.find("event-feb").expect("feb");
1429        let mar_pos = content.find("event-mar").expect("mar");
1430        assert!(jan_pos < feb_pos);
1431        assert!(feb_pos < mar_pos);
1432    }
1433
1434    // -----------------------------------------------------------------------
1435    // Event count
1436    // -----------------------------------------------------------------------
1437
1438    #[test]
1439    fn event_count_empty() {
1440        let (_tmp, mgr) = setup();
1441        mgr.ensure_dirs().expect("dirs");
1442        assert_eq!(mgr.event_count().expect("count"), 0);
1443    }
1444
1445    #[test]
1446    fn event_count_excludes_comments_and_blanks() {
1447        let (_tmp, mgr) = setup();
1448        mgr.ensure_dirs().expect("dirs");
1449        mgr.create_shard(2026, 1).expect("create");
1450        // Header has 2 comment lines, then we add events
1451        mgr.append_raw(2026, 1, "event1\n").expect("append");
1452        mgr.append_raw(2026, 1, "event2\n").expect("append");
1453        mgr.append_raw(2026, 1, "\n").expect("blank");
1454
1455        assert_eq!(mgr.event_count().expect("count"), 2);
1456    }
1457
1458    // -----------------------------------------------------------------------
1459    // is_empty
1460    // -----------------------------------------------------------------------
1461
1462    #[test]
1463    fn is_empty_no_shards() {
1464        let (_tmp, mgr) = setup();
1465        mgr.ensure_dirs().expect("dirs");
1466        assert!(mgr.is_empty().expect("empty"));
1467    }
1468
1469    #[test]
1470    fn is_empty_with_shards() {
1471        let (_tmp, mgr) = setup();
1472        mgr.init().expect("init");
1473        assert!(!mgr.is_empty().expect("empty"));
1474    }
1475
1476    // -----------------------------------------------------------------------
1477    // Manifest
1478    // -----------------------------------------------------------------------
1479
1480    #[test]
1481    fn write_and_read_manifest() {
1482        let (_tmp, mgr) = setup();
1483        mgr.ensure_dirs().expect("dirs");
1484        mgr.create_shard(2026, 1).expect("create");
1485        mgr.append_raw(2026, 1, "event-line-1\n").expect("append");
1486        mgr.append_raw(2026, 1, "event-line-2\n").expect("append");
1487
1488        let written = mgr.write_manifest(2026, 1).expect("write manifest");
1489        assert_eq!(written.shard_name, "2026-01.events");
1490        assert_eq!(written.event_count, 2);
1491        assert!(written.byte_len > 0);
1492        assert!(written.file_hash.starts_with("blake3:"));
1493
1494        let read = mgr
1495            .read_manifest(2026, 1)
1496            .expect("read")
1497            .expect("should exist");
1498        assert_eq!(read, written);
1499    }
1500
1501    #[test]
1502    fn manifest_roundtrip() {
1503        let manifest = ShardManifest {
1504            shard_name: "2026-01.events".into(),
1505            event_count: 42,
1506            byte_len: 12345,
1507            file_hash: "blake3:abcdef0123456789".into(),
1508        };
1509
1510        let repr = manifest.to_string_repr();
1511        let parsed = ShardManifest::from_string_repr(&repr).expect("parse");
1512        assert_eq!(parsed, manifest);
1513    }
1514
1515    #[test]
1516    fn read_manifest_missing() {
1517        let (_tmp, mgr) = setup();
1518        mgr.ensure_dirs().expect("dirs");
1519        let result = mgr.read_manifest(2026, 1).expect("read");
1520        assert!(result.is_none());
1521    }
1522
1523    #[test]
1524    fn manifest_event_count_excludes_comments() {
1525        let (_tmp, mgr) = setup();
1526        mgr.ensure_dirs().expect("dirs");
1527        mgr.create_shard(2026, 1).expect("create");
1528        // Header has 2 comment lines
1529        mgr.append_raw(2026, 1, "event1\n").expect("append");
1530
1531        let manifest = mgr.write_manifest(2026, 1).expect("manifest");
1532        // Only 1 event line, not the 2 header lines
1533        assert_eq!(manifest.event_count, 1);
1534    }
1535
1536    // -----------------------------------------------------------------------
1537    // Rotation
1538    // -----------------------------------------------------------------------
1539
1540    #[test]
1541    fn rotate_creates_shard_if_none_exist() {
1542        let (_tmp, mgr) = setup();
1543        mgr.ensure_dirs().expect("dirs");
1544
1545        let (y, m) = mgr.rotate_if_needed().expect("rotate");
1546        let (ey, em) = current_year_month();
1547        assert_eq!((y, m), (ey, em));
1548
1549        assert!(mgr.shard_path(y, m).exists());
1550    }
1551
1552    #[test]
1553    fn rotate_no_op_same_month() {
1554        let (_tmp, mgr) = setup();
1555        let (y, m) = mgr.init().expect("init");
1556
1557        let (y2, m2) = mgr.rotate_if_needed().expect("rotate");
1558        assert_eq!((y, m), (y2, m2));
1559    }
1560
1561    #[test]
1562    fn rotate_different_month_seals_and_creates() {
1563        let (_tmp, mgr) = setup();
1564        mgr.ensure_dirs().expect("dirs");
1565
1566        // Create an old shard
1567        mgr.create_shard(2025, 11).expect("create");
1568        mgr.append_raw(2025, 11, "old event\n").expect("append");
1569        mgr.update_symlink(2025, 11).expect("symlink");
1570
1571        // Rotate should seal old and create new
1572        let (y, m) = mgr.rotate_if_needed().expect("rotate");
1573        let (ey, em) = current_year_month();
1574        assert_eq!((y, m), (ey, em));
1575
1576        // Old shard should have a manifest
1577        assert!(mgr.manifest_path(2025, 11).exists());
1578
1579        // New shard should exist
1580        assert!(mgr.shard_path(ey, em).exists());
1581
1582        // Symlink should point to new shard
1583        #[cfg(unix)]
1584        {
1585            let target = fs::read_link(mgr.current_symlink()).expect("readlink");
1586            assert_eq!(target, PathBuf::from(ShardManager::shard_filename(ey, em)));
1587        }
1588    }
1589
1590    // -----------------------------------------------------------------------
1591    // Frozen shards
1592    // -----------------------------------------------------------------------
1593
1594    #[test]
1595    fn frozen_shard_not_modified_by_append() {
1596        let (_tmp, mgr) = setup();
1597        mgr.ensure_dirs().expect("dirs");
1598
1599        // Create and populate old shard
1600        mgr.create_shard(2025, 6).expect("create");
1601        mgr.append_raw(2025, 6, "old event\n").expect("append");
1602        let old_content = mgr.read_shard(2025, 6).expect("read");
1603
1604        // Init creates current month shard
1605        mgr.init().expect("init");
1606
1607        // Append only goes to active shard
1608        mgr.append("new event\n", false, Duration::from_secs(1))
1609            .expect("append");
1610
1611        // Old shard is unchanged
1612        let after_content = mgr.read_shard(2025, 6).expect("read");
1613        assert_eq!(old_content, after_content);
1614    }
1615
1616    // -----------------------------------------------------------------------
1617    // system_time_us
1618    // -----------------------------------------------------------------------
1619
1620    #[test]
1621    fn system_time_us_is_positive() {
1622        let ts = system_time_us();
1623        assert!(ts > 0, "system time should be positive: {ts}");
1624    }
1625
1626    #[test]
1627    fn system_time_us_is_reasonable() {
1628        let ts = system_time_us();
1629        // Should be after 2020-01-01 in microseconds
1630        let jan_2020_us: i64 = 1_577_836_800_000_000;
1631        assert!(ts > jan_2020_us, "system time too small: {ts}");
1632    }
1633
1634    // -----------------------------------------------------------------------
1635    // total_content_len
1636    // -----------------------------------------------------------------------
1637
1638    #[test]
1639    fn total_content_len_empty_repo() {
1640        let (_tmp, mgr) = setup();
1641        mgr.ensure_dirs().expect("dirs");
1642        let len = mgr.total_content_len().expect("len");
1643        assert_eq!(len, 0);
1644    }
1645
1646    #[test]
1647    fn total_content_len_single_shard() {
1648        let (_tmp, mgr) = setup();
1649        mgr.ensure_dirs().expect("dirs");
1650        mgr.create_shard(2026, 1).expect("create");
1651        mgr.append_raw(2026, 1, "line1\n").expect("append");
1652        mgr.append_raw(2026, 1, "line2\n").expect("append");
1653
1654        let full = mgr.replay().expect("replay");
1655        let len = mgr.total_content_len().expect("len");
1656        assert_eq!(len, full.len());
1657    }
1658
1659    #[test]
1660    fn total_content_len_multiple_shards() {
1661        let (_tmp, mgr) = setup();
1662        mgr.ensure_dirs().expect("dirs");
1663        mgr.create_shard(2026, 1).expect("shard 1");
1664        mgr.create_shard(2026, 2).expect("shard 2");
1665        mgr.append_raw(2026, 1, "jan-event\n").expect("append jan");
1666        mgr.append_raw(2026, 2, "feb-event\n").expect("append feb");
1667
1668        let full = mgr.replay().expect("replay");
1669        let len = mgr.total_content_len().expect("len");
1670        assert_eq!(len, full.len(), "total_content_len must match replay len");
1671    }
1672
1673    // -----------------------------------------------------------------------
1674    // read_content_range
1675    // -----------------------------------------------------------------------
1676
1677    #[test]
1678    fn read_content_range_empty_range() {
1679        let (_tmp, mgr) = setup();
1680        mgr.ensure_dirs().expect("dirs");
1681        mgr.create_shard(2026, 1).expect("create");
1682        mgr.append_raw(2026, 1, "event\n").expect("append");
1683
1684        let result = mgr.read_content_range(5, 5).expect("range");
1685        assert!(result.is_empty());
1686    }
1687
1688    #[test]
1689    fn read_content_range_within_single_shard() {
1690        let (_tmp, mgr) = setup();
1691        mgr.ensure_dirs().expect("dirs");
1692        mgr.create_shard(2026, 1).expect("create");
1693        // shard header is 2 lines; add a known event line
1694        mgr.append_raw(2026, 1, "ABCDEF\n").expect("append");
1695
1696        let full = mgr.replay().expect("replay");
1697        // Find the position of "ABCDEF"
1698        let pos = full.find("ABCDEF").expect("ABCDEF must be in shard");
1699        let range = mgr.read_content_range(pos, pos + 7).expect("range");
1700        assert_eq!(range, "ABCDEF\n");
1701    }
1702
1703    #[test]
1704    fn read_content_range_across_shard_boundary() {
1705        let (_tmp, mgr) = setup();
1706        mgr.ensure_dirs().expect("dirs");
1707        mgr.create_shard(2026, 1).expect("shard 1");
1708        mgr.create_shard(2026, 2).expect("shard 2");
1709        mgr.append_raw(2026, 1, "jan-last-line\n").expect("jan");
1710        mgr.append_raw(2026, 2, "feb-first-line\n").expect("feb");
1711
1712        let full = mgr.replay().expect("replay");
1713        // Read entire concatenation as a range
1714        let range = mgr.read_content_range(0, full.len()).expect("full range");
1715        assert_eq!(range, full);
1716    }
1717
1718    #[test]
1719    fn read_content_range_beyond_end() {
1720        let (_tmp, mgr) = setup();
1721        mgr.ensure_dirs().expect("dirs");
1722        mgr.create_shard(2026, 1).expect("create");
1723        mgr.append_raw(2026, 1, "event\n").expect("append");
1724
1725        let full = mgr.replay().expect("replay");
1726        // Requesting a range beyond the end should return empty
1727        let range = mgr
1728            .read_content_range(full.len(), full.len() + 100)
1729            .expect("beyond end");
1730        assert!(range.is_empty());
1731    }
1732
1733    // -----------------------------------------------------------------------
1734    // replay_from_offset
1735    // -----------------------------------------------------------------------
1736
1737    #[test]
1738    fn replay_from_offset_zero_returns_full_content() {
1739        let (_tmp, mgr) = setup();
1740        mgr.ensure_dirs().expect("dirs");
1741        mgr.create_shard(2026, 1).expect("create");
1742        mgr.append_raw(2026, 1, "event1\n").expect("e1");
1743        mgr.append_raw(2026, 1, "event2\n").expect("e2");
1744
1745        let full = mgr.replay().expect("full replay");
1746        let (from_zero, total_len) = mgr.replay_from_offset(0).expect("from 0");
1747        assert_eq!(from_zero, full);
1748        assert_eq!(total_len, full.len());
1749    }
1750
1751    #[test]
1752    fn replay_from_offset_skips_content_before_cursor() {
1753        let (_tmp, mgr) = setup();
1754        mgr.ensure_dirs().expect("dirs");
1755        mgr.create_shard(2026, 1).expect("create");
1756        mgr.append_raw(2026, 1, "event1\n").expect("e1");
1757        mgr.append_raw(2026, 1, "event2\n").expect("e2");
1758        mgr.append_raw(2026, 1, "event3\n").expect("e3");
1759
1760        let full = mgr.replay().expect("full replay");
1761
1762        // Find offset just after event2
1763        let cursor = full.find("event3").expect("event3 in content");
1764        let (tail, total_len) = mgr.replay_from_offset(cursor).expect("from cursor");
1765        assert_eq!(tail, "event3\n");
1766        assert_eq!(total_len, full.len());
1767    }
1768
1769    #[test]
1770    fn replay_from_offset_at_end_returns_empty() {
1771        let (_tmp, mgr) = setup();
1772        mgr.ensure_dirs().expect("dirs");
1773        mgr.create_shard(2026, 1).expect("create");
1774        mgr.append_raw(2026, 1, "event1\n").expect("e1");
1775
1776        let full = mgr.replay().expect("full replay");
1777        let (tail, total_len) = mgr.replay_from_offset(full.len()).expect("at end");
1778        assert!(tail.is_empty(), "tail should be empty at end of content");
1779        assert_eq!(total_len, full.len());
1780    }
1781
1782    #[test]
1783    fn replay_from_offset_skips_sealed_shards_before_cursor() {
1784        let (_tmp, mgr) = setup();
1785        mgr.ensure_dirs().expect("dirs");
1786
1787        // Two shards: a sealed shard (jan) and an active shard (feb)
1788        mgr.create_shard(2026, 1).expect("jan");
1789        mgr.create_shard(2026, 2).expect("feb");
1790        mgr.append_raw(2026, 1, "jan-event1\n").expect("jan e1");
1791        mgr.append_raw(2026, 1, "jan-event2\n").expect("jan e2");
1792        mgr.append_raw(2026, 2, "feb-event1\n").expect("feb e1");
1793        mgr.append_raw(2026, 2, "feb-event2\n").expect("feb e2");
1794
1795        let full = mgr.replay().expect("full replay");
1796        let jan_shard_len = mgr.read_shard(2026, 1).expect("read jan").len();
1797
1798        // Cursor is at the end of the jan shard — feb events are new
1799        let (tail, total_len) = mgr
1800            .replay_from_offset(jan_shard_len)
1801            .expect("from feb start");
1802        assert!(
1803            !tail.contains("jan-event"),
1804            "jan events should not appear in tail"
1805        );
1806        assert!(tail.contains("feb-event1"), "feb events must be in tail");
1807        assert!(tail.contains("feb-event2"), "feb events must be in tail");
1808        assert_eq!(total_len, full.len());
1809    }
1810
1811    #[test]
1812    fn replay_from_offset_total_len_equals_total_content_len() {
1813        let (_tmp, mgr) = setup();
1814        mgr.ensure_dirs().expect("dirs");
1815        mgr.create_shard(2026, 1).expect("shard 1");
1816        mgr.create_shard(2026, 2).expect("shard 2");
1817        mgr.append_raw(2026, 1, "event-a\n").expect("ea");
1818        mgr.append_raw(2026, 2, "event-b\n").expect("eb");
1819
1820        let total = mgr.total_content_len().expect("total_content_len");
1821        let (_, replay_total) = mgr.replay_from_offset(0).expect("replay_from_offset");
1822        assert_eq!(
1823            total, replay_total,
1824            "total_content_len and replay_from_offset total must agree"
1825        );
1826    }
1827}