Skip to main content

bones_core/
shard.rs

1//! Time-sharded event file management.
2//!
3//! Events are stored in monthly shard files under `.bones/events/YYYY-MM.events`.
4//! This module manages the directory layout, shard rotation, atomic append
5//! operations, and replay (reading all shards in chronological order).
6//!
7//! # Directory Layout
8//!
9//! ```text
10//! .bones/
11//!   events/
12//!     2026-01.events      # sealed shard
13//!     2026-01.manifest    # manifest for sealed shard
14//!     2026-02.events      # active shard
15//!   cache/
16//!     clock               # monotonic wall-clock file (microseconds)
17//!   lock                  # repo-wide advisory lock
18//! ```
19//!
20//! # Invariants
21//!
22//! - Sealed (frozen) shards are never modified after rotation.
23//! - The active shard is the only one that receives appends.
24//! - The active shard is derived from `list_shards().last()` (sorted filenames).
25//! - Each append uses `O_APPEND` + `write_all` + `flush` for crash consistency.
26//! - Torn-write recovery truncates incomplete trailing lines on startup.
27//! - Monotonic timestamps: `wall_ts_us = max(system_time_us, last + 1)`.
28
29use std::fs::{self, OpenOptions};
30use std::io::{self, Write as IoWrite};
31use std::path::{Path, PathBuf};
32use std::time::{Duration, SystemTime, UNIX_EPOCH};
33
34use chrono::Datelike;
35
36use crate::event::writer::shard_header;
37use crate::lock::ShardLock;
38
39// ---------------------------------------------------------------------------
40// Error types
41// ---------------------------------------------------------------------------
42
/// Errors that can occur during shard operations.
///
/// The `Io` and `Lock` variants are produced automatically by `?` from
/// [`io::Error`] and [`crate::lock::LockError`] respectively (via `#[from]`).
#[derive(Debug, thiserror::Error)]
pub enum ShardError {
    /// I/O error during shard operations (reading, writing, or stat-ing
    /// shard, manifest, or clock files).
    #[error("shard I/O error: {0}")]
    Io(#[from] io::Error),

    /// Lock acquisition failed, e.g. the repo-wide advisory lock could not
    /// be acquired within the caller-supplied timeout.
    #[error("lock error: {0}")]
    Lock(#[from] crate::lock::LockError),

    /// The `.bones` directory (or one of its `events/` / `cache/`
    /// subdirectories) does not exist and could not be created.
    #[error("failed to initialize .bones directory: {0}")]
    InitFailed(io::Error),

    /// Shard file name does not match expected `YYYY-MM.events` pattern.
    #[error("invalid shard filename: {0}")]
    InvalidShardName(String),

    /// Shard file has a corrupted or missing header.
    #[error("corrupted shard {path}: {reason}")]
    CorruptedShard {
        /// Path to the corrupted shard file.
        path: PathBuf,
        /// Description of what is wrong.
        reason: String,
    },
}
71
/// A shard integrity problem found during validation.
///
/// Returned (one per problem) by `ShardManager::validate_sealed_shards`.
#[derive(Debug, Clone)]
pub struct ShardIntegrityIssue {
    /// Name of the shard file (e.g. `"2026-01.events"`).
    pub shard_name: String,
    /// Human-readable description of the problem (missing manifest, byte
    /// length mismatch, ...).
    pub problem: String,
}
80
81// ---------------------------------------------------------------------------
82// Manifest
83// ---------------------------------------------------------------------------
84
/// Manifest for a sealed shard file, recording integrity metadata.
///
/// Serialized as plain `key: value` lines by
/// [`to_string_repr`](Self::to_string_repr) and stored next to its shard as
/// `YYYY-MM.manifest` in the events directory.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ShardManifest {
    /// Shard file name (e.g., `"2026-01.events"`).
    pub shard_name: String,
    /// Number of event lines (excluding `#` comment lines and blank lines).
    pub event_count: u64,
    /// Total byte length of the shard file.
    pub byte_len: u64,
    /// BLAKE3 hash of the entire shard file contents. When written by the
    /// shard manager this is `"blake3:"` followed by the hex digest.
    pub file_hash: String,
}
97
98impl ShardManifest {
99    /// Serialize manifest to a human-readable format.
100    #[must_use]
101    pub fn to_string_repr(&self) -> String {
102        format!(
103            "shard: {}\nevent_count: {}\nbyte_len: {}\nfile_hash: {}\n",
104            self.shard_name, self.event_count, self.byte_len, self.file_hash
105        )
106    }
107
108    /// Parse a manifest from its string representation.
109    ///
110    /// Returns `None` if required fields are missing or unparseable.
111    #[must_use]
112    pub fn from_string_repr(s: &str) -> Option<Self> {
113        let mut shard_name = None;
114        let mut event_count = None;
115        let mut byte_len = None;
116        let mut file_hash = None;
117
118        for line in s.lines() {
119            if let Some(val) = line.strip_prefix("shard: ") {
120                shard_name = Some(val.to_string());
121            } else if let Some(val) = line.strip_prefix("event_count: ") {
122                event_count = val.parse().ok();
123            } else if let Some(val) = line.strip_prefix("byte_len: ") {
124                byte_len = val.parse().ok();
125            } else if let Some(val) = line.strip_prefix("file_hash: ") {
126                file_hash = Some(val.to_string());
127            }
128        }
129
130        Some(Self {
131            shard_name: shard_name?,
132            event_count: event_count?,
133            byte_len: byte_len?,
134            file_hash: file_hash?,
135        })
136    }
137}
138
139// ---------------------------------------------------------------------------
140// ShardManager
141// ---------------------------------------------------------------------------
142
/// Manages time-sharded event files in a `.bones` repository.
///
/// The shard manager handles:
/// - Directory initialization
/// - Shard rotation on month boundaries
/// - Appends serialized by the repo-wide advisory lock
/// - Monotonic clock maintenance
/// - Torn-write recovery
/// - Replay (reading all shards chronologically)
/// - Sealed shard manifest generation
///
/// The manager itself only holds the root path; all state lives on disk
/// under that directory (`events/`, `cache/clock`, and the `lock` file).
pub struct ShardManager {
    /// Root of the `.bones` directory.
    bones_dir: PathBuf,
}
157
158impl ShardManager {
159    /// Create a new `ShardManager` for the given `.bones` directory.
160    ///
161    /// Does not create directories on construction; call [`init`](Self::init)
162    /// or [`ensure_dirs`](Self::ensure_dirs) first if needed.
163    #[must_use]
164    pub fn new(bones_dir: impl Into<PathBuf>) -> Self {
165        Self {
166            bones_dir: bones_dir.into(),
167        }
168    }
169
170    /// Path to the events directory.
171    #[must_use]
172    pub fn events_dir(&self) -> PathBuf {
173        self.bones_dir.join("events")
174    }
175
176    /// Path to the advisory lock file.
177    #[must_use]
178    pub fn lock_path(&self) -> PathBuf {
179        self.bones_dir.join("lock")
180    }
181
182    /// Path to the monotonic clock file.
183    #[must_use]
184    pub fn clock_path(&self) -> PathBuf {
185        self.bones_dir.join("cache").join("clock")
186    }
187
188    /// Generate the shard filename for a given year and month.
189    #[must_use]
190    pub fn shard_filename(year: i32, month: u32) -> String {
191        format!("{year:04}-{month:02}.events")
192    }
193
194    /// Path to a specific shard file.
195    #[must_use]
196    pub fn shard_path(&self, year: i32, month: u32) -> PathBuf {
197        self.events_dir().join(Self::shard_filename(year, month))
198    }
199
200    /// Path to a manifest file for a given shard.
201    #[must_use]
202    pub fn manifest_path(&self, year: i32, month: u32) -> PathBuf {
203        self.events_dir()
204            .join(format!("{year:04}-{month:02}.manifest"))
205    }
206
207    // -----------------------------------------------------------------------
208    // Initialization
209    // -----------------------------------------------------------------------
210
211    /// Create the `.bones/events/` and `.bones/cache/` directories if they
212    /// don't exist. Idempotent.
213    ///
214    /// # Errors
215    ///
216    /// Returns [`ShardError::InitFailed`] if directory creation fails.
217    pub fn ensure_dirs(&self) -> Result<(), ShardError> {
218        fs::create_dir_all(self.events_dir()).map_err(ShardError::InitFailed)?;
219        fs::create_dir_all(self.bones_dir.join("cache")).map_err(ShardError::InitFailed)?;
220        Ok(())
221    }
222
223    /// Initialize the shard directory and create the first shard file
224    /// with the standard header if no shards exist.
225    ///
226    /// Returns the (year, month) of the active shard.
227    ///
228    /// # Errors
229    ///
230    /// Returns [`ShardError`] on I/O failure or if directories cannot be
231    /// created.
232    pub fn init(&self) -> Result<(i32, u32), ShardError> {
233        self.ensure_dirs()?;
234
235        let shards = self.list_shards()?;
236        if shards.is_empty() {
237            let (year, month) = current_year_month();
238            self.create_shard(year, month)?;
239            Ok((year, month))
240        } else if let Some(&(year, month)) = shards.last() {
241            Ok((year, month))
242        } else {
243            unreachable!("shards is non-empty")
244        }
245    }
246
247    // -----------------------------------------------------------------------
248    // Shard listing
249    // -----------------------------------------------------------------------
250
251    /// List all shard files in chronological order as (year, month) pairs.
252    ///
253    /// Shard filenames must match `YYYY-MM.events`. Invalid filenames are
254    /// silently skipped.
255    ///
256    /// # Errors
257    ///
258    /// Returns [`ShardError::Io`] if the directory cannot be read.
259    pub fn list_shards(&self) -> Result<Vec<(i32, u32)>, ShardError> {
260        let events_dir = self.events_dir();
261        if !events_dir.exists() {
262            return Ok(Vec::new());
263        }
264
265        let mut shards = Vec::new();
266        for entry in fs::read_dir(&events_dir)? {
267            let entry = entry?;
268            let name = entry.file_name();
269            let name_str = name.to_string_lossy();
270            if let Some(ym) = parse_shard_filename(&name_str) {
271                shards.push(ym);
272            }
273        }
274        shards.sort_unstable();
275        Ok(shards)
276    }
277
278    /// Get the active (most recent) shard, if any.
279    ///
280    /// # Errors
281    ///
282    /// Returns [`ShardError::Io`] if the directory cannot be read.
283    pub fn active_shard(&self) -> Result<Option<(i32, u32)>, ShardError> {
284        let shards = self.list_shards()?;
285        Ok(shards.last().copied())
286    }
287
288    // -----------------------------------------------------------------------
289    // Shard creation and rotation
290    // -----------------------------------------------------------------------
291
292    /// Create a new shard file with the standard header.
293    ///
294    /// Returns the path of the created file. Does nothing if the file
295    /// already exists.
296    ///
297    /// # Errors
298    ///
299    /// Returns [`ShardError::Io`] if the file cannot be written.
300    pub fn create_shard(&self, year: i32, month: u32) -> Result<PathBuf, ShardError> {
301        let path = self.shard_path(year, month);
302        if path.exists() {
303            return Ok(path);
304        }
305
306        let header = shard_header();
307        fs::write(&path, header)?;
308        Ok(path)
309    }
310
311    /// Check if the current month differs from the active shard's month.
312    /// If so, seal the old shard (generate manifest) and create a new one.
313    ///
314    /// Returns the (year, month) of the now-active shard.
315    ///
316    /// # Errors
317    ///
318    /// Returns [`ShardError`] on I/O failure during rotation.
319    pub fn rotate_if_needed(&self) -> Result<(i32, u32), ShardError> {
320        let (current_year, current_month) = current_year_month();
321        let active = self.active_shard()?;
322
323        match active {
324            Some((y, m)) if y == current_year && m == current_month => Ok((y, m)),
325            Some((y, m)) => {
326                // Seal the old shard with a manifest
327                self.write_manifest(y, m)?;
328                // Create new shard
329                self.create_shard(current_year, current_month)?;
330                Ok((current_year, current_month))
331            }
332            None => {
333                // No shards exist yet, create first one
334                self.create_shard(current_year, current_month)?;
335                Ok((current_year, current_month))
336            }
337        }
338    }
339
340    // -----------------------------------------------------------------------
341    // Manifest generation
342    // -----------------------------------------------------------------------
343
344    /// Generate and write a manifest file for a sealed shard.
345    ///
346    /// # Errors
347    ///
348    /// Returns [`ShardError::Io`] if the shard file cannot be read or the
349    /// manifest cannot be written.
350    pub fn write_manifest(&self, year: i32, month: u32) -> Result<ShardManifest, ShardError> {
351        let shard_path = self.shard_path(year, month);
352        let content = fs::read(&shard_path)?;
353        let content_str = String::from_utf8_lossy(&content);
354
355        // Count event lines (non-comment, non-blank)
356        let event_count = content_str
357            .lines()
358            .filter(|line| !line.is_empty() && !line.starts_with('#') && !line.trim().is_empty())
359            .count() as u64;
360
361        let byte_len = content.len() as u64;
362        let file_hash = format!("blake3:{}", blake3::hash(&content).to_hex());
363
364        let manifest = ShardManifest {
365            shard_name: Self::shard_filename(year, month),
366            event_count,
367            byte_len,
368            file_hash,
369        };
370
371        let manifest_path = self.manifest_path(year, month);
372        fs::write(&manifest_path, manifest.to_string_repr())?;
373
374        Ok(manifest)
375    }
376
377    /// Read a manifest file if it exists.
378    ///
379    /// # Errors
380    ///
381    /// Returns [`ShardError::Io`] if the manifest file exists but cannot
382    /// be read.
383    pub fn read_manifest(
384        &self,
385        year: i32,
386        month: u32,
387    ) -> Result<Option<ShardManifest>, ShardError> {
388        let manifest_path = self.manifest_path(year, month);
389        if !manifest_path.exists() {
390            return Ok(None);
391        }
392        let content = fs::read_to_string(&manifest_path)?;
393        Ok(ShardManifest::from_string_repr(&content))
394    }
395
396    // -----------------------------------------------------------------------
397    // Sealed shard validation
398    // -----------------------------------------------------------------------
399
400    /// Validate byte-length of sealed shards against their manifests.
401    ///
402    /// For each shard except the last (active), checks whether a `.manifest`
403    /// file exists and whether `byte_len` matches the actual file size on
404    /// disk. This is a lightweight startup check — full BLAKE3 hash
405    /// verification is deferred to `bn doctor` / `bn verify`.
406    ///
407    /// Returns a list of issues found. An empty list means all sealed shards
408    /// with manifests match.
409    ///
410    /// # Errors
411    ///
412    /// Returns [`ShardError::Io`] if shard files or manifests cannot be read.
413    pub fn validate_sealed_shards(&self) -> Result<Vec<ShardIntegrityIssue>, ShardError> {
414        let shards = self.list_shards()?;
415        let mut issues = Vec::new();
416
417        // All shards except the last are sealed.
418        let sealed = if shards.len() > 1 {
419            &shards[..shards.len() - 1]
420        } else {
421            return Ok(issues);
422        };
423
424        for &(year, month) in sealed {
425            let shard_path = self.shard_path(year, month);
426            let shard_name = Self::shard_filename(year, month);
427
428            let Some(manifest) = self.read_manifest(year, month)? else {
429                tracing::warn!(shard = %shard_name, "sealed shard has no manifest");
430                issues.push(ShardIntegrityIssue {
431                    shard_name: shard_name.clone(),
432                    problem: "sealed shard has no manifest file".into(),
433                });
434                continue;
435            };
436
437            let file_len = fs::metadata(&shard_path)?.len();
438            if file_len != manifest.byte_len {
439                tracing::error!(
440                    shard = %shard_name,
441                    expected = manifest.byte_len,
442                    actual = file_len,
443                    "sealed shard byte length mismatch"
444                );
445                issues.push(ShardIntegrityIssue {
446                    shard_name,
447                    problem: format!(
448                        "byte length mismatch: manifest says {} bytes, file is {} bytes",
449                        manifest.byte_len, file_len
450                    ),
451                });
452            }
453        }
454
455        Ok(issues)
456    }
457
458    // -----------------------------------------------------------------------
459    // Append
460    // -----------------------------------------------------------------------
461
462    /// Append an event line to the active shard.
463    ///
464    /// This method:
465    /// 1. Acquires the repo-wide advisory lock.
466    /// 2. Rotates shards if the month has changed.
467    /// 3. Reads and updates the monotonic clock.
468    /// 4. Appends the line using `O_APPEND` + `write_all` + `flush`.
469    /// 5. Optionally calls `sync_data` if `durable` is true.
470    /// 6. Releases the lock.
471    ///
472    /// The `line` must be a complete TSJSON line ending with `\n`.
473    ///
474    /// Returns the monotonic timestamp used.
475    ///
476    /// # Errors
477    ///
478    /// Returns [`ShardError::Lock`] if the lock cannot be acquired within
479    /// `lock_timeout`, or [`ShardError::Io`] on write failure.
480    pub fn append(
481        &self,
482        line: &str,
483        durable: bool,
484        lock_timeout: Duration,
485    ) -> Result<i64, ShardError> {
486        self.ensure_dirs()?;
487
488        let _lock = ShardLock::acquire(&self.lock_path(), lock_timeout)?;
489
490        // Rotate if month changed
491        let (year, month) = self.rotate_if_needed()?;
492        let shard_path = self.shard_path(year, month);
493
494        // Validate shard header before appending
495        if shard_path.exists() {
496            validate_shard_header(&shard_path)?;
497        }
498
499        // Update monotonic clock
500        let ts = self.next_timestamp()?;
501
502        // Append with O_APPEND
503        let mut file = OpenOptions::new()
504            .create(true)
505            .append(true)
506            .open(&shard_path)?;
507
508        file.write_all(line.as_bytes())?;
509        file.flush()?;
510
511        if durable {
512            file.sync_data()?;
513        }
514
515        Ok(ts)
516    }
517
518    /// Append a raw line without locking or clock update.
519    ///
520    /// Used internally and in tests. The caller is responsible for
521    /// holding the lock and managing the clock.
522    ///
523    /// # Errors
524    ///
525    /// Returns [`ShardError::Io`] on write failure.
526    pub fn append_raw(&self, year: i32, month: u32, line: &str) -> Result<(), ShardError> {
527        let shard_path = self.shard_path(year, month);
528
529        let mut file = OpenOptions::new()
530            .create(true)
531            .append(true)
532            .open(&shard_path)?;
533
534        file.write_all(line.as_bytes())?;
535        file.flush()?;
536        Ok(())
537    }
538
539    // -----------------------------------------------------------------------
540    // Monotonic clock
541    // -----------------------------------------------------------------------
542
543    /// Read the current monotonic clock value.
544    ///
545    /// Returns 0 if the clock file doesn't exist.
546    ///
547    /// # Errors
548    ///
549    /// Returns [`ShardError::Io`] if the clock file exists but cannot be
550    /// read.
551    pub fn read_clock(&self) -> Result<i64, ShardError> {
552        let path = self.clock_path();
553        if !path.exists() {
554            return Ok(0);
555        }
556        let content = fs::read_to_string(&path)?;
557        Ok(content.trim().parse::<i64>().unwrap_or(0))
558    }
559
560    /// Compute the next monotonic timestamp and write it to the clock file.
561    ///
562    /// `next = max(system_time_us, last + 1)`
563    ///
564    /// The caller must hold the repo lock.
565    ///
566    /// # Errors
567    ///
568    /// Returns [`ShardError::Io`] if the clock file cannot be read or
569    /// written.
570    pub fn next_timestamp(&self) -> Result<i64, ShardError> {
571        let last = self.read_clock()?;
572        let now = system_time_us();
573        let next = std::cmp::max(now, last + 1);
574        self.write_clock(next)?;
575        Ok(next)
576    }
577
578    /// Write a clock value to the clock file.
579    fn write_clock(&self, value: i64) -> Result<(), ShardError> {
580        let path = self.clock_path();
581        if let Some(parent) = path.parent() {
582            fs::create_dir_all(parent)?;
583        }
584        fs::write(&path, value.to_string())?;
585        Ok(())
586    }
587
588    // -----------------------------------------------------------------------
589    // Torn-write recovery
590    // -----------------------------------------------------------------------
591
592    /// Scan the active shard for torn writes and truncate incomplete
593    /// trailing lines.
594    ///
595    /// A torn write leaves a partial line (no terminating `\n`) at the end
596    /// of the file. This method finds the last complete newline and
597    /// truncates everything after it.
598    ///
599    /// Returns `Ok(Some(bytes_truncated))` if a torn write was repaired,
600    /// or `Ok(None)` if the file was clean.
601    ///
602    /// # Errors
603    ///
604    /// Returns [`ShardError::Io`] if the shard file cannot be read or
605    /// truncated.
606    pub fn recover_torn_writes(&self) -> Result<Option<u64>, ShardError> {
607        let Some(active) = self.active_shard()? else {
608            return Ok(None);
609        };
610
611        let shard_path = self.shard_path(active.0, active.1);
612        recover_shard_torn_write(&shard_path)
613    }
614
615    // -----------------------------------------------------------------------
616    // Replay
617    // -----------------------------------------------------------------------
618
619    /// Read all event lines from all shards in chronological order.
620    ///
621    /// Shards are read in lexicographic order (`YYYY-MM` sorts correctly).
622    /// Returns the concatenated content of all shard files.
623    ///
624    /// # Errors
625    ///
626    /// Returns [`ShardError::Io`] if any shard file cannot be read.
627    pub fn replay(&self) -> Result<String, ShardError> {
628        let shards = self.list_shards()?;
629        let mut content = String::new();
630
631        for (year, month) in shards {
632            let path = self.shard_path(year, month);
633            let shard_content = fs::read_to_string(&path)?;
634            content.push_str(&shard_content);
635        }
636
637        Ok(content)
638    }
639
640    /// Read event lines from a specific shard.
641    ///
642    /// # Errors
643    ///
644    /// Returns [`ShardError::Io`] if the shard file cannot be read.
645    pub fn read_shard(&self, year: i32, month: u32) -> Result<String, ShardError> {
646        let path = self.shard_path(year, month);
647        Ok(fs::read_to_string(&path)?)
648    }
649
650    /// Compute the total concatenated byte size of all shards without reading
651    /// their full contents.
652    ///
653    /// This is used for advancing the projection cursor without paying the
654    /// cost of a full replay.
655    ///
656    /// # Errors
657    ///
658    /// Returns [`ShardError::Io`] if any shard file metadata cannot be read.
659    pub fn total_content_len(&self) -> Result<usize, ShardError> {
660        let shards = self.list_shards()?;
661        let mut total = 0usize;
662        for (year, month) in shards {
663            let path = self.shard_path(year, month);
664            let meta = fs::metadata(&path)?;
665            total = total.saturating_add(usize::try_from(meta.len()).unwrap_or(usize::MAX));
666        }
667        Ok(total)
668    }
669
670    /// Read shard content starting from the given absolute byte offset in the
671    /// concatenated shard sequence.
672    ///
673    /// Sealed shards that end entirely before `offset` are skipped without
674    /// reading their contents — only their file sizes are stat(2)'d.
675    /// Only content from `offset` onward is returned, bounding memory use to
676    /// new/unseen events rather than the full log.
677    ///
678    /// Returns `(new_content, total_len)` where:
679    /// - `new_content` is the bytes from `offset` to the end of all shards.
680    /// - `total_len` is the total byte size of all shards concatenated (usable
681    ///   as the new cursor offset after processing `new_content`).
682    ///
683    /// # Errors
684    ///
685    /// Returns [`ShardError::Io`] if shard metadata or file reads fail.
686    pub fn replay_from_offset(&self, offset: usize) -> Result<(String, usize), ShardError> {
687        let shards = self.list_shards()?;
688        let mut cumulative: usize = 0;
689        let mut result = String::new();
690        let mut found_start = false;
691
692        for (year, month) in shards {
693            let path = self.shard_path(year, month);
694            let shard_len = usize::try_from(fs::metadata(&path)?.len()).unwrap_or(usize::MAX);
695
696            let shard_end = cumulative.saturating_add(shard_len);
697
698            if shard_end <= offset {
699                // This shard ends at or before the cursor — skip entirely.
700                cumulative = shard_end;
701                continue;
702            }
703
704            // This shard overlaps with or is entirely after the cursor.
705            let shard_content = fs::read_to_string(&path)?;
706
707            if found_start {
708                result.push_str(&shard_content);
709            } else {
710                // Calculate the within-shard byte offset.
711                let within = offset.saturating_sub(cumulative);
712                // Guard: within must not exceed shard content length.
713                let within = within.min(shard_content.len());
714                // Snap to a valid UTF-8 char boundary (scan forward).
715                let within = snap_to_char_boundary(&shard_content, within);
716                result.push_str(&shard_content[within..]);
717                found_start = true;
718            }
719
720            cumulative = shard_end;
721        }
722
723        Ok((result, cumulative))
724    }
725
726    /// Read bytes from the concatenated shard sequence in `[start_offset, end_offset)`.
727    ///
728    /// Only shards that overlap with the requested range are read.
729    /// Shards entirely outside the range are stat(2)'d but not read.
730    ///
731    /// This is used to read a small window around the projection cursor for
732    /// hash validation without loading the full shard content.
733    ///
734    /// # Errors
735    ///
736    /// Returns [`ShardError::Io`] if any shard file cannot be read.
737    pub fn read_content_range(
738        &self,
739        start_offset: usize,
740        end_offset: usize,
741    ) -> Result<String, ShardError> {
742        if start_offset >= end_offset {
743            return Ok(String::new());
744        }
745
746        let shards = self.list_shards()?;
747        let mut cumulative: usize = 0;
748        let mut result = String::new();
749
750        for (year, month) in shards {
751            let path = self.shard_path(year, month);
752            let shard_len = usize::try_from(fs::metadata(&path)?.len()).unwrap_or(usize::MAX);
753            let shard_end = cumulative.saturating_add(shard_len);
754
755            if shard_end <= start_offset {
756                // Shard ends before our range — skip without reading.
757                cumulative = shard_end;
758                continue;
759            }
760
761            if cumulative >= end_offset {
762                // Shard starts after our range — done.
763                break;
764            }
765
766            let shard_content = fs::read_to_string(&path)?;
767
768            // Clip to the slice of this shard that overlaps with [start, end).
769            let within_start = if cumulative < start_offset {
770                (start_offset - cumulative).min(shard_content.len())
771            } else {
772                0
773            };
774            let within_end = if shard_end > end_offset {
775                (end_offset - cumulative).min(shard_content.len())
776            } else {
777                shard_content.len()
778            };
779
780            // Snap both offsets to valid UTF-8 char boundaries.
781            let within_start = snap_to_char_boundary(&shard_content, within_start);
782            let within_end = snap_to_char_boundary(&shard_content, within_end);
783            if within_start < within_end {
784                result.push_str(&shard_content[within_start..within_end]);
785            }
786            cumulative = shard_end;
787        }
788
789        Ok(result)
790    }
791
792    /// Count event lines across all shards (excluding comments and blanks).
793    ///
794    /// # Errors
795    ///
796    /// Returns [`ShardError::Io`] if any shard file cannot be read.
797    pub fn event_count(&self) -> Result<u64, ShardError> {
798        let content = self.replay()?;
799        let count = content
800            .lines()
801            .filter(|line| !line.is_empty() && !line.starts_with('#') && !line.trim().is_empty())
802            .count();
803        Ok(count as u64)
804    }
805
806    /// Iterate over all event lines across all shards.
807    ///
808    /// Yields `(absolute_offset, line_content)` pairs.
809    ///
810    /// # Errors
811    ///
812    /// Returns [`ShardError::Io`] if directory or shard reading fails.
813    pub fn replay_lines(
814        &self,
815    ) -> Result<impl Iterator<Item = io::Result<(usize, String)>>, ShardError> {
816        self.replay_lines_from_offset(0)
817    }
818
819    /// Iterate over event lines starting from a given absolute byte offset.
820    ///
821    /// Yields `(absolute_offset, line_content)` pairs.
822    ///
823    /// # Errors
824    ///
825    /// Returns [`ShardError::Io`] if directory or shard reading fails.
826    pub fn replay_lines_from_offset(
827        &self,
828        offset: usize,
829    ) -> Result<impl Iterator<Item = io::Result<(usize, String)>>, ShardError> {
830        let shards = self.list_shards()?;
831        let bones_dir = self.bones_dir.clone();
832
833        Ok(ShardLineIterator {
834            shards,
835            current_shard_idx: 0,
836            current_reader: None,
837            cumulative_offset: 0,
838            bones_dir,
839        }
840        .skip_to_offset(offset))
841    }
842
843    /// Check if the repository has any event shards.
844    ///
845    /// # Errors
846    ///
847    /// Returns [`ShardError::Io`] if the events directory cannot be read.
848    pub fn is_empty(&self) -> Result<bool, ShardError> {
849        let shards = self.list_shards()?;
850        Ok(shards.is_empty())
851    }
852}
853
854// ---------------------------------------------------------------------------
855// ShardLineIterator
856// ---------------------------------------------------------------------------
857
/// Streaming iterator over the lines of all shards, created by
/// `ShardManager::replay_lines_from_offset`.
struct ShardLineIterator {
    /// Shards to visit, as (year, month) pairs in chronological order.
    shards: Vec<(i32, u32)>,
    /// Index into `shards` of the shard being read (or to be opened next).
    current_shard_idx: usize,
    /// Buffered reader over the shard at `current_shard_idx`; `None` means
    /// `next()` has to open that shard first.
    current_reader: Option<io::BufReader<fs::File>>,
    /// Absolute byte offset into the concatenated shard sequence.
    /// `skip_to_offset` sets it to the requested start offset.
    /// NOTE(review): its exact meaning while iterating is defined by `next()`
    /// — confirm there.
    cumulative_offset: usize,
    /// Root `.bones` directory; shard paths are built as
    /// `<bones_dir>/events/<YYYY-MM>.events`.
    bones_dir: PathBuf,
}
865
866impl ShardLineIterator {
867    fn skip_to_offset(mut self, offset: usize) -> Self {
868        // Fast-forward shards using metadata
869        while self.current_shard_idx < self.shards.len() {
870            let (year, month) = self.shards[self.current_shard_idx];
871            let shard_path = self
872                .bones_dir
873                .join("events")
874                .join(ShardManager::shard_filename(year, month));
875            if let Ok(meta) = fs::metadata(shard_path) {
876                let shard_len = usize::try_from(meta.len()).unwrap_or(usize::MAX);
877                if self.cumulative_offset + shard_len <= offset {
878                    self.cumulative_offset += shard_len;
879                    self.current_shard_idx += 1;
880                    continue;
881                }
882            }
883            break;
884        }
885
886        // Now we are at the shard that contains the offset.
887        // The first call to next() will open it and we'll need to skip
888        // the 'within-shard' offset.
889        // We can handle this by storing the skip amount.
890        self.cumulative_offset = offset;
891        self
892    }
893}
894
895impl Iterator for ShardLineIterator {
896    type Item = io::Result<(usize, String)>;
897
898    fn next(&mut self) -> Option<Self::Item> {
899        use std::io::{BufRead, Seek, SeekFrom};
900
901        loop {
902            if self.current_reader.is_none() {
903                if self.current_shard_idx >= self.shards.len() {
904                    return None;
905                }
906
907                let (year, month) = self.shards[self.current_shard_idx];
908                let shard_path = self
909                    .bones_dir
910                    .join("events")
911                    .join(ShardManager::shard_filename(year, month));
912
913                // Legacy compat: skip forwarding-pointer shards created by
914                // an older non-Unix symlink fallback.  These are tiny files
915                // whose content is just a `YYYY-MM.events` filename.
916                if is_forwarding_pointer(&shard_path) {
917                    tracing::warn!(
918                        shard = %shard_path.display(),
919                        "skipping legacy forwarding-pointer shard during replay"
920                    );
921                    if let Ok(meta) = fs::metadata(&shard_path) {
922                        self.cumulative_offset += usize::try_from(meta.len()).unwrap_or(0);
923                    }
924                    self.current_shard_idx += 1;
925                    continue;
926                }
927
928                // Validate shard header before reading.
929                if let Err(e) = validate_shard_header(&shard_path) {
930                    tracing::error!(
931                        shard = %shard_path.display(),
932                        error = %e,
933                        "shard header validation failed"
934                    );
935                    return Some(Err(io::Error::new(
936                        io::ErrorKind::InvalidData,
937                        e.to_string(),
938                    )));
939                }
940
941                let mut file = match fs::File::open(shard_path) {
942                    Ok(f) => f,
943                    Err(e) => return Some(Err(e)),
944                };
945
946                // Calculate cumulative offset before this shard
947                let mut cumulative_before = 0;
948                for i in 0..self.current_shard_idx {
949                    let (y, m) = self.shards[i];
950                    let p = self
951                        .bones_dir
952                        .join("events")
953                        .join(ShardManager::shard_filename(y, m));
954                    if let Ok(meta) = fs::metadata(p) {
955                        cumulative_before += usize::try_from(meta.len()).unwrap_or(usize::MAX);
956                    }
957                }
958
959                if self.cumulative_offset > cumulative_before {
960                    let within = self.cumulative_offset - cumulative_before;
961                    if let Err(e) = file.seek(SeekFrom::Start(within as u64)) {
962                        return Some(Err(e));
963                    }
964                }
965
966                self.current_reader = Some(io::BufReader::new(file));
967            }
968
969            let reader = self
970                .current_reader
971                .as_mut()
972                .expect("reader was just set above");
973            let mut line = String::new();
974            let offset = self.cumulative_offset;
975
976            match reader.read_line(&mut line) {
977                Ok(0) => {
978                    // EOF for this shard
979                    self.current_reader = None;
980                    self.current_shard_idx += 1;
981                }
982                Ok(n) => {
983                    self.cumulative_offset += n;
984                    return Some(Ok((offset, line)));
985                }
986                Err(e) => return Some(Err(e)),
987            }
988        }
989    }
990}
991
992// ---------------------------------------------------------------------------
993// Helpers
994// ---------------------------------------------------------------------------
995
/// Round `offset` up to the nearest UTF-8 character boundary in `s`.
///
/// Behaviour:
/// - An `offset` at or past the end of `s` yields `s.len()`.
/// - An `offset` that already starts a character is returned as-is.
/// - Otherwise the result is the start of the next character, or `s.len()`
///   if `offset` sits inside the final character.
const fn snap_to_char_boundary(s: &str, offset: usize) -> usize {
    let len = s.len();
    if offset >= len {
        return len;
    }
    // `is_char_boundary` only inspects a single byte, so each probe is O(1).
    let mut idx = offset;
    loop {
        if idx == len || s.is_char_boundary(idx) {
            break idx;
        }
        idx += 1;
    }
}
1012
1013/// Get the current year and month from system time.
1014#[must_use]
1015fn current_year_month() -> (i32, u32) {
1016    let now = chrono::Utc::now();
1017    (now.year(), now.month())
1018}
1019
/// Get the current system time in microseconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch. If the
/// microsecond count does not fit in an `i64`, the result saturates at
/// `i64::MAX` instead of silently wrapping, as the previous `as` cast did.
#[must_use]
fn system_time_us() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| {
            i64::try_from(elapsed.as_micros()).unwrap_or(i64::MAX)
        })
}
1029
1030/// Check if `path` is a legacy forwarding-pointer shard.
1031///
1032/// Forwarding pointers are tiny files (<=30 bytes) whose content is just a
1033/// `YYYY-MM.events` filename.  They were created by an older non-Unix symlink
1034/// fallback that has since been removed.  Real shards start with a multi-line
1035/// header far exceeding 30 bytes.
1036fn is_forwarding_pointer(path: &Path) -> bool {
1037    let Ok(meta) = fs::metadata(path) else {
1038        return false;
1039    };
1040    if meta.len() > 30 {
1041        return false;
1042    }
1043    let Ok(content) = fs::read_to_string(path) else {
1044        return false;
1045    };
1046    parse_shard_filename(content.trim()).is_some()
1047}
1048
1049/// Validate that a shard file starts with the expected header line.
1050///
1051/// Reads only the first line and checks it matches [`SHARD_HEADER`].
1052/// Empty files or files starting with the correct header pass.
1053/// Forwarding-pointer shards are silently accepted (legacy compat).
1054///
1055/// # Errors
1056///
1057/// Returns [`ShardError::CorruptedShard`] if the first line doesn't match,
1058/// or [`ShardError::Io`] if the file can't be read.
1059pub fn validate_shard_header(path: &Path) -> Result<(), ShardError> {
1060    use crate::event::writer::SHARD_HEADER;
1061    use std::io::{BufRead, BufReader};
1062
1063    // Skip forwarding-pointer shards (legacy compat).
1064    if is_forwarding_pointer(path) {
1065        return Ok(());
1066    }
1067
1068    let file = fs::File::open(path)?;
1069    let mut reader = BufReader::new(file);
1070    let mut first_line = String::new();
1071    let n = reader.read_line(&mut first_line)?;
1072
1073    // Empty file is ok (will get header on first write).
1074    if n == 0 {
1075        return Ok(());
1076    }
1077
1078    let trimmed = first_line.trim_end();
1079    if trimmed != SHARD_HEADER {
1080        return Err(ShardError::CorruptedShard {
1081            path: path.to_path_buf(),
1082            reason: format!(
1083                "expected header '{}', found '{}'",
1084                SHARD_HEADER,
1085                &trimmed[..trimmed.len().min(80)]
1086            ),
1087        });
1088    }
1089
1090    Ok(())
1091}
1092
/// Parse a canonical shard filename like `"2026-02.events"` into
/// `(year, month)`.
///
/// Only the canonical `YYYY-MM.events` spelling is accepted: exactly four
/// year digits and two month digits, with month in `1..=12`. Looser forms
/// such as `2026-1.events`, `+2026-01.events` or `02026-01.events` would
/// parse to the same `(year, month)` as a real shard. Callers rebuild the
/// path from that pair, so one shard could be listed (and replayed) twice,
/// or a file could be listed that is never the one actually opened.
fn parse_shard_filename(name: &str) -> Option<(i32, u32)> {
    let stem = name.strip_suffix(".events")?;
    // Must not be "current" (also rejected by the digit checks below; kept
    // explicit for clarity).
    if stem == "current" {
        return None;
    }
    let (year_str, month_str) = stem.split_once('-')?;

    let is_digits_of_width =
        |s: &str, width: usize| s.len() == width && s.bytes().all(|b| b.is_ascii_digit());
    if !is_digits_of_width(year_str, 4) || !is_digits_of_width(month_str, 2) {
        return None;
    }

    let year: i32 = year_str.parse().ok()?;
    let month: u32 = month_str.parse().ok()?;
    (1..=12).contains(&month).then_some((year, month))
}
1108
1109/// Recover torn writes for a specific shard file.
1110///
1111/// Returns `Ok(Some(bytes_truncated))` if repair was needed,
1112/// or `Ok(None)` if the file was clean.
1113fn recover_shard_torn_write(path: &Path) -> Result<Option<u64>, ShardError> {
1114    let metadata = fs::metadata(path)?;
1115    let file_len = metadata.len();
1116    if file_len == 0 {
1117        return Ok(None);
1118    }
1119
1120    let content = fs::read(path)?;
1121
1122    // Find the last newline
1123    let last_newline = content.iter().rposition(|&b| b == b'\n');
1124
1125    if let Some(pos) = last_newline {
1126        let expected_len = (pos + 1) as u64;
1127        if expected_len < file_len {
1128            // There are bytes after the last newline — torn write
1129            let truncated = file_len - expected_len;
1130            let file = OpenOptions::new().write(true).open(path)?;
1131            file.set_len(expected_len)?;
1132            Ok(Some(truncated))
1133        } else {
1134            // File ends with newline — clean
1135            Ok(None)
1136        }
1137    } else {
1138        // No newline at all — entire content is a torn write
1139        // (or a corrupt file). Truncate to zero.
1140        let file = OpenOptions::new().write(true).open(path)?;
1141        file.set_len(0)?;
1142        Ok(Some(file_len))
1143    }
1144}
1145
1146// ---------------------------------------------------------------------------
1147// Tests
1148// ---------------------------------------------------------------------------
1149
1150#[cfg(test)]
1151mod tests {
1152    use super::*;
1153    use tempfile::TempDir;
1154
1155    #[test]
1156    fn snap_to_char_boundary_ascii() {
1157        let s = "hello world";
1158        assert_eq!(snap_to_char_boundary(s, 0), 0);
1159        assert_eq!(snap_to_char_boundary(s, 5), 5);
1160        assert_eq!(snap_to_char_boundary(s, 11), 11); // len
1161        assert_eq!(snap_to_char_boundary(s, 100), 11); // beyond len
1162    }
1163
1164    #[test]
1165    fn snap_to_char_boundary_emoji() {
1166        // ✅ is 3 bytes (E2 9C 85), 🎉 is 4 bytes (F0 9F 8E 89)
1167        let s = "ab✅cd🎉ef";
1168        // a=0, b=1, ✅=2..5, c=5, d=6, 🎉=7..11, e=11, f=12
1169        assert_eq!(snap_to_char_boundary(s, 2), 2); // start of ✅
1170        assert_eq!(snap_to_char_boundary(s, 3), 5); // inside ✅ → snaps to 'c'
1171        assert_eq!(snap_to_char_boundary(s, 4), 5); // inside ✅ → snaps to 'c'
1172        assert_eq!(snap_to_char_boundary(s, 5), 5); // 'c'
1173        assert_eq!(snap_to_char_boundary(s, 8), 11); // inside 🎉 → snaps to 'e'
1174        assert_eq!(snap_to_char_boundary(s, 9), 11);
1175        assert_eq!(snap_to_char_boundary(s, 10), 11);
1176    }
1177
1178    fn setup() -> (TempDir, ShardManager) {
1179        let tmp = TempDir::new().expect("tempdir");
1180        let bones_dir = tmp.path().join(".bones");
1181        let mgr = ShardManager::new(&bones_dir);
1182        (tmp, mgr)
1183    }
1184
1185    // -----------------------------------------------------------------------
1186    // parse_shard_filename
1187    // -----------------------------------------------------------------------
1188
1189    #[test]
1190    fn parse_valid_shard_filenames() {
1191        assert_eq!(parse_shard_filename("2026-01.events"), Some((2026, 1)));
1192        assert_eq!(parse_shard_filename("2026-12.events"), Some((2026, 12)));
1193        assert_eq!(parse_shard_filename("1999-06.events"), Some((1999, 6)));
1194    }
1195
1196    #[test]
1197    fn parse_invalid_shard_filenames() {
1198        assert_eq!(parse_shard_filename("current.events"), None);
1199        assert_eq!(parse_shard_filename("2026-13.events"), None); // month > 12
1200        assert_eq!(parse_shard_filename("2026-00.events"), None); // month 0
1201        assert_eq!(parse_shard_filename("not-a-shard.txt"), None);
1202        assert_eq!(parse_shard_filename("2026-01.manifest"), None);
1203        assert_eq!(parse_shard_filename(""), None);
1204    }
1205
1206    // -----------------------------------------------------------------------
1207    // ShardManager::new, paths
1208    // -----------------------------------------------------------------------
1209
1210    #[test]
1211    fn shard_manager_paths() {
1212        let mgr = ShardManager::new("/repo/.bones");
1213        assert_eq!(mgr.events_dir(), PathBuf::from("/repo/.bones/events"));
1214        assert_eq!(mgr.lock_path(), PathBuf::from("/repo/.bones/lock"));
1215        assert_eq!(mgr.clock_path(), PathBuf::from("/repo/.bones/cache/clock"));
1216        assert_eq!(
1217            mgr.shard_path(2026, 2),
1218            PathBuf::from("/repo/.bones/events/2026-02.events")
1219        );
1220        assert_eq!(
1221            mgr.manifest_path(2026, 1),
1222            PathBuf::from("/repo/.bones/events/2026-01.manifest")
1223        );
1224    }
1225
1226    #[test]
1227    fn shard_filename_format() {
1228        assert_eq!(ShardManager::shard_filename(2026, 1), "2026-01.events");
1229        assert_eq!(ShardManager::shard_filename(2026, 12), "2026-12.events");
1230        assert_eq!(ShardManager::shard_filename(1999, 6), "1999-06.events");
1231    }
1232
1233    // -----------------------------------------------------------------------
1234    // ensure_dirs / init
1235    // -----------------------------------------------------------------------
1236
1237    #[test]
1238    fn ensure_dirs_creates_directories() {
1239        let (_tmp, mgr) = setup();
1240        mgr.ensure_dirs().expect("should create dirs");
1241        assert!(mgr.events_dir().exists());
1242        assert!(mgr.bones_dir.join("cache").exists());
1243    }
1244
1245    #[test]
1246    fn ensure_dirs_is_idempotent() {
1247        let (_tmp, mgr) = setup();
1248        mgr.ensure_dirs().expect("first");
1249        mgr.ensure_dirs().expect("second");
1250        assert!(mgr.events_dir().exists());
1251    }
1252
1253    #[test]
1254    fn init_creates_first_shard() {
1255        let (_tmp, mgr) = setup();
1256        let (year, month) = mgr.init().expect("init");
1257
1258        let (expected_year, expected_month) = current_year_month();
1259        assert_eq!(year, expected_year);
1260        assert_eq!(month, expected_month);
1261
1262        // Shard file exists with header
1263        let shard_path = mgr.shard_path(year, month);
1264        assert!(shard_path.exists());
1265        let content = fs::read_to_string(&shard_path).expect("read");
1266        assert!(content.starts_with("# bones event log v1"));
1267    }
1268
1269    #[test]
1270    fn init_is_idempotent() {
1271        let (_tmp, mgr) = setup();
1272        let first = mgr.init().expect("first");
1273        let second = mgr.init().expect("second");
1274        assert_eq!(first, second);
1275    }
1276
1277    // -----------------------------------------------------------------------
1278    // Shard listing
1279    // -----------------------------------------------------------------------
1280
1281    #[test]
1282    fn list_shards_empty() {
1283        let (_tmp, mgr) = setup();
1284        mgr.ensure_dirs().expect("dirs");
1285        let shards = mgr.list_shards().expect("list");
1286        assert!(shards.is_empty());
1287    }
1288
1289    #[test]
1290    fn list_shards_returns_sorted() {
1291        let (_tmp, mgr) = setup();
1292        mgr.ensure_dirs().expect("dirs");
1293
1294        // Create shards in reverse order
1295        mgr.create_shard(2026, 3).expect("create");
1296        mgr.create_shard(2026, 1).expect("create");
1297        mgr.create_shard(2026, 2).expect("create");
1298
1299        let shards = mgr.list_shards().expect("list");
1300        assert_eq!(shards, vec![(2026, 1), (2026, 2), (2026, 3)]);
1301    }
1302
1303    #[test]
1304    fn list_shards_skips_non_shard_files() {
1305        let (_tmp, mgr) = setup();
1306        mgr.ensure_dirs().expect("dirs");
1307        mgr.create_shard(2026, 1).expect("create");
1308
1309        // Create non-shard files
1310        fs::write(mgr.events_dir().join("readme.txt"), "hi").expect("write");
1311        fs::write(mgr.events_dir().join("2026-01.manifest"), "manifest").expect("write");
1312
1313        let shards = mgr.list_shards().expect("list");
1314        assert_eq!(shards, vec![(2026, 1)]);
1315    }
1316
1317    #[test]
1318    fn list_shards_no_events_dir() {
1319        let (_tmp, mgr) = setup();
1320        // Don't create any dirs
1321        let shards = mgr.list_shards().expect("list");
1322        assert!(shards.is_empty());
1323    }
1324
1325    // -----------------------------------------------------------------------
1326    // Shard creation
1327    // -----------------------------------------------------------------------
1328
1329    #[test]
1330    fn create_shard_writes_header() {
1331        let (_tmp, mgr) = setup();
1332        mgr.ensure_dirs().expect("dirs");
1333        let path = mgr.create_shard(2026, 2).expect("create");
1334
1335        let content = fs::read_to_string(&path).expect("read");
1336        assert!(content.starts_with("# bones event log v1"));
1337        assert!(content.contains("# fields:"));
1338        assert_eq!(content.lines().count(), 2);
1339    }
1340
1341    #[test]
1342    fn create_shard_idempotent() {
1343        let (_tmp, mgr) = setup();
1344        mgr.ensure_dirs().expect("dirs");
1345        let p1 = mgr.create_shard(2026, 2).expect("first");
1346        // Write something extra
1347        fs::write(&p1, "modified").expect("write");
1348        // Second create should NOT overwrite
1349        let p2 = mgr.create_shard(2026, 2).expect("second");
1350        assert_eq!(p1, p2);
1351        let content = fs::read_to_string(&p2).expect("read");
1352        assert_eq!(content, "modified");
1353    }
1354
1355    // -----------------------------------------------------------------------
1356    // Monotonic clock
1357    // -----------------------------------------------------------------------
1358
1359    #[test]
1360    fn clock_starts_at_zero() {
1361        let (_tmp, mgr) = setup();
1362        mgr.ensure_dirs().expect("dirs");
1363        let ts = mgr.read_clock().expect("read");
1364        assert_eq!(ts, 0);
1365    }
1366
1367    #[test]
1368    fn clock_is_monotonic() {
1369        let (_tmp, mgr) = setup();
1370        mgr.ensure_dirs().expect("dirs");
1371        let t1 = mgr.next_timestamp().expect("t1");
1372        let t2 = mgr.next_timestamp().expect("t2");
1373        let t3 = mgr.next_timestamp().expect("t3");
1374        assert!(t2 > t1);
1375        assert!(t3 > t2);
1376    }
1377
1378    #[test]
1379    fn clock_reads_back_written_value() {
1380        let (_tmp, mgr) = setup();
1381        mgr.ensure_dirs().expect("dirs");
1382        mgr.write_clock(42_000_000).expect("write");
1383        let ts = mgr.read_clock().expect("read");
1384        assert_eq!(ts, 42_000_000);
1385    }
1386
1387    #[test]
1388    fn clock_never_goes_backward() {
1389        let (_tmp, mgr) = setup();
1390        mgr.ensure_dirs().expect("dirs");
1391
1392        // Set clock far in the future
1393        let future = system_time_us() + 10_000_000;
1394        mgr.write_clock(future).expect("write");
1395
1396        let next = mgr.next_timestamp().expect("next");
1397        assert!(next > future, "clock should advance past future value");
1398    }
1399
1400    // -----------------------------------------------------------------------
1401    // Append
1402    // -----------------------------------------------------------------------
1403
1404    #[test]
1405    fn append_raw_adds_line() {
1406        let (_tmp, mgr) = setup();
1407        mgr.ensure_dirs().expect("dirs");
1408        mgr.create_shard(2026, 2).expect("create");
1409
1410        mgr.append_raw(2026, 2, "event line 1\n").expect("append");
1411        mgr.append_raw(2026, 2, "event line 2\n").expect("append");
1412
1413        let content = mgr.read_shard(2026, 2).expect("read");
1414        assert!(content.contains("event line 1"));
1415        assert!(content.contains("event line 2"));
1416    }
1417
1418    #[test]
1419    fn append_with_lock() {
1420        let (_tmp, mgr) = setup();
1421        mgr.init().expect("init");
1422
1423        let _ts = mgr
1424            .append("test event line\n", false, Duration::from_secs(1))
1425            .expect("append");
1426
1427        let content = mgr.replay().expect("replay");
1428        assert!(content.contains("test event line"));
1429    }
1430
1431    #[test]
1432    fn append_returns_monotonic_timestamps() {
1433        let (_tmp, mgr) = setup();
1434        mgr.init().expect("init");
1435
1436        let t1 = mgr
1437            .append("line1\n", false, Duration::from_secs(1))
1438            .expect("t1");
1439        let t2 = mgr
1440            .append("line2\n", false, Duration::from_secs(1))
1441            .expect("t2");
1442
1443        assert!(t2 > t1);
1444    }
1445
1446    // -----------------------------------------------------------------------
1447    // Torn-write recovery
1448    // -----------------------------------------------------------------------
1449
1450    #[test]
1451    fn recover_clean_file() {
1452        let (_tmp, mgr) = setup();
1453        mgr.init().expect("init");
1454
1455        let (y, m) = current_year_month();
1456        mgr.append_raw(y, m, "complete line\n").expect("append");
1457
1458        let recovered = mgr.recover_torn_writes().expect("recover");
1459        assert_eq!(recovered, None);
1460    }
1461
1462    #[test]
1463    fn recover_torn_write_truncates() {
1464        let (_tmp, mgr) = setup();
1465        let (y, m) = mgr.init().expect("init");
1466        let shard_path = mgr.shard_path(y, m);
1467
1468        // Write a complete line followed by a partial line
1469        {
1470            let mut f = OpenOptions::new()
1471                .append(true)
1472                .open(&shard_path)
1473                .expect("open");
1474            f.write_all(b"complete line\npartial line without newline")
1475                .expect("write");
1476            f.flush().expect("flush");
1477        }
1478
1479        let recovered = mgr.recover_torn_writes().expect("recover");
1480        assert!(recovered.is_some());
1481
1482        let truncated = recovered.expect("checked is_some");
1483        assert_eq!(truncated, "partial line without newline".len() as u64);
1484
1485        // Verify file now ends with newline
1486        let content = fs::read_to_string(&shard_path).expect("read");
1487        assert!(content.ends_with('\n'));
1488        assert!(content.contains("complete line"));
1489        assert!(!content.contains("partial line without newline"));
1490    }
1491
1492    #[test]
1493    fn recover_no_newline_at_all() {
1494        let (_tmp, mgr) = setup();
1495        let (y, m) = mgr.init().expect("init");
1496        let shard_path = mgr.shard_path(y, m);
1497
1498        // Overwrite entire file with no newlines
1499        fs::write(&shard_path, "no newlines here").expect("write");
1500
1501        let recovered = mgr.recover_torn_writes().expect("recover");
1502        assert_eq!(recovered, Some("no newlines here".len() as u64));
1503
1504        // File should be empty
1505        let content = fs::read_to_string(&shard_path).expect("read");
1506        assert!(content.is_empty());
1507    }
1508
1509    #[test]
1510    fn recover_empty_file() {
1511        let (_tmp, mgr) = setup();
1512        let (y, m) = mgr.init().expect("init");
1513        let shard_path = mgr.shard_path(y, m);
1514
1515        // Empty the file
1516        fs::write(&shard_path, "").expect("write");
1517
1518        let recovered = mgr.recover_torn_writes().expect("recover");
1519        assert_eq!(recovered, None);
1520    }
1521
1522    #[test]
1523    fn recover_no_active_shard() {
1524        let (_tmp, mgr) = setup();
1525        mgr.ensure_dirs().expect("dirs");
1526
1527        let recovered = mgr.recover_torn_writes().expect("recover");
1528        assert_eq!(recovered, None);
1529    }
1530
1531    // -----------------------------------------------------------------------
1532    // Replay
1533    // -----------------------------------------------------------------------
1534
1535    #[test]
1536    fn replay_empty_repo() {
1537        let (_tmp, mgr) = setup();
1538        mgr.ensure_dirs().expect("dirs");
1539        let content = mgr.replay().expect("replay");
1540        assert!(content.is_empty());
1541    }
1542
1543    #[test]
1544    fn replay_single_shard() {
1545        let (_tmp, mgr) = setup();
1546        mgr.ensure_dirs().expect("dirs");
1547        mgr.create_shard(2026, 1).expect("create");
1548        mgr.append_raw(2026, 1, "event-a\n").expect("append");
1549
1550        let content = mgr.replay().expect("replay");
1551        assert!(content.contains("event-a"));
1552    }
1553
1554    #[test]
1555    fn replay_multiple_shards_in_order() {
1556        let (_tmp, mgr) = setup();
1557        mgr.ensure_dirs().expect("dirs");
1558
1559        mgr.create_shard(2026, 1).expect("create");
1560        mgr.create_shard(2026, 2).expect("create");
1561        mgr.create_shard(2026, 3).expect("create");
1562
1563        mgr.append_raw(2026, 1, "event-jan\n").expect("append");
1564        mgr.append_raw(2026, 2, "event-feb\n").expect("append");
1565        mgr.append_raw(2026, 3, "event-mar\n").expect("append");
1566
1567        let content = mgr.replay().expect("replay");
1568
1569        // Events should appear in chronological order
1570        let jan_pos = content.find("event-jan").expect("jan");
1571        let feb_pos = content.find("event-feb").expect("feb");
1572        let mar_pos = content.find("event-mar").expect("mar");
1573        assert!(jan_pos < feb_pos);
1574        assert!(feb_pos < mar_pos);
1575    }
1576
1577    // -----------------------------------------------------------------------
1578    // Event count
1579    // -----------------------------------------------------------------------
1580
1581    #[test]
1582    fn event_count_empty() {
1583        let (_tmp, mgr) = setup();
1584        mgr.ensure_dirs().expect("dirs");
1585        assert_eq!(mgr.event_count().expect("count"), 0);
1586    }
1587
1588    #[test]
1589    fn event_count_excludes_comments_and_blanks() {
1590        let (_tmp, mgr) = setup();
1591        mgr.ensure_dirs().expect("dirs");
1592        mgr.create_shard(2026, 1).expect("create");
1593        // Header has 2 comment lines, then we add events
1594        mgr.append_raw(2026, 1, "event1\n").expect("append");
1595        mgr.append_raw(2026, 1, "event2\n").expect("append");
1596        mgr.append_raw(2026, 1, "\n").expect("blank");
1597
1598        assert_eq!(mgr.event_count().expect("count"), 2);
1599    }
1600
1601    // -----------------------------------------------------------------------
1602    // is_empty
1603    // -----------------------------------------------------------------------
1604
1605    #[test]
1606    fn is_empty_no_shards() {
1607        let (_tmp, mgr) = setup();
1608        mgr.ensure_dirs().expect("dirs");
1609        assert!(mgr.is_empty().expect("empty"));
1610    }
1611
1612    #[test]
1613    fn is_empty_with_shards() {
1614        let (_tmp, mgr) = setup();
1615        mgr.init().expect("init");
1616        assert!(!mgr.is_empty().expect("empty"));
1617    }
1618
1619    // -----------------------------------------------------------------------
1620    // Manifest
1621    // -----------------------------------------------------------------------
1622
1623    #[test]
1624    fn write_and_read_manifest() {
1625        let (_tmp, mgr) = setup();
1626        mgr.ensure_dirs().expect("dirs");
1627        mgr.create_shard(2026, 1).expect("create");
1628        mgr.append_raw(2026, 1, "event-line-1\n").expect("append");
1629        mgr.append_raw(2026, 1, "event-line-2\n").expect("append");
1630
1631        let written = mgr.write_manifest(2026, 1).expect("write manifest");
1632        assert_eq!(written.shard_name, "2026-01.events");
1633        assert_eq!(written.event_count, 2);
1634        assert!(written.byte_len > 0);
1635        assert!(written.file_hash.starts_with("blake3:"));
1636
1637        let read = mgr
1638            .read_manifest(2026, 1)
1639            .expect("read")
1640            .expect("should exist");
1641        assert_eq!(read, written);
1642    }
1643
1644    #[test]
1645    fn manifest_roundtrip() {
1646        let manifest = ShardManifest {
1647            shard_name: "2026-01.events".into(),
1648            event_count: 42,
1649            byte_len: 12345,
1650            file_hash: "blake3:abcdef0123456789".into(),
1651        };
1652
1653        let repr = manifest.to_string_repr();
1654        let parsed = ShardManifest::from_string_repr(&repr).expect("parse");
1655        assert_eq!(parsed, manifest);
1656    }
1657
1658    #[test]
1659    fn read_manifest_missing() {
1660        let (_tmp, mgr) = setup();
1661        mgr.ensure_dirs().expect("dirs");
1662        let result = mgr.read_manifest(2026, 1).expect("read");
1663        assert!(result.is_none());
1664    }
1665
1666    #[test]
1667    fn manifest_event_count_excludes_comments() {
1668        let (_tmp, mgr) = setup();
1669        mgr.ensure_dirs().expect("dirs");
1670        mgr.create_shard(2026, 1).expect("create");
1671        // Header has 2 comment lines
1672        mgr.append_raw(2026, 1, "event1\n").expect("append");
1673
1674        let manifest = mgr.write_manifest(2026, 1).expect("manifest");
1675        // Only 1 event line, not the 2 header lines
1676        assert_eq!(manifest.event_count, 1);
1677    }
1678
1679    // -----------------------------------------------------------------------
1680    // Rotation
1681    // -----------------------------------------------------------------------
1682
1683    #[test]
1684    fn rotate_creates_shard_if_none_exist() {
1685        let (_tmp, mgr) = setup();
1686        mgr.ensure_dirs().expect("dirs");
1687
1688        let (y, m) = mgr.rotate_if_needed().expect("rotate");
1689        let (ey, em) = current_year_month();
1690        assert_eq!((y, m), (ey, em));
1691
1692        assert!(mgr.shard_path(y, m).exists());
1693    }
1694
1695    #[test]
1696    fn rotate_no_op_same_month() {
1697        let (_tmp, mgr) = setup();
1698        let (y, m) = mgr.init().expect("init");
1699
1700        let (y2, m2) = mgr.rotate_if_needed().expect("rotate");
1701        assert_eq!((y, m), (y2, m2));
1702    }
1703
1704    #[test]
1705    fn rotate_different_month_seals_and_creates() {
1706        let (_tmp, mgr) = setup();
1707        mgr.ensure_dirs().expect("dirs");
1708
1709        // Create an old shard
1710        mgr.create_shard(2025, 11).expect("create");
1711        mgr.append_raw(2025, 11, "old event\n").expect("append");
1712
1713        // Rotate should seal old and create new
1714        let (y, m) = mgr.rotate_if_needed().expect("rotate");
1715        let (ey, em) = current_year_month();
1716        assert_eq!((y, m), (ey, em));
1717
1718        // Old shard should have a manifest
1719        assert!(mgr.manifest_path(2025, 11).exists());
1720
1721        // New shard should exist
1722        assert!(mgr.shard_path(ey, em).exists());
1723    }
1724
1725    // -----------------------------------------------------------------------
1726    // Frozen shards
1727    // -----------------------------------------------------------------------
1728
1729    #[test]
1730    fn frozen_shard_not_modified_by_append() {
1731        let (_tmp, mgr) = setup();
1732        mgr.ensure_dirs().expect("dirs");
1733
1734        // Create and populate old shard
1735        mgr.create_shard(2025, 6).expect("create");
1736        mgr.append_raw(2025, 6, "old event\n").expect("append");
1737        let old_content = mgr.read_shard(2025, 6).expect("read");
1738
1739        // Init creates current month shard
1740        mgr.init().expect("init");
1741
1742        // Append only goes to active shard
1743        mgr.append("new event\n", false, Duration::from_secs(1))
1744            .expect("append");
1745
1746        // Old shard is unchanged
1747        let after_content = mgr.read_shard(2025, 6).expect("read");
1748        assert_eq!(old_content, after_content);
1749    }
1750
1751    // -----------------------------------------------------------------------
1752    // system_time_us
1753    // -----------------------------------------------------------------------
1754
1755    #[test]
1756    fn system_time_us_is_positive() {
1757        let ts = system_time_us();
1758        assert!(ts > 0, "system time should be positive: {ts}");
1759    }
1760
1761    #[test]
1762    fn system_time_us_is_reasonable() {
1763        let ts = system_time_us();
1764        // Should be after 2020-01-01 in microseconds
1765        let jan_2020_us: i64 = 1_577_836_800_000_000;
1766        assert!(ts > jan_2020_us, "system time too small: {ts}");
1767    }
1768
1769    // -----------------------------------------------------------------------
1770    // total_content_len
1771    // -----------------------------------------------------------------------
1772
1773    #[test]
1774    fn total_content_len_empty_repo() {
1775        let (_tmp, mgr) = setup();
1776        mgr.ensure_dirs().expect("dirs");
1777        let len = mgr.total_content_len().expect("len");
1778        assert_eq!(len, 0);
1779    }
1780
1781    #[test]
1782    fn total_content_len_single_shard() {
1783        let (_tmp, mgr) = setup();
1784        mgr.ensure_dirs().expect("dirs");
1785        mgr.create_shard(2026, 1).expect("create");
1786        mgr.append_raw(2026, 1, "line1\n").expect("append");
1787        mgr.append_raw(2026, 1, "line2\n").expect("append");
1788
1789        let full = mgr.replay().expect("replay");
1790        let len = mgr.total_content_len().expect("len");
1791        assert_eq!(len, full.len());
1792    }
1793
1794    #[test]
1795    fn total_content_len_multiple_shards() {
1796        let (_tmp, mgr) = setup();
1797        mgr.ensure_dirs().expect("dirs");
1798        mgr.create_shard(2026, 1).expect("shard 1");
1799        mgr.create_shard(2026, 2).expect("shard 2");
1800        mgr.append_raw(2026, 1, "jan-event\n").expect("append jan");
1801        mgr.append_raw(2026, 2, "feb-event\n").expect("append feb");
1802
1803        let full = mgr.replay().expect("replay");
1804        let len = mgr.total_content_len().expect("len");
1805        assert_eq!(len, full.len(), "total_content_len must match replay len");
1806    }
1807
1808    // -----------------------------------------------------------------------
1809    // read_content_range
1810    // -----------------------------------------------------------------------
1811
1812    #[test]
1813    fn read_content_range_empty_range() {
1814        let (_tmp, mgr) = setup();
1815        mgr.ensure_dirs().expect("dirs");
1816        mgr.create_shard(2026, 1).expect("create");
1817        mgr.append_raw(2026, 1, "event\n").expect("append");
1818
1819        let result = mgr.read_content_range(5, 5).expect("range");
1820        assert!(result.is_empty());
1821    }
1822
1823    #[test]
1824    fn read_content_range_within_single_shard() {
1825        let (_tmp, mgr) = setup();
1826        mgr.ensure_dirs().expect("dirs");
1827        mgr.create_shard(2026, 1).expect("create");
1828        // shard header is 2 lines; add a known event line
1829        mgr.append_raw(2026, 1, "ABCDEF\n").expect("append");
1830
1831        let full = mgr.replay().expect("replay");
1832        // Find the position of "ABCDEF"
1833        let pos = full.find("ABCDEF").expect("ABCDEF must be in shard");
1834        let range = mgr.read_content_range(pos, pos + 7).expect("range");
1835        assert_eq!(range, "ABCDEF\n");
1836    }
1837
1838    #[test]
1839    fn read_content_range_across_shard_boundary() {
1840        let (_tmp, mgr) = setup();
1841        mgr.ensure_dirs().expect("dirs");
1842        mgr.create_shard(2026, 1).expect("shard 1");
1843        mgr.create_shard(2026, 2).expect("shard 2");
1844        mgr.append_raw(2026, 1, "jan-last-line\n").expect("jan");
1845        mgr.append_raw(2026, 2, "feb-first-line\n").expect("feb");
1846
1847        let full = mgr.replay().expect("replay");
1848        // Read entire concatenation as a range
1849        let range = mgr.read_content_range(0, full.len()).expect("full range");
1850        assert_eq!(range, full);
1851    }
1852
1853    #[test]
1854    fn read_content_range_beyond_end() {
1855        let (_tmp, mgr) = setup();
1856        mgr.ensure_dirs().expect("dirs");
1857        mgr.create_shard(2026, 1).expect("create");
1858        mgr.append_raw(2026, 1, "event\n").expect("append");
1859
1860        let full = mgr.replay().expect("replay");
1861        // Requesting a range beyond the end should return empty
1862        let range = mgr
1863            .read_content_range(full.len(), full.len() + 100)
1864            .expect("beyond end");
1865        assert!(range.is_empty());
1866    }
1867
1868    // -----------------------------------------------------------------------
1869    // replay_from_offset
1870    // -----------------------------------------------------------------------
1871
1872    #[test]
1873    fn replay_from_offset_zero_returns_full_content() {
1874        let (_tmp, mgr) = setup();
1875        mgr.ensure_dirs().expect("dirs");
1876        mgr.create_shard(2026, 1).expect("create");
1877        mgr.append_raw(2026, 1, "event1\n").expect("e1");
1878        mgr.append_raw(2026, 1, "event2\n").expect("e2");
1879
1880        let full = mgr.replay().expect("full replay");
1881        let (from_zero, total_len) = mgr.replay_from_offset(0).expect("from 0");
1882        assert_eq!(from_zero, full);
1883        assert_eq!(total_len, full.len());
1884    }
1885
1886    #[test]
1887    fn replay_from_offset_skips_content_before_cursor() {
1888        let (_tmp, mgr) = setup();
1889        mgr.ensure_dirs().expect("dirs");
1890        mgr.create_shard(2026, 1).expect("create");
1891        mgr.append_raw(2026, 1, "event1\n").expect("e1");
1892        mgr.append_raw(2026, 1, "event2\n").expect("e2");
1893        mgr.append_raw(2026, 1, "event3\n").expect("e3");
1894
1895        let full = mgr.replay().expect("full replay");
1896
1897        // Find offset just after event2
1898        let cursor = full.find("event3").expect("event3 in content");
1899        let (tail, total_len) = mgr.replay_from_offset(cursor).expect("from cursor");
1900        assert_eq!(tail, "event3\n");
1901        assert_eq!(total_len, full.len());
1902    }
1903
1904    #[test]
1905    fn replay_from_offset_at_end_returns_empty() {
1906        let (_tmp, mgr) = setup();
1907        mgr.ensure_dirs().expect("dirs");
1908        mgr.create_shard(2026, 1).expect("create");
1909        mgr.append_raw(2026, 1, "event1\n").expect("e1");
1910
1911        let full = mgr.replay().expect("full replay");
1912        let (tail, total_len) = mgr.replay_from_offset(full.len()).expect("at end");
1913        assert!(tail.is_empty(), "tail should be empty at end of content");
1914        assert_eq!(total_len, full.len());
1915    }
1916
1917    #[test]
1918    fn replay_from_offset_skips_sealed_shards_before_cursor() {
1919        let (_tmp, mgr) = setup();
1920        mgr.ensure_dirs().expect("dirs");
1921
1922        // Two shards: a sealed shard (jan) and an active shard (feb)
1923        mgr.create_shard(2026, 1).expect("jan");
1924        mgr.create_shard(2026, 2).expect("feb");
1925        mgr.append_raw(2026, 1, "jan-event1\n").expect("jan e1");
1926        mgr.append_raw(2026, 1, "jan-event2\n").expect("jan e2");
1927        mgr.append_raw(2026, 2, "feb-event1\n").expect("feb e1");
1928        mgr.append_raw(2026, 2, "feb-event2\n").expect("feb e2");
1929
1930        let full = mgr.replay().expect("full replay");
1931        let jan_shard_len = mgr.read_shard(2026, 1).expect("read jan").len();
1932
1933        // Cursor is at the end of the jan shard — feb events are new
1934        let (tail, total_len) = mgr
1935            .replay_from_offset(jan_shard_len)
1936            .expect("from feb start");
1937        assert!(
1938            !tail.contains("jan-event"),
1939            "jan events should not appear in tail"
1940        );
1941        assert!(tail.contains("feb-event1"), "feb events must be in tail");
1942        assert!(tail.contains("feb-event2"), "feb events must be in tail");
1943        assert_eq!(total_len, full.len());
1944    }
1945
1946    #[test]
1947    fn replay_from_offset_total_len_equals_total_content_len() {
1948        let (_tmp, mgr) = setup();
1949        mgr.ensure_dirs().expect("dirs");
1950        mgr.create_shard(2026, 1).expect("shard 1");
1951        mgr.create_shard(2026, 2).expect("shard 2");
1952        mgr.append_raw(2026, 1, "event-a\n").expect("ea");
1953        mgr.append_raw(2026, 2, "event-b\n").expect("eb");
1954
1955        let total = mgr.total_content_len().expect("total_content_len");
1956        let (_, replay_total) = mgr.replay_from_offset(0).expect("replay_from_offset");
1957        assert_eq!(
1958            total, replay_total,
1959            "total_content_len and replay_from_offset total must agree"
1960        );
1961    }
1962
1963    /// A shard whose content is just a `YYYY-MM.events` filename (the
1964    /// non-Unix symlink fallback) must be silently skipped during replay,
1965    /// not treated as a parse error.
1966    #[test]
1967    fn replay_lines_skips_forwarding_pointer_shard() {
1968        let dir = tempfile::tempdir().expect("tmpdir");
1969        let mgr = ShardManager::new(dir.path());
1970        mgr.ensure_dirs().expect("dirs");
1971
1972        // Jan shard has real events.
1973        mgr.create_shard(2026, 1).expect("create jan");
1974        mgr.append_raw(2026, 1, "# bones event log v1\n")
1975            .expect("header");
1976        mgr.append_raw(2026, 1, "real-event-line\n").expect("event");
1977
1978        // Feb shard is a forwarding pointer (non-Unix symlink fallback).
1979        let feb_path = dir.path().join("events").join("2026-02.events");
1980        fs::write(&feb_path, "2026-03.events").expect("write forwarding pointer");
1981
1982        // Mar shard has real events too.
1983        mgr.create_shard(2026, 3).expect("create mar");
1984        mgr.append_raw(2026, 3, "# bones event log v1\n")
1985            .expect("mar header");
1986        mgr.append_raw(2026, 3, "another-event-line\n")
1987            .expect("mar event");
1988
1989        let lines: Vec<String> = mgr
1990            .replay_lines()
1991            .expect("replay_lines")
1992            .map(|r| r.expect("line").1)
1993            .collect();
1994
1995        // Should see Jan and Mar lines; Feb forwarding pointer is silently skipped.
1996        assert!(
1997            lines.iter().any(|l| l.contains("real-event-line")),
1998            "jan event missing"
1999        );
2000        assert!(
2001            lines.iter().any(|l| l.contains("another-event-line")),
2002            "mar event missing"
2003        );
2004        assert!(
2005            !lines.iter().any(|l| l.contains("2026-03.events")),
2006            "forwarding pointer content must not appear in replay"
2007        );
2008    }
2009}