Skip to main content

bones_core/
shard.rs

1//! Time-sharded event file management.
2//!
3//! Events are stored in monthly shard files under `.bones/events/YYYY-MM.events`.
4//! This module manages the directory layout, shard rotation, atomic append
5//! operations, and replay (reading all shards in chronological order).
6//!
7//! # Directory Layout
8//!
9//! ```text
10//! .bones/
11//!   events/
12//!     2026-01.events      # sealed shard
13//!     2026-01.manifest    # manifest for sealed shard
14//!     2026-02.events      # active shard
15//!   cache/
16//!     clock               # monotonic wall-clock file (microseconds)
17//!   lock                  # repo-wide advisory lock
18//! ```
19//!
20//! # Invariants
21//!
22//! - Sealed (frozen) shards are never modified after rotation.
23//! - The active shard is the only one that receives appends.
24//! - The active shard is derived from `list_shards().last()` (sorted filenames).
25//! - Each append uses `O_APPEND` + `write_all` + `flush` for crash consistency.
26//! - Torn-write recovery truncates incomplete trailing lines on startup.
27//! - Monotonic timestamps: `wall_ts_us = max(system_time_us, last + 1)`.
28
29use std::fs::{self, OpenOptions};
30use std::io::{self, Write as IoWrite};
31use std::path::{Path, PathBuf};
32use std::time::{Duration, SystemTime, UNIX_EPOCH};
33
34use chrono::Datelike;
35
36use crate::event::writer::shard_header;
37use crate::lock::ShardLock;
38
39// ---------------------------------------------------------------------------
40// Error types
41// ---------------------------------------------------------------------------
42
43/// Errors that can occur during shard operations.
44#[derive(Debug, thiserror::Error)]
45pub enum ShardError {
46    /// I/O error during shard operations.
47    #[error("shard I/O error: {0}")]
48    Io(#[from] io::Error),
49
50    /// Lock acquisition failed.
51    #[error("lock error: {0}")]
52    Lock(#[from] crate::lock::LockError),
53
54    /// The `.bones` directory does not exist and could not be created.
55    #[error("failed to initialize .bones directory: {0}")]
56    InitFailed(io::Error),
57
58    /// Shard file name does not match expected `YYYY-MM.events` pattern.
59    #[error("invalid shard filename: {0}")]
60    InvalidShardName(String),
61
62    /// Shard file has a corrupted or missing header.
63    #[error("corrupted shard {path}: {reason}")]
64    CorruptedShard {
65        /// Path to the corrupted shard file.
66        path: PathBuf,
67        /// Description of what is wrong.
68        reason: String,
69    },
70}
71
/// One integrity problem discovered while validating a shard.
#[derive(Debug, Clone)]
pub struct ShardIntegrityIssue {
    /// File name of the affected shard (e.g. `"2026-01.events"`).
    pub shard_name: String,
    /// Description of the problem, intended for humans.
    pub problem: String,
}
80
81// ---------------------------------------------------------------------------
82// Manifest
83// ---------------------------------------------------------------------------
84
/// Integrity metadata recorded for a sealed shard file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ShardManifest {
    /// File name of the shard (e.g., `"2026-01.events"`).
    pub shard_name: String,
    /// Count of event lines; comment and blank lines are not included.
    pub event_count: u64,
    /// Size of the shard file in bytes.
    pub byte_len: u64,
    /// BLAKE3 hash over the complete shard file contents.
    pub file_hash: String,
}

impl ShardManifest {
    /// Render the manifest as four `key: value` lines, each terminated by
    /// `\n`, in the order `shard`, `event_count`, `byte_len`, `file_hash`.
    #[must_use]
    pub fn to_string_repr(&self) -> String {
        let Self {
            shard_name,
            event_count,
            byte_len,
            file_hash,
        } = self;
        format!(
            "shard: {}\nevent_count: {}\nbyte_len: {}\nfile_hash: {}\n",
            shard_name, event_count, byte_len, file_hash
        )
    }

    /// Parse the textual form produced by [`to_string_repr`](Self::to_string_repr).
    ///
    /// Lines with an unknown key are ignored. When a key appears more than
    /// once the last occurrence wins, including the case where that last
    /// occurrence fails to parse as a number.
    ///
    /// Returns `None` if any of the four fields is missing or unparseable.
    #[must_use]
    pub fn from_string_repr(s: &str) -> Option<Self> {
        let mut name: Option<String> = None;
        let mut events: Option<u64> = None;
        let mut bytes: Option<u64> = None;
        let mut hash: Option<String> = None;

        for line in s.lines() {
            // The key is everything before the first ": " separator.
            let Some((key, value)) = line.split_once(": ") else {
                continue;
            };
            match key {
                "shard" => name = Some(value.to_string()),
                "event_count" => events = value.parse().ok(),
                "byte_len" => bytes = value.parse().ok(),
                "file_hash" => hash = Some(value.to_string()),
                _ => {}
            }
        }

        match (name, events, bytes, hash) {
            (Some(shard_name), Some(event_count), Some(byte_len), Some(file_hash)) => Some(Self {
                shard_name,
                event_count,
                byte_len,
                file_hash,
            }),
            _ => None,
        }
    }
}
138
139// ---------------------------------------------------------------------------
140// ShardManager
141// ---------------------------------------------------------------------------
142
/// Owner of the time-sharded event files inside a `.bones` repository.
///
/// Responsibilities:
/// - creating the directory layout
/// - rotating shards when the month changes
/// - appending under the repo-wide advisory lock
/// - maintaining the monotonic clock file
/// - repairing torn writes
/// - replaying all shards in chronological order
/// - generating manifests for sealed shards
pub struct ShardManager {
    /// Path of the `.bones` directory all other paths are derived from.
    bones_dir: PathBuf,
}
157
158impl ShardManager {
159    /// Create a new `ShardManager` for the given `.bones` directory.
160    ///
161    /// Does not create directories on construction; call [`init`](Self::init)
162    /// or [`ensure_dirs`](Self::ensure_dirs) first if needed.
163    #[must_use]
164    pub fn new(bones_dir: impl Into<PathBuf>) -> Self {
165        Self {
166            bones_dir: bones_dir.into(),
167        }
168    }
169
170    /// Path to the events directory.
171    #[must_use]
172    pub fn events_dir(&self) -> PathBuf {
173        self.bones_dir.join("events")
174    }
175
176    /// Path to the advisory lock file.
177    #[must_use]
178    pub fn lock_path(&self) -> PathBuf {
179        self.bones_dir.join("lock")
180    }
181
182    /// Path to the monotonic clock file.
183    #[must_use]
184    pub fn clock_path(&self) -> PathBuf {
185        self.bones_dir.join("cache").join("clock")
186    }
187
188    /// Generate the shard filename for a given year and month.
189    #[must_use]
190    pub fn shard_filename(year: i32, month: u32) -> String {
191        format!("{year:04}-{month:02}.events")
192    }
193
194    /// Path to a specific shard file.
195    #[must_use]
196    pub fn shard_path(&self, year: i32, month: u32) -> PathBuf {
197        self.events_dir().join(Self::shard_filename(year, month))
198    }
199
200    /// Path to a manifest file for a given shard.
201    #[must_use]
202    pub fn manifest_path(&self, year: i32, month: u32) -> PathBuf {
203        self.events_dir()
204            .join(format!("{year:04}-{month:02}.manifest"))
205    }
206
207    // -----------------------------------------------------------------------
208    // Initialization
209    // -----------------------------------------------------------------------
210
211    /// Create the `.bones/events/` and `.bones/cache/` directories if they
212    /// don't exist. Idempotent.
213    ///
214    /// # Errors
215    ///
216    /// Returns [`ShardError::InitFailed`] if directory creation fails.
217    pub fn ensure_dirs(&self) -> Result<(), ShardError> {
218        fs::create_dir_all(self.events_dir()).map_err(ShardError::InitFailed)?;
219        fs::create_dir_all(self.bones_dir.join("cache")).map_err(ShardError::InitFailed)?;
220        Ok(())
221    }
222
223    /// Initialize the shard directory and create the first shard file
224    /// with the standard header if no shards exist.
225    ///
226    /// Returns the (year, month) of the active shard.
227    ///
228    /// # Errors
229    ///
230    /// Returns [`ShardError`] on I/O failure or if directories cannot be
231    /// created.
232    pub fn init(&self) -> Result<(i32, u32), ShardError> {
233        self.ensure_dirs()?;
234
235        let shards = self.list_shards()?;
236        if shards.is_empty() {
237            let (year, month) = current_year_month();
238            self.create_shard(year, month)?;
239            Ok((year, month))
240        } else if let Some(&(year, month)) = shards.last() {
241            Ok((year, month))
242        } else {
243            unreachable!("shards is non-empty")
244        }
245    }
246
247    // -----------------------------------------------------------------------
248    // Shard listing
249    // -----------------------------------------------------------------------
250
251    /// List all shard files in chronological order as (year, month) pairs.
252    ///
253    /// Shard filenames must match `YYYY-MM.events`. Invalid filenames are
254    /// silently skipped.
255    ///
256    /// # Errors
257    ///
258    /// Returns [`ShardError::Io`] if the directory cannot be read.
259    pub fn list_shards(&self) -> Result<Vec<(i32, u32)>, ShardError> {
260        let events_dir = self.events_dir();
261        if !events_dir.exists() {
262            return Ok(Vec::new());
263        }
264
265        let mut shards = Vec::new();
266        for entry in fs::read_dir(&events_dir)? {
267            let entry = entry?;
268            let name = entry.file_name();
269            let name_str = name.to_string_lossy();
270            if let Some(ym) = parse_shard_filename(&name_str) {
271                shards.push(ym);
272            }
273        }
274        shards.sort_unstable();
275        Ok(shards)
276    }
277
278    /// Get the active (most recent) shard, if any.
279    ///
280    /// # Errors
281    ///
282    /// Returns [`ShardError::Io`] if the directory cannot be read.
283    pub fn active_shard(&self) -> Result<Option<(i32, u32)>, ShardError> {
284        let shards = self.list_shards()?;
285        Ok(shards.last().copied())
286    }
287
288    // -----------------------------------------------------------------------
289    // Shard creation and rotation
290    // -----------------------------------------------------------------------
291
292    /// Create a new shard file with the standard header.
293    ///
294    /// Returns the path of the created file. Does nothing if the file
295    /// already exists.
296    ///
297    /// # Errors
298    ///
299    /// Returns [`ShardError::Io`] if the file cannot be written.
300    pub fn create_shard(&self, year: i32, month: u32) -> Result<PathBuf, ShardError> {
301        let path = self.shard_path(year, month);
302        if path.exists() {
303            return Ok(path);
304        }
305
306        let header = shard_header();
307        fs::write(&path, header)?;
308        Ok(path)
309    }
310
311    /// Check if the current month differs from the active shard's month.
312    /// If so, seal the old shard (generate manifest) and create a new one.
313    ///
314    /// Returns the (year, month) of the now-active shard.
315    ///
316    /// # Errors
317    ///
318    /// Returns [`ShardError`] on I/O failure during rotation.
319    pub fn rotate_if_needed(&self) -> Result<(i32, u32), ShardError> {
320        let (current_year, current_month) = current_year_month();
321        let active = self.active_shard()?;
322
323        match active {
324            Some((y, m)) if y == current_year && m == current_month => Ok((y, m)),
325            Some((y, m)) => {
326                // Seal the old shard with a manifest
327                self.write_manifest(y, m)?;
328                // Create new shard
329                self.create_shard(current_year, current_month)?;
330                Ok((current_year, current_month))
331            }
332            None => {
333                // No shards exist yet, create first one
334                self.create_shard(current_year, current_month)?;
335                Ok((current_year, current_month))
336            }
337        }
338    }
339
340    // -----------------------------------------------------------------------
341    // Manifest generation
342    // -----------------------------------------------------------------------
343
344    /// Generate and write a manifest file for a sealed shard.
345    ///
346    /// # Errors
347    ///
348    /// Returns [`ShardError::Io`] if the shard file cannot be read or the
349    /// manifest cannot be written.
350    pub fn write_manifest(&self, year: i32, month: u32) -> Result<ShardManifest, ShardError> {
351        let shard_path = self.shard_path(year, month);
352        let content = fs::read(&shard_path)?;
353        let content_str = String::from_utf8_lossy(&content);
354
355        // Count event lines (non-comment, non-blank)
356        let event_count = content_str
357            .lines()
358            .filter(|line| !line.is_empty() && !line.starts_with('#') && !line.trim().is_empty())
359            .count() as u64;
360
361        let byte_len = content.len() as u64;
362        let file_hash = format!("blake3:{}", blake3::hash(&content).to_hex());
363
364        let manifest = ShardManifest {
365            shard_name: Self::shard_filename(year, month),
366            event_count,
367            byte_len,
368            file_hash,
369        };
370
371        let manifest_path = self.manifest_path(year, month);
372        fs::write(&manifest_path, manifest.to_string_repr())?;
373
374        Ok(manifest)
375    }
376
377    /// Read a manifest file if it exists.
378    ///
379    /// # Errors
380    ///
381    /// Returns [`ShardError::Io`] if the manifest file exists but cannot
382    /// be read.
383    pub fn read_manifest(
384        &self,
385        year: i32,
386        month: u32,
387    ) -> Result<Option<ShardManifest>, ShardError> {
388        let manifest_path = self.manifest_path(year, month);
389        if !manifest_path.exists() {
390            return Ok(None);
391        }
392        let content = fs::read_to_string(&manifest_path)?;
393        Ok(ShardManifest::from_string_repr(&content))
394    }
395
396    // -----------------------------------------------------------------------
397    // Sealed shard validation
398    // -----------------------------------------------------------------------
399
400    /// Validate byte-length of sealed shards against their manifests.
401    ///
402    /// For each shard except the last (active), checks whether a `.manifest`
403    /// file exists and whether `byte_len` matches the actual file size on
404    /// disk. This is a lightweight startup check — full BLAKE3 hash
405    /// verification is deferred to `bn doctor` / `bn verify`.
406    ///
407    /// Returns a list of issues found. An empty list means all sealed shards
408    /// with manifests match.
409    ///
410    /// # Errors
411    ///
412    /// Returns [`ShardError::Io`] if shard files or manifests cannot be read.
413    pub fn validate_sealed_shards(&self) -> Result<Vec<ShardIntegrityIssue>, ShardError> {
414        let shards = self.list_shards()?;
415        let mut issues = Vec::new();
416
417        // All shards except the last are sealed.
418        let sealed = if shards.len() > 1 {
419            &shards[..shards.len() - 1]
420        } else {
421            return Ok(issues);
422        };
423
424        for &(year, month) in sealed {
425            let shard_path = self.shard_path(year, month);
426            let shard_name = Self::shard_filename(year, month);
427
428            let Some(manifest) = self.read_manifest(year, month)? else {
429                tracing::warn!(shard = %shard_name, "sealed shard has no manifest");
430                issues.push(ShardIntegrityIssue {
431                    shard_name: shard_name.clone(),
432                    problem: "sealed shard has no manifest file".into(),
433                });
434                continue;
435            };
436
437            let file_len = fs::metadata(&shard_path)?.len();
438            if file_len != manifest.byte_len {
439                tracing::error!(
440                    shard = %shard_name,
441                    expected = manifest.byte_len,
442                    actual = file_len,
443                    "sealed shard byte length mismatch"
444                );
445                issues.push(ShardIntegrityIssue {
446                    shard_name,
447                    problem: format!(
448                        "byte length mismatch: manifest says {} bytes, file is {} bytes",
449                        manifest.byte_len, file_len
450                    ),
451                });
452            }
453        }
454
455        Ok(issues)
456    }
457
458    // -----------------------------------------------------------------------
459    // Append
460    // -----------------------------------------------------------------------
461
462    /// Append an event line to the active shard.
463    ///
464    /// This method:
465    /// 1. Acquires the repo-wide advisory lock.
466    /// 2. Rotates shards if the month has changed.
467    /// 3. Reads and updates the monotonic clock.
468    /// 4. Appends the line using `O_APPEND` + `write_all` + `flush`.
469    /// 5. Optionally calls `sync_data` if `durable` is true.
470    /// 6. Releases the lock.
471    ///
472    /// The `line` must be a complete TSJSON line ending with `\n`.
473    ///
474    /// Returns the monotonic timestamp used.
475    ///
476    /// # Errors
477    ///
478    /// Returns [`ShardError::Lock`] if the lock cannot be acquired within
479    /// `lock_timeout`, or [`ShardError::Io`] on write failure.
480    pub fn append(
481        &self,
482        line: &str,
483        durable: bool,
484        lock_timeout: Duration,
485    ) -> Result<i64, ShardError> {
486        self.ensure_dirs()?;
487
488        let _lock = ShardLock::acquire(&self.lock_path(), lock_timeout)?;
489
490        // Rotate if month changed
491        let (year, month) = self.rotate_if_needed()?;
492        let shard_path = self.shard_path(year, month);
493
494        // Validate shard header before appending
495        if shard_path.exists() {
496            validate_shard_header(&shard_path)?;
497        }
498
499        // Update monotonic clock
500        let ts = self.next_timestamp()?;
501
502        // Append with O_APPEND
503        let mut file = OpenOptions::new()
504            .create(true)
505            .append(true)
506            .open(&shard_path)?;
507
508        file.write_all(line.as_bytes())?;
509        file.flush()?;
510
511        if durable {
512            file.sync_data()?;
513        }
514
515        Ok(ts)
516    }
517
518    /// Append a raw line without locking or clock update.
519    ///
520    /// Used internally and in tests. The caller is responsible for
521    /// holding the lock and managing the clock.
522    ///
523    /// # Errors
524    ///
525    /// Returns [`ShardError::Io`] on write failure.
526    pub fn append_raw(&self, year: i32, month: u32, line: &str) -> Result<(), ShardError> {
527        let shard_path = self.shard_path(year, month);
528
529        let mut file = OpenOptions::new()
530            .create(true)
531            .append(true)
532            .open(&shard_path)?;
533
534        file.write_all(line.as_bytes())?;
535        file.flush()?;
536        Ok(())
537    }
538
539    // -----------------------------------------------------------------------
540    // Monotonic clock
541    // -----------------------------------------------------------------------
542
543    /// Read the current monotonic clock value.
544    ///
545    /// Returns 0 if the clock file doesn't exist.
546    ///
547    /// # Errors
548    ///
549    /// Returns [`ShardError::Io`] if the clock file exists but cannot be
550    /// read.
551    pub fn read_clock(&self) -> Result<i64, ShardError> {
552        let path = self.clock_path();
553        if !path.exists() {
554            return Ok(0);
555        }
556        let content = fs::read_to_string(&path)?;
557        Ok(content.trim().parse::<i64>().unwrap_or(0))
558    }
559
560    /// Compute the next monotonic timestamp and write it to the clock file.
561    ///
562    /// `next = max(system_time_us, last + 1)`
563    ///
564    /// The caller must hold the repo lock.
565    ///
566    /// # Errors
567    ///
568    /// Returns [`ShardError::Io`] if the clock file cannot be read or
569    /// written.
570    pub fn next_timestamp(&self) -> Result<i64, ShardError> {
571        let last = self.read_clock()?;
572        let now = system_time_us();
573        let next = std::cmp::max(now, last + 1);
574        self.write_clock(next)?;
575        Ok(next)
576    }
577
578    /// Write a clock value to the clock file.
579    fn write_clock(&self, value: i64) -> Result<(), ShardError> {
580        let path = self.clock_path();
581        if let Some(parent) = path.parent() {
582            fs::create_dir_all(parent)?;
583        }
584        fs::write(&path, value.to_string())?;
585        Ok(())
586    }
587
588    // -----------------------------------------------------------------------
589    // Torn-write recovery
590    // -----------------------------------------------------------------------
591
592    /// Scan the active shard for torn writes and truncate incomplete
593    /// trailing lines.
594    ///
595    /// A torn write leaves a partial line (no terminating `\n`) at the end
596    /// of the file. This method finds the last complete newline and
597    /// truncates everything after it.
598    ///
599    /// Returns `Ok(Some(bytes_truncated))` if a torn write was repaired,
600    /// or `Ok(None)` if the file was clean.
601    ///
602    /// # Errors
603    ///
604    /// Returns [`ShardError::Io`] if the shard file cannot be read or
605    /// truncated.
606    pub fn recover_torn_writes(&self) -> Result<Option<u64>, ShardError> {
607        let Some(active) = self.active_shard()? else {
608            return Ok(None);
609        };
610
611        let shard_path = self.shard_path(active.0, active.1);
612        recover_shard_torn_write(&shard_path)
613    }
614
615    // -----------------------------------------------------------------------
616    // Replay
617    // -----------------------------------------------------------------------
618
619    /// Read all event lines from all shards in chronological order.
620    ///
621    /// Shards are read in lexicographic order (`YYYY-MM` sorts correctly).
622    /// Returns the concatenated content of all shard files.
623    ///
624    /// # Errors
625    ///
626    /// Returns [`ShardError::Io`] if any shard file cannot be read.
627    pub fn replay(&self) -> Result<String, ShardError> {
628        let shards = self.list_shards()?;
629        let mut content = String::new();
630
631        for (year, month) in shards {
632            let path = self.shard_path(year, month);
633            let shard_content = fs::read_to_string(&path)?;
634            content.push_str(&shard_content);
635        }
636
637        Ok(content)
638    }
639
640    /// Read event lines from a specific shard.
641    ///
642    /// # Errors
643    ///
644    /// Returns [`ShardError::Io`] if the shard file cannot be read.
645    pub fn read_shard(&self, year: i32, month: u32) -> Result<String, ShardError> {
646        let path = self.shard_path(year, month);
647        Ok(fs::read_to_string(&path)?)
648    }
649
650    /// Compute the total concatenated byte size of all shards without reading
651    /// their full contents.
652    ///
653    /// This is used for advancing the projection cursor without paying the
654    /// cost of a full replay.
655    ///
656    /// # Errors
657    ///
658    /// Returns [`ShardError::Io`] if any shard file metadata cannot be read.
659    pub fn total_content_len(&self) -> Result<usize, ShardError> {
660        let shards = self.list_shards()?;
661        let mut total = 0usize;
662        for (year, month) in shards {
663            let path = self.shard_path(year, month);
664            let meta = fs::metadata(&path)?;
665            total = total.saturating_add(usize::try_from(meta.len()).unwrap_or(usize::MAX));
666        }
667        Ok(total)
668    }
669
670    /// Read shard content starting from the given absolute byte offset in the
671    /// concatenated shard sequence.
672    ///
673    /// Sealed shards that end entirely before `offset` are skipped without
674    /// reading their contents — only their file sizes are stat(2)'d.
675    /// Only content from `offset` onward is returned, bounding memory use to
676    /// new/unseen events rather than the full log.
677    ///
678    /// Returns `(new_content, total_len)` where:
679    /// - `new_content` is the bytes from `offset` to the end of all shards.
680    /// - `total_len` is the total byte size of all shards concatenated (usable
681    ///   as the new cursor offset after processing `new_content`).
682    ///
683    /// # Errors
684    ///
685    /// Returns [`ShardError::Io`] if shard metadata or file reads fail.
686    pub fn replay_from_offset(&self, offset: usize) -> Result<(String, usize), ShardError> {
687        let shards = self.list_shards()?;
688        let mut cumulative: usize = 0;
689        let mut result = String::new();
690        let mut found_start = false;
691
692        for (year, month) in shards {
693            let path = self.shard_path(year, month);
694            let shard_len = usize::try_from(fs::metadata(&path)?.len()).unwrap_or(usize::MAX);
695
696            let shard_end = cumulative.saturating_add(shard_len);
697
698            if shard_end <= offset {
699                // This shard ends at or before the cursor — skip entirely.
700                cumulative = shard_end;
701                continue;
702            }
703
704            // This shard overlaps with or is entirely after the cursor.
705            let shard_content = fs::read_to_string(&path)?;
706
707            if found_start {
708                result.push_str(&shard_content);
709            } else {
710                // Calculate the within-shard byte offset.
711                let within = offset.saturating_sub(cumulative);
712                // Guard: within must not exceed shard content length.
713                let within = within.min(shard_content.len());
714                result.push_str(&shard_content[within..]);
715                found_start = true;
716            }
717
718            cumulative = shard_end;
719        }
720
721        Ok((result, cumulative))
722    }
723
724    /// Read bytes from the concatenated shard sequence in `[start_offset, end_offset)`.
725    ///
726    /// Only shards that overlap with the requested range are read.
727    /// Shards entirely outside the range are stat(2)'d but not read.
728    ///
729    /// This is used to read a small window around the projection cursor for
730    /// hash validation without loading the full shard content.
731    ///
732    /// # Errors
733    ///
734    /// Returns [`ShardError::Io`] if any shard file cannot be read.
735    pub fn read_content_range(
736        &self,
737        start_offset: usize,
738        end_offset: usize,
739    ) -> Result<String, ShardError> {
740        if start_offset >= end_offset {
741            return Ok(String::new());
742        }
743
744        let shards = self.list_shards()?;
745        let mut cumulative: usize = 0;
746        let mut result = String::new();
747
748        for (year, month) in shards {
749            let path = self.shard_path(year, month);
750            let shard_len = usize::try_from(fs::metadata(&path)?.len()).unwrap_or(usize::MAX);
751            let shard_end = cumulative.saturating_add(shard_len);
752
753            if shard_end <= start_offset {
754                // Shard ends before our range — skip without reading.
755                cumulative = shard_end;
756                continue;
757            }
758
759            if cumulative >= end_offset {
760                // Shard starts after our range — done.
761                break;
762            }
763
764            let shard_content = fs::read_to_string(&path)?;
765
766            // Clip to the slice of this shard that overlaps with [start, end).
767            let within_start = if cumulative < start_offset {
768                (start_offset - cumulative).min(shard_content.len())
769            } else {
770                0
771            };
772            let within_end = if shard_end > end_offset {
773                (end_offset - cumulative).min(shard_content.len())
774            } else {
775                shard_content.len()
776            };
777
778            result.push_str(&shard_content[within_start..within_end]);
779            cumulative = shard_end;
780        }
781
782        Ok(result)
783    }
784
785    /// Count event lines across all shards (excluding comments and blanks).
786    ///
787    /// # Errors
788    ///
789    /// Returns [`ShardError::Io`] if any shard file cannot be read.
790    pub fn event_count(&self) -> Result<u64, ShardError> {
791        let content = self.replay()?;
792        let count = content
793            .lines()
794            .filter(|line| !line.is_empty() && !line.starts_with('#') && !line.trim().is_empty())
795            .count();
796        Ok(count as u64)
797    }
798
799    /// Iterate over all event lines across all shards.
800    ///
801    /// Yields `(absolute_offset, line_content)` pairs.
802    ///
803    /// # Errors
804    ///
805    /// Returns [`ShardError::Io`] if directory or shard reading fails.
806    pub fn replay_lines(
807        &self,
808    ) -> Result<impl Iterator<Item = io::Result<(usize, String)>>, ShardError> {
809        self.replay_lines_from_offset(0)
810    }
811
812    /// Iterate over event lines starting from a given absolute byte offset.
813    ///
814    /// Yields `(absolute_offset, line_content)` pairs.
815    ///
816    /// # Errors
817    ///
818    /// Returns [`ShardError::Io`] if directory or shard reading fails.
819    pub fn replay_lines_from_offset(
820        &self,
821        offset: usize,
822    ) -> Result<impl Iterator<Item = io::Result<(usize, String)>>, ShardError> {
823        let shards = self.list_shards()?;
824        let bones_dir = self.bones_dir.clone();
825
826        Ok(ShardLineIterator {
827            shards,
828            current_shard_idx: 0,
829            current_reader: None,
830            cumulative_offset: 0,
831            bones_dir,
832        }
833        .skip_to_offset(offset))
834    }
835
836    /// Check if the repository has any event shards.
837    ///
838    /// # Errors
839    ///
840    /// Returns [`ShardError::Io`] if the events directory cannot be read.
841    pub fn is_empty(&self) -> Result<bool, ShardError> {
842        let shards = self.list_shards()?;
843        Ok(shards.is_empty())
844    }
845}
846
847// ---------------------------------------------------------------------------
848// ShardLineIterator
849// ---------------------------------------------------------------------------
850
/// Line-by-line reader over a fixed list of shards.
struct ShardLineIterator {
    /// (year, month) pairs of the shards to read, in the order to read them.
    shards: Vec<(i32, u32)>,
    /// Index into `shards` of the shard the iterator is positioned at.
    current_shard_idx: usize,
    /// Buffered reader for the current shard; `None` until one is opened.
    current_reader: Option<io::BufReader<fs::File>>,
    /// Absolute byte offset within the concatenated shard sequence.
    cumulative_offset: usize,
    /// Root `.bones` directory used to build shard paths.
    bones_dir: PathBuf,
}
858
859impl ShardLineIterator {
860    fn skip_to_offset(mut self, offset: usize) -> Self {
861        // Fast-forward shards using metadata
862        while self.current_shard_idx < self.shards.len() {
863            let (year, month) = self.shards[self.current_shard_idx];
864            let shard_path = self
865                .bones_dir
866                .join("events")
867                .join(ShardManager::shard_filename(year, month));
868            if let Ok(meta) = fs::metadata(shard_path) {
869                let shard_len = usize::try_from(meta.len()).unwrap_or(usize::MAX);
870                if self.cumulative_offset + shard_len <= offset {
871                    self.cumulative_offset += shard_len;
872                    self.current_shard_idx += 1;
873                    continue;
874                }
875            }
876            break;
877        }
878
879        // Now we are at the shard that contains the offset.
880        // The first call to next() will open it and we'll need to skip
881        // the 'within-shard' offset.
882        // We can handle this by storing the skip amount.
883        self.cumulative_offset = offset;
884        self
885    }
886}
887
888impl Iterator for ShardLineIterator {
889    type Item = io::Result<(usize, String)>;
890
891    fn next(&mut self) -> Option<Self::Item> {
892        use std::io::{BufRead, Seek, SeekFrom};
893
894        loop {
895            if self.current_reader.is_none() {
896                if self.current_shard_idx >= self.shards.len() {
897                    return None;
898                }
899
900                let (year, month) = self.shards[self.current_shard_idx];
901                let shard_path = self
902                    .bones_dir
903                    .join("events")
904                    .join(ShardManager::shard_filename(year, month));
905
906                // Legacy compat: skip forwarding-pointer shards created by
907                // an older non-Unix symlink fallback.  These are tiny files
908                // whose content is just a `YYYY-MM.events` filename.
909                if is_forwarding_pointer(&shard_path) {
910                    tracing::warn!(
911                        shard = %shard_path.display(),
912                        "skipping legacy forwarding-pointer shard during replay"
913                    );
914                    if let Ok(meta) = fs::metadata(&shard_path) {
915                        self.cumulative_offset += usize::try_from(meta.len()).unwrap_or(0);
916                    }
917                    self.current_shard_idx += 1;
918                    continue;
919                }
920
921                // Validate shard header before reading.
922                if let Err(e) = validate_shard_header(&shard_path) {
923                    tracing::error!(
924                        shard = %shard_path.display(),
925                        error = %e,
926                        "shard header validation failed"
927                    );
928                    return Some(Err(io::Error::new(
929                        io::ErrorKind::InvalidData,
930                        e.to_string(),
931                    )));
932                }
933
934                let mut file = match fs::File::open(shard_path) {
935                    Ok(f) => f,
936                    Err(e) => return Some(Err(e)),
937                };
938
939                // Calculate cumulative offset before this shard
940                let mut cumulative_before = 0;
941                for i in 0..self.current_shard_idx {
942                    let (y, m) = self.shards[i];
943                    let p = self
944                        .bones_dir
945                        .join("events")
946                        .join(ShardManager::shard_filename(y, m));
947                    if let Ok(meta) = fs::metadata(p) {
948                        cumulative_before += usize::try_from(meta.len()).unwrap_or(usize::MAX);
949                    }
950                }
951
952                if self.cumulative_offset > cumulative_before {
953                    let within = self.cumulative_offset - cumulative_before;
954                    if let Err(e) = file.seek(SeekFrom::Start(within as u64)) {
955                        return Some(Err(e));
956                    }
957                }
958
959                self.current_reader = Some(io::BufReader::new(file));
960            }
961
962            let reader = self
963                .current_reader
964                .as_mut()
965                .expect("reader was just set above");
966            let mut line = String::new();
967            let offset = self.cumulative_offset;
968
969            match reader.read_line(&mut line) {
970                Ok(0) => {
971                    // EOF for this shard
972                    self.current_reader = None;
973                    self.current_shard_idx += 1;
974                }
975                Ok(n) => {
976                    self.cumulative_offset += n;
977                    return Some(Ok((offset, line)));
978                }
979                Err(e) => return Some(Err(e)),
980            }
981        }
982    }
983}
984
985// ---------------------------------------------------------------------------
986// Helpers
987// ---------------------------------------------------------------------------
988
989/// Get the current year and month from system time.
990#[must_use]
991fn current_year_month() -> (i32, u32) {
992    let now = chrono::Utc::now();
993    (now.year(), now.month())
994}
995
/// Get the current system time in microseconds since Unix epoch.
///
/// Returns `0` when the system clock reports a time before the epoch.  A
/// microsecond count too large for `i64` saturates at `i64::MAX` instead of
/// wrapping, as the previous `as i64` cast would have done.
#[must_use]
fn system_time_us() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| {
            i64::try_from(elapsed.as_micros()).unwrap_or(i64::MAX)
        })
}
1005
1006/// Check if `path` is a legacy forwarding-pointer shard.
1007///
1008/// Forwarding pointers are tiny files (<=30 bytes) whose content is just a
1009/// `YYYY-MM.events` filename.  They were created by an older non-Unix symlink
1010/// fallback that has since been removed.  Real shards start with a multi-line
1011/// header far exceeding 30 bytes.
1012fn is_forwarding_pointer(path: &Path) -> bool {
1013    let Ok(meta) = fs::metadata(path) else {
1014        return false;
1015    };
1016    if meta.len() > 30 {
1017        return false;
1018    }
1019    let Ok(content) = fs::read_to_string(path) else {
1020        return false;
1021    };
1022    parse_shard_filename(content.trim()).is_some()
1023}
1024
1025/// Validate that a shard file starts with the expected header line.
1026///
1027/// Reads only the first line and checks it matches [`SHARD_HEADER`].
1028/// Empty files or files starting with the correct header pass.
1029/// Forwarding-pointer shards are silently accepted (legacy compat).
1030///
1031/// # Errors
1032///
1033/// Returns [`ShardError::CorruptedShard`] if the first line doesn't match,
1034/// or [`ShardError::Io`] if the file can't be read.
1035pub fn validate_shard_header(path: &Path) -> Result<(), ShardError> {
1036    use crate::event::writer::SHARD_HEADER;
1037    use std::io::{BufRead, BufReader};
1038
1039    // Skip forwarding-pointer shards (legacy compat).
1040    if is_forwarding_pointer(path) {
1041        return Ok(());
1042    }
1043
1044    let file = fs::File::open(path)?;
1045    let mut reader = BufReader::new(file);
1046    let mut first_line = String::new();
1047    let n = reader.read_line(&mut first_line)?;
1048
1049    // Empty file is ok (will get header on first write).
1050    if n == 0 {
1051        return Ok(());
1052    }
1053
1054    let trimmed = first_line.trim_end();
1055    if trimmed != SHARD_HEADER {
1056        return Err(ShardError::CorruptedShard {
1057            path: path.to_path_buf(),
1058            reason: format!(
1059                "expected header '{}', found '{}'",
1060                SHARD_HEADER,
1061                &trimmed[..trimmed.len().min(80)]
1062            ),
1063        });
1064    }
1065
1066    Ok(())
1067}
1068
/// Parse a shard filename like `"2026-02.events"` into (year, month).
///
/// Only the canonical spelling is accepted: a year made solely of ASCII
/// digits, a `-`, a month of exactly two ASCII digits in `01..=12`, and the
/// `.events` suffix.  Spellings such as `2026-1.events` or `+2026-01.events`
/// are rejected, because they would parse to a `(year, month)` whose
/// generated filename is a different file.
///
/// Names without a `-` in the stem, such as `current.events`, yield `None`.
fn parse_shard_filename(name: &str) -> Option<(i32, u32)> {
    let stem = name.strip_suffix(".events")?;
    let (year_str, month_str) = stem.split_once('-')?;

    // Integer parsing alone tolerates a leading `+` and unpadded months, so
    // check the exact shape first.
    let all_digits = |s: &str| !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit());
    if !all_digits(year_str) || month_str.len() != 2 || !all_digits(month_str) {
        return None;
    }

    let year: i32 = year_str.parse().ok()?;
    let month: u32 = month_str.parse().ok()?;
    (1..=12).contains(&month).then_some((year, month))
}
1084
1085/// Recover torn writes for a specific shard file.
1086///
1087/// Returns `Ok(Some(bytes_truncated))` if repair was needed,
1088/// or `Ok(None)` if the file was clean.
1089fn recover_shard_torn_write(path: &Path) -> Result<Option<u64>, ShardError> {
1090    let metadata = fs::metadata(path)?;
1091    let file_len = metadata.len();
1092    if file_len == 0 {
1093        return Ok(None);
1094    }
1095
1096    let content = fs::read(path)?;
1097
1098    // Find the last newline
1099    let last_newline = content.iter().rposition(|&b| b == b'\n');
1100
1101    if let Some(pos) = last_newline {
1102        let expected_len = (pos + 1) as u64;
1103        if expected_len < file_len {
1104            // There are bytes after the last newline — torn write
1105            let truncated = file_len - expected_len;
1106            let file = OpenOptions::new().write(true).open(path)?;
1107            file.set_len(expected_len)?;
1108            Ok(Some(truncated))
1109        } else {
1110            // File ends with newline — clean
1111            Ok(None)
1112        }
1113    } else {
1114        // No newline at all — entire content is a torn write
1115        // (or a corrupt file). Truncate to zero.
1116        let file = OpenOptions::new().write(true).open(path)?;
1117        file.set_len(0)?;
1118        Ok(Some(file_len))
1119    }
1120}
1121
1122// ---------------------------------------------------------------------------
1123// Tests
1124// ---------------------------------------------------------------------------
1125
1126#[cfg(test)]
1127mod tests {
1128    use super::*;
1129    use tempfile::TempDir;
1130
1131    fn setup() -> (TempDir, ShardManager) {
1132        let tmp = TempDir::new().expect("tempdir");
1133        let bones_dir = tmp.path().join(".bones");
1134        let mgr = ShardManager::new(&bones_dir);
1135        (tmp, mgr)
1136    }
1137
1138    // -----------------------------------------------------------------------
1139    // parse_shard_filename
1140    // -----------------------------------------------------------------------
1141
1142    #[test]
1143    fn parse_valid_shard_filenames() {
1144        assert_eq!(parse_shard_filename("2026-01.events"), Some((2026, 1)));
1145        assert_eq!(parse_shard_filename("2026-12.events"), Some((2026, 12)));
1146        assert_eq!(parse_shard_filename("1999-06.events"), Some((1999, 6)));
1147    }
1148
1149    #[test]
1150    fn parse_invalid_shard_filenames() {
1151        assert_eq!(parse_shard_filename("current.events"), None);
1152        assert_eq!(parse_shard_filename("2026-13.events"), None); // month > 12
1153        assert_eq!(parse_shard_filename("2026-00.events"), None); // month 0
1154        assert_eq!(parse_shard_filename("not-a-shard.txt"), None);
1155        assert_eq!(parse_shard_filename("2026-01.manifest"), None);
1156        assert_eq!(parse_shard_filename(""), None);
1157    }
1158
1159    // -----------------------------------------------------------------------
1160    // ShardManager::new, paths
1161    // -----------------------------------------------------------------------
1162
1163    #[test]
1164    fn shard_manager_paths() {
1165        let mgr = ShardManager::new("/repo/.bones");
1166        assert_eq!(mgr.events_dir(), PathBuf::from("/repo/.bones/events"));
1167        assert_eq!(mgr.lock_path(), PathBuf::from("/repo/.bones/lock"));
1168        assert_eq!(mgr.clock_path(), PathBuf::from("/repo/.bones/cache/clock"));
1169        assert_eq!(
1170            mgr.shard_path(2026, 2),
1171            PathBuf::from("/repo/.bones/events/2026-02.events")
1172        );
1173        assert_eq!(
1174            mgr.manifest_path(2026, 1),
1175            PathBuf::from("/repo/.bones/events/2026-01.manifest")
1176        );
1177    }
1178
1179    #[test]
1180    fn shard_filename_format() {
1181        assert_eq!(ShardManager::shard_filename(2026, 1), "2026-01.events");
1182        assert_eq!(ShardManager::shard_filename(2026, 12), "2026-12.events");
1183        assert_eq!(ShardManager::shard_filename(1999, 6), "1999-06.events");
1184    }
1185
1186    // -----------------------------------------------------------------------
1187    // ensure_dirs / init
1188    // -----------------------------------------------------------------------
1189
1190    #[test]
1191    fn ensure_dirs_creates_directories() {
1192        let (_tmp, mgr) = setup();
1193        mgr.ensure_dirs().expect("should create dirs");
1194        assert!(mgr.events_dir().exists());
1195        assert!(mgr.bones_dir.join("cache").exists());
1196    }
1197
1198    #[test]
1199    fn ensure_dirs_is_idempotent() {
1200        let (_tmp, mgr) = setup();
1201        mgr.ensure_dirs().expect("first");
1202        mgr.ensure_dirs().expect("second");
1203        assert!(mgr.events_dir().exists());
1204    }
1205
1206    #[test]
1207    fn init_creates_first_shard() {
1208        let (_tmp, mgr) = setup();
1209        let (year, month) = mgr.init().expect("init");
1210
1211        let (expected_year, expected_month) = current_year_month();
1212        assert_eq!(year, expected_year);
1213        assert_eq!(month, expected_month);
1214
1215        // Shard file exists with header
1216        let shard_path = mgr.shard_path(year, month);
1217        assert!(shard_path.exists());
1218        let content = fs::read_to_string(&shard_path).expect("read");
1219        assert!(content.starts_with("# bones event log v1"));
1220    }
1221
1222    #[test]
1223    fn init_is_idempotent() {
1224        let (_tmp, mgr) = setup();
1225        let first = mgr.init().expect("first");
1226        let second = mgr.init().expect("second");
1227        assert_eq!(first, second);
1228    }
1229
1230    // -----------------------------------------------------------------------
1231    // Shard listing
1232    // -----------------------------------------------------------------------
1233
1234    #[test]
1235    fn list_shards_empty() {
1236        let (_tmp, mgr) = setup();
1237        mgr.ensure_dirs().expect("dirs");
1238        let shards = mgr.list_shards().expect("list");
1239        assert!(shards.is_empty());
1240    }
1241
1242    #[test]
1243    fn list_shards_returns_sorted() {
1244        let (_tmp, mgr) = setup();
1245        mgr.ensure_dirs().expect("dirs");
1246
1247        // Create shards in reverse order
1248        mgr.create_shard(2026, 3).expect("create");
1249        mgr.create_shard(2026, 1).expect("create");
1250        mgr.create_shard(2026, 2).expect("create");
1251
1252        let shards = mgr.list_shards().expect("list");
1253        assert_eq!(shards, vec![(2026, 1), (2026, 2), (2026, 3)]);
1254    }
1255
1256    #[test]
1257    fn list_shards_skips_non_shard_files() {
1258        let (_tmp, mgr) = setup();
1259        mgr.ensure_dirs().expect("dirs");
1260        mgr.create_shard(2026, 1).expect("create");
1261
1262        // Create non-shard files
1263        fs::write(mgr.events_dir().join("readme.txt"), "hi").expect("write");
1264        fs::write(mgr.events_dir().join("2026-01.manifest"), "manifest").expect("write");
1265
1266        let shards = mgr.list_shards().expect("list");
1267        assert_eq!(shards, vec![(2026, 1)]);
1268    }
1269
1270    #[test]
1271    fn list_shards_no_events_dir() {
1272        let (_tmp, mgr) = setup();
1273        // Don't create any dirs
1274        let shards = mgr.list_shards().expect("list");
1275        assert!(shards.is_empty());
1276    }
1277
1278    // -----------------------------------------------------------------------
1279    // Shard creation
1280    // -----------------------------------------------------------------------
1281
1282    #[test]
1283    fn create_shard_writes_header() {
1284        let (_tmp, mgr) = setup();
1285        mgr.ensure_dirs().expect("dirs");
1286        let path = mgr.create_shard(2026, 2).expect("create");
1287
1288        let content = fs::read_to_string(&path).expect("read");
1289        assert!(content.starts_with("# bones event log v1"));
1290        assert!(content.contains("# fields:"));
1291        assert_eq!(content.lines().count(), 2);
1292    }
1293
1294    #[test]
1295    fn create_shard_idempotent() {
1296        let (_tmp, mgr) = setup();
1297        mgr.ensure_dirs().expect("dirs");
1298        let p1 = mgr.create_shard(2026, 2).expect("first");
1299        // Write something extra
1300        fs::write(&p1, "modified").expect("write");
1301        // Second create should NOT overwrite
1302        let p2 = mgr.create_shard(2026, 2).expect("second");
1303        assert_eq!(p1, p2);
1304        let content = fs::read_to_string(&p2).expect("read");
1305        assert_eq!(content, "modified");
1306    }
1307
1308    // -----------------------------------------------------------------------
1309    // Monotonic clock
1310    // -----------------------------------------------------------------------
1311
1312    #[test]
1313    fn clock_starts_at_zero() {
1314        let (_tmp, mgr) = setup();
1315        mgr.ensure_dirs().expect("dirs");
1316        let ts = mgr.read_clock().expect("read");
1317        assert_eq!(ts, 0);
1318    }
1319
1320    #[test]
1321    fn clock_is_monotonic() {
1322        let (_tmp, mgr) = setup();
1323        mgr.ensure_dirs().expect("dirs");
1324        let t1 = mgr.next_timestamp().expect("t1");
1325        let t2 = mgr.next_timestamp().expect("t2");
1326        let t3 = mgr.next_timestamp().expect("t3");
1327        assert!(t2 > t1);
1328        assert!(t3 > t2);
1329    }
1330
1331    #[test]
1332    fn clock_reads_back_written_value() {
1333        let (_tmp, mgr) = setup();
1334        mgr.ensure_dirs().expect("dirs");
1335        mgr.write_clock(42_000_000).expect("write");
1336        let ts = mgr.read_clock().expect("read");
1337        assert_eq!(ts, 42_000_000);
1338    }
1339
1340    #[test]
1341    fn clock_never_goes_backward() {
1342        let (_tmp, mgr) = setup();
1343        mgr.ensure_dirs().expect("dirs");
1344
1345        // Set clock far in the future
1346        let future = system_time_us() + 10_000_000;
1347        mgr.write_clock(future).expect("write");
1348
1349        let next = mgr.next_timestamp().expect("next");
1350        assert!(next > future, "clock should advance past future value");
1351    }
1352
1353    // -----------------------------------------------------------------------
1354    // Append
1355    // -----------------------------------------------------------------------
1356
1357    #[test]
1358    fn append_raw_adds_line() {
1359        let (_tmp, mgr) = setup();
1360        mgr.ensure_dirs().expect("dirs");
1361        mgr.create_shard(2026, 2).expect("create");
1362
1363        mgr.append_raw(2026, 2, "event line 1\n").expect("append");
1364        mgr.append_raw(2026, 2, "event line 2\n").expect("append");
1365
1366        let content = mgr.read_shard(2026, 2).expect("read");
1367        assert!(content.contains("event line 1"));
1368        assert!(content.contains("event line 2"));
1369    }
1370
1371    #[test]
1372    fn append_with_lock() {
1373        let (_tmp, mgr) = setup();
1374        mgr.init().expect("init");
1375
1376        let _ts = mgr
1377            .append("test event line\n", false, Duration::from_secs(1))
1378            .expect("append");
1379
1380        let content = mgr.replay().expect("replay");
1381        assert!(content.contains("test event line"));
1382    }
1383
1384    #[test]
1385    fn append_returns_monotonic_timestamps() {
1386        let (_tmp, mgr) = setup();
1387        mgr.init().expect("init");
1388
1389        let t1 = mgr
1390            .append("line1\n", false, Duration::from_secs(1))
1391            .expect("t1");
1392        let t2 = mgr
1393            .append("line2\n", false, Duration::from_secs(1))
1394            .expect("t2");
1395
1396        assert!(t2 > t1);
1397    }
1398
1399    // -----------------------------------------------------------------------
1400    // Torn-write recovery
1401    // -----------------------------------------------------------------------
1402
1403    #[test]
1404    fn recover_clean_file() {
1405        let (_tmp, mgr) = setup();
1406        mgr.init().expect("init");
1407
1408        let (y, m) = current_year_month();
1409        mgr.append_raw(y, m, "complete line\n").expect("append");
1410
1411        let recovered = mgr.recover_torn_writes().expect("recover");
1412        assert_eq!(recovered, None);
1413    }
1414
1415    #[test]
1416    fn recover_torn_write_truncates() {
1417        let (_tmp, mgr) = setup();
1418        let (y, m) = mgr.init().expect("init");
1419        let shard_path = mgr.shard_path(y, m);
1420
1421        // Write a complete line followed by a partial line
1422        {
1423            let mut f = OpenOptions::new()
1424                .append(true)
1425                .open(&shard_path)
1426                .expect("open");
1427            f.write_all(b"complete line\npartial line without newline")
1428                .expect("write");
1429            f.flush().expect("flush");
1430        }
1431
1432        let recovered = mgr.recover_torn_writes().expect("recover");
1433        assert!(recovered.is_some());
1434
1435        let truncated = recovered.expect("checked is_some");
1436        assert_eq!(truncated, "partial line without newline".len() as u64);
1437
1438        // Verify file now ends with newline
1439        let content = fs::read_to_string(&shard_path).expect("read");
1440        assert!(content.ends_with('\n'));
1441        assert!(content.contains("complete line"));
1442        assert!(!content.contains("partial line without newline"));
1443    }
1444
1445    #[test]
1446    fn recover_no_newline_at_all() {
1447        let (_tmp, mgr) = setup();
1448        let (y, m) = mgr.init().expect("init");
1449        let shard_path = mgr.shard_path(y, m);
1450
1451        // Overwrite entire file with no newlines
1452        fs::write(&shard_path, "no newlines here").expect("write");
1453
1454        let recovered = mgr.recover_torn_writes().expect("recover");
1455        assert_eq!(recovered, Some("no newlines here".len() as u64));
1456
1457        // File should be empty
1458        let content = fs::read_to_string(&shard_path).expect("read");
1459        assert!(content.is_empty());
1460    }
1461
1462    #[test]
1463    fn recover_empty_file() {
1464        let (_tmp, mgr) = setup();
1465        let (y, m) = mgr.init().expect("init");
1466        let shard_path = mgr.shard_path(y, m);
1467
1468        // Empty the file
1469        fs::write(&shard_path, "").expect("write");
1470
1471        let recovered = mgr.recover_torn_writes().expect("recover");
1472        assert_eq!(recovered, None);
1473    }
1474
1475    #[test]
1476    fn recover_no_active_shard() {
1477        let (_tmp, mgr) = setup();
1478        mgr.ensure_dirs().expect("dirs");
1479
1480        let recovered = mgr.recover_torn_writes().expect("recover");
1481        assert_eq!(recovered, None);
1482    }
1483
1484    // -----------------------------------------------------------------------
1485    // Replay
1486    // -----------------------------------------------------------------------
1487
1488    #[test]
1489    fn replay_empty_repo() {
1490        let (_tmp, mgr) = setup();
1491        mgr.ensure_dirs().expect("dirs");
1492        let content = mgr.replay().expect("replay");
1493        assert!(content.is_empty());
1494    }
1495
1496    #[test]
1497    fn replay_single_shard() {
1498        let (_tmp, mgr) = setup();
1499        mgr.ensure_dirs().expect("dirs");
1500        mgr.create_shard(2026, 1).expect("create");
1501        mgr.append_raw(2026, 1, "event-a\n").expect("append");
1502
1503        let content = mgr.replay().expect("replay");
1504        assert!(content.contains("event-a"));
1505    }
1506
1507    #[test]
1508    fn replay_multiple_shards_in_order() {
1509        let (_tmp, mgr) = setup();
1510        mgr.ensure_dirs().expect("dirs");
1511
1512        mgr.create_shard(2026, 1).expect("create");
1513        mgr.create_shard(2026, 2).expect("create");
1514        mgr.create_shard(2026, 3).expect("create");
1515
1516        mgr.append_raw(2026, 1, "event-jan\n").expect("append");
1517        mgr.append_raw(2026, 2, "event-feb\n").expect("append");
1518        mgr.append_raw(2026, 3, "event-mar\n").expect("append");
1519
1520        let content = mgr.replay().expect("replay");
1521
1522        // Events should appear in chronological order
1523        let jan_pos = content.find("event-jan").expect("jan");
1524        let feb_pos = content.find("event-feb").expect("feb");
1525        let mar_pos = content.find("event-mar").expect("mar");
1526        assert!(jan_pos < feb_pos);
1527        assert!(feb_pos < mar_pos);
1528    }
1529
1530    // -----------------------------------------------------------------------
1531    // Event count
1532    // -----------------------------------------------------------------------
1533
1534    #[test]
1535    fn event_count_empty() {
1536        let (_tmp, mgr) = setup();
1537        mgr.ensure_dirs().expect("dirs");
1538        assert_eq!(mgr.event_count().expect("count"), 0);
1539    }
1540
1541    #[test]
1542    fn event_count_excludes_comments_and_blanks() {
1543        let (_tmp, mgr) = setup();
1544        mgr.ensure_dirs().expect("dirs");
1545        mgr.create_shard(2026, 1).expect("create");
1546        // Header has 2 comment lines, then we add events
1547        mgr.append_raw(2026, 1, "event1\n").expect("append");
1548        mgr.append_raw(2026, 1, "event2\n").expect("append");
1549        mgr.append_raw(2026, 1, "\n").expect("blank");
1550
1551        assert_eq!(mgr.event_count().expect("count"), 2);
1552    }
1553
1554    // -----------------------------------------------------------------------
1555    // is_empty
1556    // -----------------------------------------------------------------------
1557
1558    #[test]
1559    fn is_empty_no_shards() {
1560        let (_tmp, mgr) = setup();
1561        mgr.ensure_dirs().expect("dirs");
1562        assert!(mgr.is_empty().expect("empty"));
1563    }
1564
1565    #[test]
1566    fn is_empty_with_shards() {
1567        let (_tmp, mgr) = setup();
1568        mgr.init().expect("init");
1569        assert!(!mgr.is_empty().expect("empty"));
1570    }
1571
1572    // -----------------------------------------------------------------------
1573    // Manifest
1574    // -----------------------------------------------------------------------
1575
1576    #[test]
1577    fn write_and_read_manifest() {
1578        let (_tmp, mgr) = setup();
1579        mgr.ensure_dirs().expect("dirs");
1580        mgr.create_shard(2026, 1).expect("create");
1581        mgr.append_raw(2026, 1, "event-line-1\n").expect("append");
1582        mgr.append_raw(2026, 1, "event-line-2\n").expect("append");
1583
1584        let written = mgr.write_manifest(2026, 1).expect("write manifest");
1585        assert_eq!(written.shard_name, "2026-01.events");
1586        assert_eq!(written.event_count, 2);
1587        assert!(written.byte_len > 0);
1588        assert!(written.file_hash.starts_with("blake3:"));
1589
1590        let read = mgr
1591            .read_manifest(2026, 1)
1592            .expect("read")
1593            .expect("should exist");
1594        assert_eq!(read, written);
1595    }
1596
1597    #[test]
1598    fn manifest_roundtrip() {
1599        let manifest = ShardManifest {
1600            shard_name: "2026-01.events".into(),
1601            event_count: 42,
1602            byte_len: 12345,
1603            file_hash: "blake3:abcdef0123456789".into(),
1604        };
1605
1606        let repr = manifest.to_string_repr();
1607        let parsed = ShardManifest::from_string_repr(&repr).expect("parse");
1608        assert_eq!(parsed, manifest);
1609    }
1610
1611    #[test]
1612    fn read_manifest_missing() {
1613        let (_tmp, mgr) = setup();
1614        mgr.ensure_dirs().expect("dirs");
1615        let result = mgr.read_manifest(2026, 1).expect("read");
1616        assert!(result.is_none());
1617    }
1618
1619    #[test]
1620    fn manifest_event_count_excludes_comments() {
1621        let (_tmp, mgr) = setup();
1622        mgr.ensure_dirs().expect("dirs");
1623        mgr.create_shard(2026, 1).expect("create");
1624        // Header has 2 comment lines
1625        mgr.append_raw(2026, 1, "event1\n").expect("append");
1626
1627        let manifest = mgr.write_manifest(2026, 1).expect("manifest");
1628        // Only 1 event line, not the 2 header lines
1629        assert_eq!(manifest.event_count, 1);
1630    }
1631
1632    // -----------------------------------------------------------------------
1633    // Rotation
1634    // -----------------------------------------------------------------------
1635
1636    #[test]
1637    fn rotate_creates_shard_if_none_exist() {
1638        let (_tmp, mgr) = setup();
1639        mgr.ensure_dirs().expect("dirs");
1640
1641        let (y, m) = mgr.rotate_if_needed().expect("rotate");
1642        let (ey, em) = current_year_month();
1643        assert_eq!((y, m), (ey, em));
1644
1645        assert!(mgr.shard_path(y, m).exists());
1646    }
1647
1648    #[test]
1649    fn rotate_no_op_same_month() {
1650        let (_tmp, mgr) = setup();
1651        let (y, m) = mgr.init().expect("init");
1652
1653        let (y2, m2) = mgr.rotate_if_needed().expect("rotate");
1654        assert_eq!((y, m), (y2, m2));
1655    }
1656
1657    #[test]
1658    fn rotate_different_month_seals_and_creates() {
1659        let (_tmp, mgr) = setup();
1660        mgr.ensure_dirs().expect("dirs");
1661
1662        // Create an old shard
1663        mgr.create_shard(2025, 11).expect("create");
1664        mgr.append_raw(2025, 11, "old event\n").expect("append");
1665
1666        // Rotate should seal old and create new
1667        let (y, m) = mgr.rotate_if_needed().expect("rotate");
1668        let (ey, em) = current_year_month();
1669        assert_eq!((y, m), (ey, em));
1670
1671        // Old shard should have a manifest
1672        assert!(mgr.manifest_path(2025, 11).exists());
1673
1674        // New shard should exist
1675        assert!(mgr.shard_path(ey, em).exists());
1676    }
1677
1678    // -----------------------------------------------------------------------
1679    // Frozen shards
1680    // -----------------------------------------------------------------------
1681
1682    #[test]
1683    fn frozen_shard_not_modified_by_append() {
1684        let (_tmp, mgr) = setup();
1685        mgr.ensure_dirs().expect("dirs");
1686
1687        // Create and populate old shard
1688        mgr.create_shard(2025, 6).expect("create");
1689        mgr.append_raw(2025, 6, "old event\n").expect("append");
1690        let old_content = mgr.read_shard(2025, 6).expect("read");
1691
1692        // Init creates current month shard
1693        mgr.init().expect("init");
1694
1695        // Append only goes to active shard
1696        mgr.append("new event\n", false, Duration::from_secs(1))
1697            .expect("append");
1698
1699        // Old shard is unchanged
1700        let after_content = mgr.read_shard(2025, 6).expect("read");
1701        assert_eq!(old_content, after_content);
1702    }
1703
1704    // -----------------------------------------------------------------------
1705    // system_time_us
1706    // -----------------------------------------------------------------------
1707
1708    #[test]
1709    fn system_time_us_is_positive() {
1710        let ts = system_time_us();
1711        assert!(ts > 0, "system time should be positive: {ts}");
1712    }
1713
1714    #[test]
1715    fn system_time_us_is_reasonable() {
1716        let ts = system_time_us();
1717        // Should be after 2020-01-01 in microseconds
1718        let jan_2020_us: i64 = 1_577_836_800_000_000;
1719        assert!(ts > jan_2020_us, "system time too small: {ts}");
1720    }
1721
1722    // -----------------------------------------------------------------------
1723    // total_content_len
1724    // -----------------------------------------------------------------------
1725
1726    #[test]
1727    fn total_content_len_empty_repo() {
1728        let (_tmp, mgr) = setup();
1729        mgr.ensure_dirs().expect("dirs");
1730        let len = mgr.total_content_len().expect("len");
1731        assert_eq!(len, 0);
1732    }
1733
1734    #[test]
1735    fn total_content_len_single_shard() {
1736        let (_tmp, mgr) = setup();
1737        mgr.ensure_dirs().expect("dirs");
1738        mgr.create_shard(2026, 1).expect("create");
1739        mgr.append_raw(2026, 1, "line1\n").expect("append");
1740        mgr.append_raw(2026, 1, "line2\n").expect("append");
1741
1742        let full = mgr.replay().expect("replay");
1743        let len = mgr.total_content_len().expect("len");
1744        assert_eq!(len, full.len());
1745    }
1746
1747    #[test]
1748    fn total_content_len_multiple_shards() {
1749        let (_tmp, mgr) = setup();
1750        mgr.ensure_dirs().expect("dirs");
1751        mgr.create_shard(2026, 1).expect("shard 1");
1752        mgr.create_shard(2026, 2).expect("shard 2");
1753        mgr.append_raw(2026, 1, "jan-event\n").expect("append jan");
1754        mgr.append_raw(2026, 2, "feb-event\n").expect("append feb");
1755
1756        let full = mgr.replay().expect("replay");
1757        let len = mgr.total_content_len().expect("len");
1758        assert_eq!(len, full.len(), "total_content_len must match replay len");
1759    }
1760
1761    // -----------------------------------------------------------------------
1762    // read_content_range
1763    // -----------------------------------------------------------------------
1764
1765    #[test]
1766    fn read_content_range_empty_range() {
1767        let (_tmp, mgr) = setup();
1768        mgr.ensure_dirs().expect("dirs");
1769        mgr.create_shard(2026, 1).expect("create");
1770        mgr.append_raw(2026, 1, "event\n").expect("append");
1771
1772        let result = mgr.read_content_range(5, 5).expect("range");
1773        assert!(result.is_empty());
1774    }
1775
1776    #[test]
1777    fn read_content_range_within_single_shard() {
1778        let (_tmp, mgr) = setup();
1779        mgr.ensure_dirs().expect("dirs");
1780        mgr.create_shard(2026, 1).expect("create");
1781        // shard header is 2 lines; add a known event line
1782        mgr.append_raw(2026, 1, "ABCDEF\n").expect("append");
1783
1784        let full = mgr.replay().expect("replay");
1785        // Find the position of "ABCDEF"
1786        let pos = full.find("ABCDEF").expect("ABCDEF must be in shard");
1787        let range = mgr.read_content_range(pos, pos + 7).expect("range");
1788        assert_eq!(range, "ABCDEF\n");
1789    }
1790
1791    #[test]
1792    fn read_content_range_across_shard_boundary() {
1793        let (_tmp, mgr) = setup();
1794        mgr.ensure_dirs().expect("dirs");
1795        mgr.create_shard(2026, 1).expect("shard 1");
1796        mgr.create_shard(2026, 2).expect("shard 2");
1797        mgr.append_raw(2026, 1, "jan-last-line\n").expect("jan");
1798        mgr.append_raw(2026, 2, "feb-first-line\n").expect("feb");
1799
1800        let full = mgr.replay().expect("replay");
1801        // Read entire concatenation as a range
1802        let range = mgr.read_content_range(0, full.len()).expect("full range");
1803        assert_eq!(range, full);
1804    }
1805
1806    #[test]
1807    fn read_content_range_beyond_end() {
1808        let (_tmp, mgr) = setup();
1809        mgr.ensure_dirs().expect("dirs");
1810        mgr.create_shard(2026, 1).expect("create");
1811        mgr.append_raw(2026, 1, "event\n").expect("append");
1812
1813        let full = mgr.replay().expect("replay");
1814        // Requesting a range beyond the end should return empty
1815        let range = mgr
1816            .read_content_range(full.len(), full.len() + 100)
1817            .expect("beyond end");
1818        assert!(range.is_empty());
1819    }
1820
1821    // -----------------------------------------------------------------------
1822    // replay_from_offset
1823    // -----------------------------------------------------------------------
1824
1825    #[test]
1826    fn replay_from_offset_zero_returns_full_content() {
1827        let (_tmp, mgr) = setup();
1828        mgr.ensure_dirs().expect("dirs");
1829        mgr.create_shard(2026, 1).expect("create");
1830        mgr.append_raw(2026, 1, "event1\n").expect("e1");
1831        mgr.append_raw(2026, 1, "event2\n").expect("e2");
1832
1833        let full = mgr.replay().expect("full replay");
1834        let (from_zero, total_len) = mgr.replay_from_offset(0).expect("from 0");
1835        assert_eq!(from_zero, full);
1836        assert_eq!(total_len, full.len());
1837    }
1838
1839    #[test]
1840    fn replay_from_offset_skips_content_before_cursor() {
1841        let (_tmp, mgr) = setup();
1842        mgr.ensure_dirs().expect("dirs");
1843        mgr.create_shard(2026, 1).expect("create");
1844        mgr.append_raw(2026, 1, "event1\n").expect("e1");
1845        mgr.append_raw(2026, 1, "event2\n").expect("e2");
1846        mgr.append_raw(2026, 1, "event3\n").expect("e3");
1847
1848        let full = mgr.replay().expect("full replay");
1849
1850        // Find offset just after event2
1851        let cursor = full.find("event3").expect("event3 in content");
1852        let (tail, total_len) = mgr.replay_from_offset(cursor).expect("from cursor");
1853        assert_eq!(tail, "event3\n");
1854        assert_eq!(total_len, full.len());
1855    }
1856
1857    #[test]
1858    fn replay_from_offset_at_end_returns_empty() {
1859        let (_tmp, mgr) = setup();
1860        mgr.ensure_dirs().expect("dirs");
1861        mgr.create_shard(2026, 1).expect("create");
1862        mgr.append_raw(2026, 1, "event1\n").expect("e1");
1863
1864        let full = mgr.replay().expect("full replay");
1865        let (tail, total_len) = mgr.replay_from_offset(full.len()).expect("at end");
1866        assert!(tail.is_empty(), "tail should be empty at end of content");
1867        assert_eq!(total_len, full.len());
1868    }
1869
1870    #[test]
1871    fn replay_from_offset_skips_sealed_shards_before_cursor() {
1872        let (_tmp, mgr) = setup();
1873        mgr.ensure_dirs().expect("dirs");
1874
1875        // Two shards: a sealed shard (jan) and an active shard (feb)
1876        mgr.create_shard(2026, 1).expect("jan");
1877        mgr.create_shard(2026, 2).expect("feb");
1878        mgr.append_raw(2026, 1, "jan-event1\n").expect("jan e1");
1879        mgr.append_raw(2026, 1, "jan-event2\n").expect("jan e2");
1880        mgr.append_raw(2026, 2, "feb-event1\n").expect("feb e1");
1881        mgr.append_raw(2026, 2, "feb-event2\n").expect("feb e2");
1882
1883        let full = mgr.replay().expect("full replay");
1884        let jan_shard_len = mgr.read_shard(2026, 1).expect("read jan").len();
1885
1886        // Cursor is at the end of the jan shard — feb events are new
1887        let (tail, total_len) = mgr
1888            .replay_from_offset(jan_shard_len)
1889            .expect("from feb start");
1890        assert!(
1891            !tail.contains("jan-event"),
1892            "jan events should not appear in tail"
1893        );
1894        assert!(tail.contains("feb-event1"), "feb events must be in tail");
1895        assert!(tail.contains("feb-event2"), "feb events must be in tail");
1896        assert_eq!(total_len, full.len());
1897    }
1898
1899    #[test]
1900    fn replay_from_offset_total_len_equals_total_content_len() {
1901        let (_tmp, mgr) = setup();
1902        mgr.ensure_dirs().expect("dirs");
1903        mgr.create_shard(2026, 1).expect("shard 1");
1904        mgr.create_shard(2026, 2).expect("shard 2");
1905        mgr.append_raw(2026, 1, "event-a\n").expect("ea");
1906        mgr.append_raw(2026, 2, "event-b\n").expect("eb");
1907
1908        let total = mgr.total_content_len().expect("total_content_len");
1909        let (_, replay_total) = mgr.replay_from_offset(0).expect("replay_from_offset");
1910        assert_eq!(
1911            total, replay_total,
1912            "total_content_len and replay_from_offset total must agree"
1913        );
1914    }
1915
1916    /// A shard whose content is just a `YYYY-MM.events` filename (the
1917    /// non-Unix symlink fallback) must be silently skipped during replay,
1918    /// not treated as a parse error.
1919    #[test]
1920    fn replay_lines_skips_forwarding_pointer_shard() {
1921        let dir = tempfile::tempdir().expect("tmpdir");
1922        let mgr = ShardManager::new(dir.path());
1923        mgr.ensure_dirs().expect("dirs");
1924
1925        // Jan shard has real events.
1926        mgr.create_shard(2026, 1).expect("create jan");
1927        mgr.append_raw(2026, 1, "# bones event log v1\n")
1928            .expect("header");
1929        mgr.append_raw(2026, 1, "real-event-line\n").expect("event");
1930
1931        // Feb shard is a forwarding pointer (non-Unix symlink fallback).
1932        let feb_path = dir.path().join("events").join("2026-02.events");
1933        fs::write(&feb_path, "2026-03.events").expect("write forwarding pointer");
1934
1935        // Mar shard has real events too.
1936        mgr.create_shard(2026, 3).expect("create mar");
1937        mgr.append_raw(2026, 3, "# bones event log v1\n")
1938            .expect("mar header");
1939        mgr.append_raw(2026, 3, "another-event-line\n")
1940            .expect("mar event");
1941
1942        let lines: Vec<String> = mgr
1943            .replay_lines()
1944            .expect("replay_lines")
1945            .map(|r| r.expect("line").1)
1946            .collect();
1947
1948        // Should see Jan and Mar lines; Feb forwarding pointer is silently skipped.
1949        assert!(
1950            lines.iter().any(|l| l.contains("real-event-line")),
1951            "jan event missing"
1952        );
1953        assert!(
1954            lines.iter().any(|l| l.contains("another-event-line")),
1955            "mar event missing"
1956        );
1957        assert!(
1958            !lines.iter().any(|l| l.contains("2026-03.events")),
1959            "forwarding pointer content must not appear in replay"
1960        );
1961    }
1962}