1use std::fs::{self, OpenOptions};
30use std::io::{self, Write as IoWrite};
31use std::path::{Path, PathBuf};
32use std::time::{Duration, SystemTime, UNIX_EPOCH};
33
34use chrono::Datelike;
35
36use crate::event::writer::shard_header;
37use crate::lock::ShardLock;
38
/// Errors produced by shard management operations in this module.
#[derive(Debug, thiserror::Error)]
pub enum ShardError {
    /// A filesystem or other I/O operation failed. Created implicitly by `?`
    /// on any `io::Error`.
    #[error("shard I/O error: {0}")]
    Io(#[from] io::Error),

    /// Acquiring the repository lock failed. Created implicitly by `?` on a
    /// `crate::lock::LockError`.
    #[error("lock error: {0}")]
    Lock(#[from] crate::lock::LockError),

    /// Creating the on-disk directory layout failed (see `ensure_dirs`).
    #[error("failed to initialize .bones directory: {0}")]
    InitFailed(io::Error),

    /// A shard filename was not valid.
    // NOTE(review): not constructed anywhere in the visible part of this file.
    #[error("invalid shard filename: {0}")]
    InvalidShardName(String),

    /// A shard file failed content validation (for example, its first line
    /// is not the expected header; see `validate_shard_header`).
    #[error("corrupted shard {path}: {reason}")]
    CorruptedShard {
        /// Path of the shard that failed validation.
        path: PathBuf,
        /// Human-readable description of the problem.
        reason: String,
    },
}
71
/// One problem found while validating sealed shards
/// (see `ShardManager::validate_sealed_shards`).
#[derive(Debug, Clone)]
pub struct ShardIntegrityIssue {
    /// Filename of the affected shard, e.g. `2026-01.events`.
    pub shard_name: String,
    /// Human-readable description of what is wrong with the shard.
    pub problem: String,
}
80
/// Summary of a shard file: its name, number of event lines, size in bytes
/// and a content hash string.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ShardManifest {
    /// Filename of the shard this manifest describes.
    pub shard_name: String,
    /// Number of event lines counted in the shard.
    pub event_count: u64,
    /// Size of the shard file in bytes.
    pub byte_len: u64,
    /// Content hash of the shard, stored as an opaque string.
    pub file_hash: String,
}

impl ShardManifest {
    /// Renders the manifest as four `key: value` lines, each terminated by
    /// a newline.
    #[must_use]
    pub fn to_string_repr(&self) -> String {
        let Self {
            shard_name,
            event_count,
            byte_len,
            file_hash,
        } = self;
        format!(
            "shard: {}\nevent_count: {}\nbyte_len: {}\nfile_hash: {}\n",
            shard_name, event_count, byte_len, file_hash
        )
    }

    /// Parses the text produced by [`ShardManifest::to_string_repr`].
    ///
    /// Unknown lines are ignored. Returns `None` when any of the four keys
    /// is missing or when the last occurrence of a numeric key does not
    /// parse as `u64`.
    #[must_use]
    pub fn from_string_repr(s: &str) -> Option<Self> {
        let (mut name, mut events, mut bytes, mut hash) = (None, None, None, None);

        // Each recognised line has the shape `<key>: <value>`.
        for (key, value) in s.lines().filter_map(|l| l.split_once(": ")) {
            match key {
                "shard" => name = Some(value.to_string()),
                "event_count" => events = value.parse::<u64>().ok(),
                "byte_len" => bytes = value.parse::<u64>().ok(),
                "file_hash" => hash = Some(value.to_string()),
                _ => {}
            }
        }

        Some(Self {
            shard_name: name?,
            event_count: events?,
            byte_len: bytes?,
            file_hash: hash?,
        })
    }
}
138
/// Manages the event shard files stored under a single root directory.
///
/// The `events/` directory, the `cache/clock` file and the `lock` file are
/// all resolved relative to `bones_dir`.
pub struct ShardManager {
    /// Root directory that every other path is derived from.
    bones_dir: PathBuf,
}
157
158impl ShardManager {
159 #[must_use]
164 pub fn new(bones_dir: impl Into<PathBuf>) -> Self {
165 Self {
166 bones_dir: bones_dir.into(),
167 }
168 }
169
170 #[must_use]
172 pub fn events_dir(&self) -> PathBuf {
173 self.bones_dir.join("events")
174 }
175
176 #[must_use]
178 pub fn lock_path(&self) -> PathBuf {
179 self.bones_dir.join("lock")
180 }
181
182 #[must_use]
184 pub fn clock_path(&self) -> PathBuf {
185 self.bones_dir.join("cache").join("clock")
186 }
187
/// Builds the shard filename for a year and month: `YYYY-MM.events`, with
/// the year zero-padded to at least four digits and the month to two.
#[must_use]
pub fn shard_filename(year: i32, month: u32) -> String {
    format!("{:04}-{:02}.events", year, month)
}
193
194 #[must_use]
196 pub fn shard_path(&self, year: i32, month: u32) -> PathBuf {
197 self.events_dir().join(Self::shard_filename(year, month))
198 }
199
200 #[must_use]
202 pub fn manifest_path(&self, year: i32, month: u32) -> PathBuf {
203 self.events_dir()
204 .join(format!("{year:04}-{month:02}.manifest"))
205 }
206
207 pub fn ensure_dirs(&self) -> Result<(), ShardError> {
218 fs::create_dir_all(self.events_dir()).map_err(ShardError::InitFailed)?;
219 fs::create_dir_all(self.bones_dir.join("cache")).map_err(ShardError::InitFailed)?;
220 Ok(())
221 }
222
223 pub fn init(&self) -> Result<(i32, u32), ShardError> {
233 self.ensure_dirs()?;
234
235 let shards = self.list_shards()?;
236 if shards.is_empty() {
237 let (year, month) = current_year_month();
238 self.create_shard(year, month)?;
239 Ok((year, month))
240 } else if let Some(&(year, month)) = shards.last() {
241 Ok((year, month))
242 } else {
243 unreachable!("shards is non-empty")
244 }
245 }
246
247 pub fn list_shards(&self) -> Result<Vec<(i32, u32)>, ShardError> {
260 let events_dir = self.events_dir();
261 if !events_dir.exists() {
262 return Ok(Vec::new());
263 }
264
265 let mut shards = Vec::new();
266 for entry in fs::read_dir(&events_dir)? {
267 let entry = entry?;
268 let name = entry.file_name();
269 let name_str = name.to_string_lossy();
270 if let Some(ym) = parse_shard_filename(&name_str) {
271 shards.push(ym);
272 }
273 }
274 shards.sort_unstable();
275 Ok(shards)
276 }
277
278 pub fn active_shard(&self) -> Result<Option<(i32, u32)>, ShardError> {
284 let shards = self.list_shards()?;
285 Ok(shards.last().copied())
286 }
287
288 pub fn create_shard(&self, year: i32, month: u32) -> Result<PathBuf, ShardError> {
301 let path = self.shard_path(year, month);
302 if path.exists() {
303 return Ok(path);
304 }
305
306 let header = shard_header();
307 fs::write(&path, header)?;
308 Ok(path)
309 }
310
311 pub fn rotate_if_needed(&self) -> Result<(i32, u32), ShardError> {
320 let (current_year, current_month) = current_year_month();
321 let active = self.active_shard()?;
322
323 match active {
324 Some((y, m)) if y == current_year && m == current_month => Ok((y, m)),
325 Some((y, m)) => {
326 self.write_manifest(y, m)?;
328 self.create_shard(current_year, current_month)?;
330 Ok((current_year, current_month))
331 }
332 None => {
333 self.create_shard(current_year, current_month)?;
335 Ok((current_year, current_month))
336 }
337 }
338 }
339
340 pub fn write_manifest(&self, year: i32, month: u32) -> Result<ShardManifest, ShardError> {
351 let shard_path = self.shard_path(year, month);
352 let content = fs::read(&shard_path)?;
353 let content_str = String::from_utf8_lossy(&content);
354
355 let event_count = content_str
357 .lines()
358 .filter(|line| !line.is_empty() && !line.starts_with('#') && !line.trim().is_empty())
359 .count() as u64;
360
361 let byte_len = content.len() as u64;
362 let file_hash = format!("blake3:{}", blake3::hash(&content).to_hex());
363
364 let manifest = ShardManifest {
365 shard_name: Self::shard_filename(year, month),
366 event_count,
367 byte_len,
368 file_hash,
369 };
370
371 let manifest_path = self.manifest_path(year, month);
372 fs::write(&manifest_path, manifest.to_string_repr())?;
373
374 Ok(manifest)
375 }
376
377 pub fn read_manifest(
384 &self,
385 year: i32,
386 month: u32,
387 ) -> Result<Option<ShardManifest>, ShardError> {
388 let manifest_path = self.manifest_path(year, month);
389 if !manifest_path.exists() {
390 return Ok(None);
391 }
392 let content = fs::read_to_string(&manifest_path)?;
393 Ok(ShardManifest::from_string_repr(&content))
394 }
395
396 pub fn validate_sealed_shards(&self) -> Result<Vec<ShardIntegrityIssue>, ShardError> {
414 let shards = self.list_shards()?;
415 let mut issues = Vec::new();
416
417 let sealed = if shards.len() > 1 {
419 &shards[..shards.len() - 1]
420 } else {
421 return Ok(issues);
422 };
423
424 for &(year, month) in sealed {
425 let shard_path = self.shard_path(year, month);
426 let shard_name = Self::shard_filename(year, month);
427
428 let Some(manifest) = self.read_manifest(year, month)? else {
429 tracing::warn!(shard = %shard_name, "sealed shard has no manifest");
430 issues.push(ShardIntegrityIssue {
431 shard_name: shard_name.clone(),
432 problem: "sealed shard has no manifest file".into(),
433 });
434 continue;
435 };
436
437 let file_len = fs::metadata(&shard_path)?.len();
438 if file_len != manifest.byte_len {
439 tracing::error!(
440 shard = %shard_name,
441 expected = manifest.byte_len,
442 actual = file_len,
443 "sealed shard byte length mismatch"
444 );
445 issues.push(ShardIntegrityIssue {
446 shard_name,
447 problem: format!(
448 "byte length mismatch: manifest says {} bytes, file is {} bytes",
449 manifest.byte_len, file_len
450 ),
451 });
452 }
453 }
454
455 Ok(issues)
456 }
457
458 pub fn append(
481 &self,
482 line: &str,
483 durable: bool,
484 lock_timeout: Duration,
485 ) -> Result<i64, ShardError> {
486 self.ensure_dirs()?;
487
488 let _lock = ShardLock::acquire(&self.lock_path(), lock_timeout)?;
489
490 let (year, month) = self.rotate_if_needed()?;
492 let shard_path = self.shard_path(year, month);
493
494 if shard_path.exists() {
496 validate_shard_header(&shard_path)?;
497 }
498
499 let ts = self.next_timestamp()?;
501
502 let mut file = OpenOptions::new()
504 .create(true)
505 .append(true)
506 .open(&shard_path)?;
507
508 file.write_all(line.as_bytes())?;
509 file.flush()?;
510
511 if durable {
512 file.sync_data()?;
513 }
514
515 Ok(ts)
516 }
517
518 pub fn append_raw(&self, year: i32, month: u32, line: &str) -> Result<(), ShardError> {
527 let shard_path = self.shard_path(year, month);
528
529 let mut file = OpenOptions::new()
530 .create(true)
531 .append(true)
532 .open(&shard_path)?;
533
534 file.write_all(line.as_bytes())?;
535 file.flush()?;
536 Ok(())
537 }
538
539 pub fn read_clock(&self) -> Result<i64, ShardError> {
552 let path = self.clock_path();
553 if !path.exists() {
554 return Ok(0);
555 }
556 let content = fs::read_to_string(&path)?;
557 Ok(content.trim().parse::<i64>().unwrap_or(0))
558 }
559
560 pub fn next_timestamp(&self) -> Result<i64, ShardError> {
571 let last = self.read_clock()?;
572 let now = system_time_us();
573 let next = std::cmp::max(now, last + 1);
574 self.write_clock(next)?;
575 Ok(next)
576 }
577
578 fn write_clock(&self, value: i64) -> Result<(), ShardError> {
580 let path = self.clock_path();
581 if let Some(parent) = path.parent() {
582 fs::create_dir_all(parent)?;
583 }
584 fs::write(&path, value.to_string())?;
585 Ok(())
586 }
587
588 pub fn recover_torn_writes(&self) -> Result<Option<u64>, ShardError> {
607 let Some(active) = self.active_shard()? else {
608 return Ok(None);
609 };
610
611 let shard_path = self.shard_path(active.0, active.1);
612 recover_shard_torn_write(&shard_path)
613 }
614
615 pub fn replay(&self) -> Result<String, ShardError> {
628 let shards = self.list_shards()?;
629 let mut content = String::new();
630
631 for (year, month) in shards {
632 let path = self.shard_path(year, month);
633 let shard_content = fs::read_to_string(&path)?;
634 content.push_str(&shard_content);
635 }
636
637 Ok(content)
638 }
639
640 pub fn read_shard(&self, year: i32, month: u32) -> Result<String, ShardError> {
646 let path = self.shard_path(year, month);
647 Ok(fs::read_to_string(&path)?)
648 }
649
650 pub fn total_content_len(&self) -> Result<usize, ShardError> {
660 let shards = self.list_shards()?;
661 let mut total = 0usize;
662 for (year, month) in shards {
663 let path = self.shard_path(year, month);
664 let meta = fs::metadata(&path)?;
665 total = total.saturating_add(usize::try_from(meta.len()).unwrap_or(usize::MAX));
666 }
667 Ok(total)
668 }
669
670 pub fn replay_from_offset(&self, offset: usize) -> Result<(String, usize), ShardError> {
687 let shards = self.list_shards()?;
688 let mut cumulative: usize = 0;
689 let mut result = String::new();
690 let mut found_start = false;
691
692 for (year, month) in shards {
693 let path = self.shard_path(year, month);
694 let shard_len = usize::try_from(fs::metadata(&path)?.len()).unwrap_or(usize::MAX);
695
696 let shard_end = cumulative.saturating_add(shard_len);
697
698 if shard_end <= offset {
699 cumulative = shard_end;
701 continue;
702 }
703
704 let shard_content = fs::read_to_string(&path)?;
706
707 if found_start {
708 result.push_str(&shard_content);
709 } else {
710 let within = offset.saturating_sub(cumulative);
712 let within = within.min(shard_content.len());
714 result.push_str(&shard_content[within..]);
715 found_start = true;
716 }
717
718 cumulative = shard_end;
719 }
720
721 Ok((result, cumulative))
722 }
723
724 pub fn read_content_range(
736 &self,
737 start_offset: usize,
738 end_offset: usize,
739 ) -> Result<String, ShardError> {
740 if start_offset >= end_offset {
741 return Ok(String::new());
742 }
743
744 let shards = self.list_shards()?;
745 let mut cumulative: usize = 0;
746 let mut result = String::new();
747
748 for (year, month) in shards {
749 let path = self.shard_path(year, month);
750 let shard_len = usize::try_from(fs::metadata(&path)?.len()).unwrap_or(usize::MAX);
751 let shard_end = cumulative.saturating_add(shard_len);
752
753 if shard_end <= start_offset {
754 cumulative = shard_end;
756 continue;
757 }
758
759 if cumulative >= end_offset {
760 break;
762 }
763
764 let shard_content = fs::read_to_string(&path)?;
765
766 let within_start = if cumulative < start_offset {
768 (start_offset - cumulative).min(shard_content.len())
769 } else {
770 0
771 };
772 let within_end = if shard_end > end_offset {
773 (end_offset - cumulative).min(shard_content.len())
774 } else {
775 shard_content.len()
776 };
777
778 result.push_str(&shard_content[within_start..within_end]);
779 cumulative = shard_end;
780 }
781
782 Ok(result)
783 }
784
785 pub fn event_count(&self) -> Result<u64, ShardError> {
791 let content = self.replay()?;
792 let count = content
793 .lines()
794 .filter(|line| !line.is_empty() && !line.starts_with('#') && !line.trim().is_empty())
795 .count();
796 Ok(count as u64)
797 }
798
799 pub fn replay_lines(
807 &self,
808 ) -> Result<impl Iterator<Item = io::Result<(usize, String)>>, ShardError> {
809 self.replay_lines_from_offset(0)
810 }
811
812 pub fn replay_lines_from_offset(
820 &self,
821 offset: usize,
822 ) -> Result<impl Iterator<Item = io::Result<(usize, String)>>, ShardError> {
823 let shards = self.list_shards()?;
824 let bones_dir = self.bones_dir.clone();
825
826 Ok(ShardLineIterator {
827 shards,
828 current_shard_idx: 0,
829 current_reader: None,
830 cumulative_offset: 0,
831 bones_dir,
832 }
833 .skip_to_offset(offset))
834 }
835
836 pub fn is_empty(&self) -> Result<bool, ShardError> {
842 let shards = self.list_shards()?;
843 Ok(shards.is_empty())
844 }
845}
846
/// Line-by-line reader over a sequence of shard files, treated as one
/// continuous byte stream.
struct ShardLineIterator {
    /// Shards to read, as `(year, month)` pairs, in iteration order.
    shards: Vec<(i32, u32)>,
    /// Index into `shards` of the shard currently being read (or next to open).
    current_shard_idx: usize,
    /// Open reader for the current shard; `None` until the shard is opened
    /// and again after it hits end-of-file.
    current_reader: Option<io::BufReader<fs::File>>,
    /// Byte offset in the logical stream of the next line to be yielded.
    cumulative_offset: usize,
    /// Root directory; shard paths are built as `<bones_dir>/events/<name>`.
    bones_dir: PathBuf,
}
858
859impl ShardLineIterator {
860 fn skip_to_offset(mut self, offset: usize) -> Self {
861 while self.current_shard_idx < self.shards.len() {
863 let (year, month) = self.shards[self.current_shard_idx];
864 let shard_path = self
865 .bones_dir
866 .join("events")
867 .join(ShardManager::shard_filename(year, month));
868 if let Ok(meta) = fs::metadata(shard_path) {
869 let shard_len = usize::try_from(meta.len()).unwrap_or(usize::MAX);
870 if self.cumulative_offset + shard_len <= offset {
871 self.cumulative_offset += shard_len;
872 self.current_shard_idx += 1;
873 continue;
874 }
875 }
876 break;
877 }
878
879 self.cumulative_offset = offset;
884 self
885 }
886}
887
/// Yields `(offset, line)` for each line of each shard in order, where
/// `offset` is the byte position of the line's first byte in the logical
/// stream and `line` is the text as returned by `read_line` (including its
/// line terminator when present).
impl Iterator for ShardLineIterator {
    type Item = io::Result<(usize, String)>;

    fn next(&mut self) -> Option<Self::Item> {
        use std::io::{BufRead, Seek, SeekFrom};

        loop {
            // No open reader: open the shard at `current_shard_idx`, or stop
            // when all shards have been consumed.
            if self.current_reader.is_none() {
                if self.current_shard_idx >= self.shards.len() {
                    return None;
                }

                let (year, month) = self.shards[self.current_shard_idx];
                let shard_path = self
                    .bones_dir
                    .join("events")
                    .join(ShardManager::shard_filename(year, month));

                // Forwarding-pointer shards carry no events: skip the file but
                // still advance the stream offset by its size.
                // NOTE(review): if the starting offset lies inside such a
                // shard, adding its full length overshoots the start of the
                // next shard and causes a seek into it below — confirm
                // whether that situation can occur.
                if is_forwarding_pointer(&shard_path) {
                    tracing::warn!(
                        shard = %shard_path.display(),
                        "skipping legacy forwarding-pointer shard during replay"
                    );
                    if let Ok(meta) = fs::metadata(&shard_path) {
                        self.cumulative_offset += usize::try_from(meta.len()).unwrap_or(0);
                    }
                    self.current_shard_idx += 1;
                    continue;
                }

                // A bad header is surfaced as an `InvalidData` I/O error.
                // NOTE(review): the shard index is not advanced here, so a
                // caller that keeps polling gets the same error again.
                if let Err(e) = validate_shard_header(&shard_path) {
                    tracing::error!(
                        shard = %shard_path.display(),
                        error = %e,
                        "shard header validation failed"
                    );
                    return Some(Err(io::Error::new(
                        io::ErrorKind::InvalidData,
                        e.to_string(),
                    )));
                }

                let mut file = match fs::File::open(shard_path) {
                    Ok(f) => f,
                    Err(e) => return Some(Err(e)),
                };

                // Stream offset at which this shard begins: the sum of the
                // on-disk sizes of all earlier shards (unreadable ones count
                // as zero).
                let mut cumulative_before = 0;
                for i in 0..self.current_shard_idx {
                    let (y, m) = self.shards[i];
                    let p = self
                        .bones_dir
                        .join("events")
                        .join(ShardManager::shard_filename(y, m));
                    if let Ok(meta) = fs::metadata(p) {
                        cumulative_before += usize::try_from(meta.len()).unwrap_or(usize::MAX);
                    }
                }

                // When the requested position is inside this shard, seek to it
                // so reading resumes mid-file.
                if self.cumulative_offset > cumulative_before {
                    let within = self.cumulative_offset - cumulative_before;
                    if let Err(e) = file.seek(SeekFrom::Start(within as u64)) {
                        return Some(Err(e));
                    }
                }

                self.current_reader = Some(io::BufReader::new(file));
            }

            let reader = self
                .current_reader
                .as_mut()
                .expect("reader was just set above");
            let mut line = String::new();
            // Offset of the line about to be read, captured before advancing.
            let offset = self.cumulative_offset;

            match reader.read_line(&mut line) {
                // End of this shard: drop the reader and loop to open the next.
                Ok(0) => {
                    self.current_reader = None;
                    self.current_shard_idx += 1;
                }
                Ok(n) => {
                    self.cumulative_offset += n;
                    return Some(Ok((offset, line)));
                }
                Err(e) => return Some(Err(e)),
            }
        }
    }
}
984
985#[must_use]
991fn current_year_month() -> (i32, u32) {
992 let now = chrono::Utc::now();
993 (now.year(), now.month())
994}
995
/// Returns the current wall-clock time as microseconds since the Unix
/// epoch, or `0` if the system clock reports a time before the epoch.
// The microsecond count is a `u128`; it fits in `i64` for any realistic date.
#[allow(clippy::cast_possible_truncation)]
#[must_use]
fn system_time_us() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_micros() as i64,
        Err(_) => 0,
    }
}
1005
1006fn is_forwarding_pointer(path: &Path) -> bool {
1013 let Ok(meta) = fs::metadata(path) else {
1014 return false;
1015 };
1016 if meta.len() > 30 {
1017 return false;
1018 }
1019 let Ok(content) = fs::read_to_string(path) else {
1020 return false;
1021 };
1022 parse_shard_filename(content.trim()).is_some()
1023}
1024
1025pub fn validate_shard_header(path: &Path) -> Result<(), ShardError> {
1036 use crate::event::writer::SHARD_HEADER;
1037 use std::io::{BufRead, BufReader};
1038
1039 if is_forwarding_pointer(path) {
1041 return Ok(());
1042 }
1043
1044 let file = fs::File::open(path)?;
1045 let mut reader = BufReader::new(file);
1046 let mut first_line = String::new();
1047 let n = reader.read_line(&mut first_line)?;
1048
1049 if n == 0 {
1051 return Ok(());
1052 }
1053
1054 let trimmed = first_line.trim_end();
1055 if trimmed != SHARD_HEADER {
1056 return Err(ShardError::CorruptedShard {
1057 path: path.to_path_buf(),
1058 reason: format!(
1059 "expected header '{}', found '{}'",
1060 SHARD_HEADER,
1061 &trimmed[..trimmed.len().min(80)]
1062 ),
1063 });
1064 }
1065
1066 Ok(())
1067}
1068
/// Parses a canonical shard filename of the form `YYYY-MM.events` into
/// `(year, month)`.
///
/// Returns `None` for anything else, including `current.events`, months
/// outside 1–12, and spellings that parse numerically but are not in
/// canonical form (e.g. `2026-1.events`, `+2026-01.events`). Such names
/// would otherwise be listed as shards whose canonical path does not exist,
/// or would duplicate a canonically named shard.
fn parse_shard_filename(name: &str) -> Option<(i32, u32)> {
    let stem = name.strip_suffix(".events")?;
    if stem == "current" {
        return None;
    }

    let (year_str, month_str) = stem.split_once('-')?;
    let year: i32 = year_str.parse().ok()?;
    let month: u32 = month_str.parse().ok()?;
    if !(1..=12).contains(&month) {
        return None;
    }

    // Accept only names that round-trip through the canonical format used
    // when shard files are created.
    if format!("{year:04}-{month:02}.events") != name {
        return None;
    }

    Some((year, month))
}
1084
1085fn recover_shard_torn_write(path: &Path) -> Result<Option<u64>, ShardError> {
1090 let metadata = fs::metadata(path)?;
1091 let file_len = metadata.len();
1092 if file_len == 0 {
1093 return Ok(None);
1094 }
1095
1096 let content = fs::read(path)?;
1097
1098 let last_newline = content.iter().rposition(|&b| b == b'\n');
1100
1101 if let Some(pos) = last_newline {
1102 let expected_len = (pos + 1) as u64;
1103 if expected_len < file_len {
1104 let truncated = file_len - expected_len;
1106 let file = OpenOptions::new().write(true).open(path)?;
1107 file.set_len(expected_len)?;
1108 Ok(Some(truncated))
1109 } else {
1110 Ok(None)
1112 }
1113 } else {
1114 let file = OpenOptions::new().write(true).open(path)?;
1117 file.set_len(0)?;
1118 Ok(Some(file_len))
1119 }
1120}
1121
1122#[cfg(test)]
1127mod tests {
1128 use super::*;
1129 use tempfile::TempDir;
1130
1131 fn setup() -> (TempDir, ShardManager) {
1132 let tmp = TempDir::new().expect("tempdir");
1133 let bones_dir = tmp.path().join(".bones");
1134 let mgr = ShardManager::new(&bones_dir);
1135 (tmp, mgr)
1136 }
1137
1138 #[test]
1143 fn parse_valid_shard_filenames() {
1144 assert_eq!(parse_shard_filename("2026-01.events"), Some((2026, 1)));
1145 assert_eq!(parse_shard_filename("2026-12.events"), Some((2026, 12)));
1146 assert_eq!(parse_shard_filename("1999-06.events"), Some((1999, 6)));
1147 }
1148
1149 #[test]
1150 fn parse_invalid_shard_filenames() {
1151 assert_eq!(parse_shard_filename("current.events"), None);
1152 assert_eq!(parse_shard_filename("2026-13.events"), None); assert_eq!(parse_shard_filename("2026-00.events"), None); assert_eq!(parse_shard_filename("not-a-shard.txt"), None);
1155 assert_eq!(parse_shard_filename("2026-01.manifest"), None);
1156 assert_eq!(parse_shard_filename(""), None);
1157 }
1158
1159 #[test]
1164 fn shard_manager_paths() {
1165 let mgr = ShardManager::new("/repo/.bones");
1166 assert_eq!(mgr.events_dir(), PathBuf::from("/repo/.bones/events"));
1167 assert_eq!(mgr.lock_path(), PathBuf::from("/repo/.bones/lock"));
1168 assert_eq!(mgr.clock_path(), PathBuf::from("/repo/.bones/cache/clock"));
1169 assert_eq!(
1170 mgr.shard_path(2026, 2),
1171 PathBuf::from("/repo/.bones/events/2026-02.events")
1172 );
1173 assert_eq!(
1174 mgr.manifest_path(2026, 1),
1175 PathBuf::from("/repo/.bones/events/2026-01.manifest")
1176 );
1177 }
1178
1179 #[test]
1180 fn shard_filename_format() {
1181 assert_eq!(ShardManager::shard_filename(2026, 1), "2026-01.events");
1182 assert_eq!(ShardManager::shard_filename(2026, 12), "2026-12.events");
1183 assert_eq!(ShardManager::shard_filename(1999, 6), "1999-06.events");
1184 }
1185
1186 #[test]
1191 fn ensure_dirs_creates_directories() {
1192 let (_tmp, mgr) = setup();
1193 mgr.ensure_dirs().expect("should create dirs");
1194 assert!(mgr.events_dir().exists());
1195 assert!(mgr.bones_dir.join("cache").exists());
1196 }
1197
1198 #[test]
1199 fn ensure_dirs_is_idempotent() {
1200 let (_tmp, mgr) = setup();
1201 mgr.ensure_dirs().expect("first");
1202 mgr.ensure_dirs().expect("second");
1203 assert!(mgr.events_dir().exists());
1204 }
1205
1206 #[test]
1207 fn init_creates_first_shard() {
1208 let (_tmp, mgr) = setup();
1209 let (year, month) = mgr.init().expect("init");
1210
1211 let (expected_year, expected_month) = current_year_month();
1212 assert_eq!(year, expected_year);
1213 assert_eq!(month, expected_month);
1214
1215 let shard_path = mgr.shard_path(year, month);
1217 assert!(shard_path.exists());
1218 let content = fs::read_to_string(&shard_path).expect("read");
1219 assert!(content.starts_with("# bones event log v1"));
1220 }
1221
1222 #[test]
1223 fn init_is_idempotent() {
1224 let (_tmp, mgr) = setup();
1225 let first = mgr.init().expect("first");
1226 let second = mgr.init().expect("second");
1227 assert_eq!(first, second);
1228 }
1229
1230 #[test]
1235 fn list_shards_empty() {
1236 let (_tmp, mgr) = setup();
1237 mgr.ensure_dirs().expect("dirs");
1238 let shards = mgr.list_shards().expect("list");
1239 assert!(shards.is_empty());
1240 }
1241
1242 #[test]
1243 fn list_shards_returns_sorted() {
1244 let (_tmp, mgr) = setup();
1245 mgr.ensure_dirs().expect("dirs");
1246
1247 mgr.create_shard(2026, 3).expect("create");
1249 mgr.create_shard(2026, 1).expect("create");
1250 mgr.create_shard(2026, 2).expect("create");
1251
1252 let shards = mgr.list_shards().expect("list");
1253 assert_eq!(shards, vec![(2026, 1), (2026, 2), (2026, 3)]);
1254 }
1255
1256 #[test]
1257 fn list_shards_skips_non_shard_files() {
1258 let (_tmp, mgr) = setup();
1259 mgr.ensure_dirs().expect("dirs");
1260 mgr.create_shard(2026, 1).expect("create");
1261
1262 fs::write(mgr.events_dir().join("readme.txt"), "hi").expect("write");
1264 fs::write(mgr.events_dir().join("2026-01.manifest"), "manifest").expect("write");
1265
1266 let shards = mgr.list_shards().expect("list");
1267 assert_eq!(shards, vec![(2026, 1)]);
1268 }
1269
1270 #[test]
1271 fn list_shards_no_events_dir() {
1272 let (_tmp, mgr) = setup();
1273 let shards = mgr.list_shards().expect("list");
1275 assert!(shards.is_empty());
1276 }
1277
1278 #[test]
1283 fn create_shard_writes_header() {
1284 let (_tmp, mgr) = setup();
1285 mgr.ensure_dirs().expect("dirs");
1286 let path = mgr.create_shard(2026, 2).expect("create");
1287
1288 let content = fs::read_to_string(&path).expect("read");
1289 assert!(content.starts_with("# bones event log v1"));
1290 assert!(content.contains("# fields:"));
1291 assert_eq!(content.lines().count(), 2);
1292 }
1293
1294 #[test]
1295 fn create_shard_idempotent() {
1296 let (_tmp, mgr) = setup();
1297 mgr.ensure_dirs().expect("dirs");
1298 let p1 = mgr.create_shard(2026, 2).expect("first");
1299 fs::write(&p1, "modified").expect("write");
1301 let p2 = mgr.create_shard(2026, 2).expect("second");
1303 assert_eq!(p1, p2);
1304 let content = fs::read_to_string(&p2).expect("read");
1305 assert_eq!(content, "modified");
1306 }
1307
1308 #[test]
1313 fn clock_starts_at_zero() {
1314 let (_tmp, mgr) = setup();
1315 mgr.ensure_dirs().expect("dirs");
1316 let ts = mgr.read_clock().expect("read");
1317 assert_eq!(ts, 0);
1318 }
1319
1320 #[test]
1321 fn clock_is_monotonic() {
1322 let (_tmp, mgr) = setup();
1323 mgr.ensure_dirs().expect("dirs");
1324 let t1 = mgr.next_timestamp().expect("t1");
1325 let t2 = mgr.next_timestamp().expect("t2");
1326 let t3 = mgr.next_timestamp().expect("t3");
1327 assert!(t2 > t1);
1328 assert!(t3 > t2);
1329 }
1330
1331 #[test]
1332 fn clock_reads_back_written_value() {
1333 let (_tmp, mgr) = setup();
1334 mgr.ensure_dirs().expect("dirs");
1335 mgr.write_clock(42_000_000).expect("write");
1336 let ts = mgr.read_clock().expect("read");
1337 assert_eq!(ts, 42_000_000);
1338 }
1339
1340 #[test]
1341 fn clock_never_goes_backward() {
1342 let (_tmp, mgr) = setup();
1343 mgr.ensure_dirs().expect("dirs");
1344
1345 let future = system_time_us() + 10_000_000;
1347 mgr.write_clock(future).expect("write");
1348
1349 let next = mgr.next_timestamp().expect("next");
1350 assert!(next > future, "clock should advance past future value");
1351 }
1352
1353 #[test]
1358 fn append_raw_adds_line() {
1359 let (_tmp, mgr) = setup();
1360 mgr.ensure_dirs().expect("dirs");
1361 mgr.create_shard(2026, 2).expect("create");
1362
1363 mgr.append_raw(2026, 2, "event line 1\n").expect("append");
1364 mgr.append_raw(2026, 2, "event line 2\n").expect("append");
1365
1366 let content = mgr.read_shard(2026, 2).expect("read");
1367 assert!(content.contains("event line 1"));
1368 assert!(content.contains("event line 2"));
1369 }
1370
1371 #[test]
1372 fn append_with_lock() {
1373 let (_tmp, mgr) = setup();
1374 mgr.init().expect("init");
1375
1376 let _ts = mgr
1377 .append("test event line\n", false, Duration::from_secs(1))
1378 .expect("append");
1379
1380 let content = mgr.replay().expect("replay");
1381 assert!(content.contains("test event line"));
1382 }
1383
1384 #[test]
1385 fn append_returns_monotonic_timestamps() {
1386 let (_tmp, mgr) = setup();
1387 mgr.init().expect("init");
1388
1389 let t1 = mgr
1390 .append("line1\n", false, Duration::from_secs(1))
1391 .expect("t1");
1392 let t2 = mgr
1393 .append("line2\n", false, Duration::from_secs(1))
1394 .expect("t2");
1395
1396 assert!(t2 > t1);
1397 }
1398
1399 #[test]
1404 fn recover_clean_file() {
1405 let (_tmp, mgr) = setup();
1406 mgr.init().expect("init");
1407
1408 let (y, m) = current_year_month();
1409 mgr.append_raw(y, m, "complete line\n").expect("append");
1410
1411 let recovered = mgr.recover_torn_writes().expect("recover");
1412 assert_eq!(recovered, None);
1413 }
1414
1415 #[test]
1416 fn recover_torn_write_truncates() {
1417 let (_tmp, mgr) = setup();
1418 let (y, m) = mgr.init().expect("init");
1419 let shard_path = mgr.shard_path(y, m);
1420
1421 {
1423 let mut f = OpenOptions::new()
1424 .append(true)
1425 .open(&shard_path)
1426 .expect("open");
1427 f.write_all(b"complete line\npartial line without newline")
1428 .expect("write");
1429 f.flush().expect("flush");
1430 }
1431
1432 let recovered = mgr.recover_torn_writes().expect("recover");
1433 assert!(recovered.is_some());
1434
1435 let truncated = recovered.expect("checked is_some");
1436 assert_eq!(truncated, "partial line without newline".len() as u64);
1437
1438 let content = fs::read_to_string(&shard_path).expect("read");
1440 assert!(content.ends_with('\n'));
1441 assert!(content.contains("complete line"));
1442 assert!(!content.contains("partial line without newline"));
1443 }
1444
1445 #[test]
1446 fn recover_no_newline_at_all() {
1447 let (_tmp, mgr) = setup();
1448 let (y, m) = mgr.init().expect("init");
1449 let shard_path = mgr.shard_path(y, m);
1450
1451 fs::write(&shard_path, "no newlines here").expect("write");
1453
1454 let recovered = mgr.recover_torn_writes().expect("recover");
1455 assert_eq!(recovered, Some("no newlines here".len() as u64));
1456
1457 let content = fs::read_to_string(&shard_path).expect("read");
1459 assert!(content.is_empty());
1460 }
1461
1462 #[test]
1463 fn recover_empty_file() {
1464 let (_tmp, mgr) = setup();
1465 let (y, m) = mgr.init().expect("init");
1466 let shard_path = mgr.shard_path(y, m);
1467
1468 fs::write(&shard_path, "").expect("write");
1470
1471 let recovered = mgr.recover_torn_writes().expect("recover");
1472 assert_eq!(recovered, None);
1473 }
1474
1475 #[test]
1476 fn recover_no_active_shard() {
1477 let (_tmp, mgr) = setup();
1478 mgr.ensure_dirs().expect("dirs");
1479
1480 let recovered = mgr.recover_torn_writes().expect("recover");
1481 assert_eq!(recovered, None);
1482 }
1483
1484 #[test]
1489 fn replay_empty_repo() {
1490 let (_tmp, mgr) = setup();
1491 mgr.ensure_dirs().expect("dirs");
1492 let content = mgr.replay().expect("replay");
1493 assert!(content.is_empty());
1494 }
1495
1496 #[test]
1497 fn replay_single_shard() {
1498 let (_tmp, mgr) = setup();
1499 mgr.ensure_dirs().expect("dirs");
1500 mgr.create_shard(2026, 1).expect("create");
1501 mgr.append_raw(2026, 1, "event-a\n").expect("append");
1502
1503 let content = mgr.replay().expect("replay");
1504 assert!(content.contains("event-a"));
1505 }
1506
1507 #[test]
1508 fn replay_multiple_shards_in_order() {
1509 let (_tmp, mgr) = setup();
1510 mgr.ensure_dirs().expect("dirs");
1511
1512 mgr.create_shard(2026, 1).expect("create");
1513 mgr.create_shard(2026, 2).expect("create");
1514 mgr.create_shard(2026, 3).expect("create");
1515
1516 mgr.append_raw(2026, 1, "event-jan\n").expect("append");
1517 mgr.append_raw(2026, 2, "event-feb\n").expect("append");
1518 mgr.append_raw(2026, 3, "event-mar\n").expect("append");
1519
1520 let content = mgr.replay().expect("replay");
1521
1522 let jan_pos = content.find("event-jan").expect("jan");
1524 let feb_pos = content.find("event-feb").expect("feb");
1525 let mar_pos = content.find("event-mar").expect("mar");
1526 assert!(jan_pos < feb_pos);
1527 assert!(feb_pos < mar_pos);
1528 }
1529
1530 #[test]
1535 fn event_count_empty() {
1536 let (_tmp, mgr) = setup();
1537 mgr.ensure_dirs().expect("dirs");
1538 assert_eq!(mgr.event_count().expect("count"), 0);
1539 }
1540
1541 #[test]
1542 fn event_count_excludes_comments_and_blanks() {
1543 let (_tmp, mgr) = setup();
1544 mgr.ensure_dirs().expect("dirs");
1545 mgr.create_shard(2026, 1).expect("create");
1546 mgr.append_raw(2026, 1, "event1\n").expect("append");
1548 mgr.append_raw(2026, 1, "event2\n").expect("append");
1549 mgr.append_raw(2026, 1, "\n").expect("blank");
1550
1551 assert_eq!(mgr.event_count().expect("count"), 2);
1552 }
1553
1554 #[test]
1559 fn is_empty_no_shards() {
1560 let (_tmp, mgr) = setup();
1561 mgr.ensure_dirs().expect("dirs");
1562 assert!(mgr.is_empty().expect("empty"));
1563 }
1564
1565 #[test]
1566 fn is_empty_with_shards() {
1567 let (_tmp, mgr) = setup();
1568 mgr.init().expect("init");
1569 assert!(!mgr.is_empty().expect("empty"));
1570 }
1571
/// Writing a manifest for a two-event shard yields the expected fields, and reading it back
/// returns a value equal to the one that was written.
#[test]
fn write_and_read_manifest() {
    let (_guard, manager) = setup();
    manager.ensure_dirs().expect("dirs");
    manager.create_shard(2026, 1).expect("create");
    for line in ["event-line-1\n", "event-line-2\n"] {
        manager.append_raw(2026, 1, line).expect("append");
    }

    // Check the manifest returned by the write call itself.
    let sealed = manager.write_manifest(2026, 1).expect("write manifest");
    assert_eq!(sealed.shard_name, "2026-01.events");
    assert_eq!(sealed.event_count, 2);
    assert!(sealed.byte_len > 0);
    assert!(sealed.file_hash.starts_with("blake3:"));

    // The manifest read back afterwards must match it field for field.
    let lookup = manager.read_manifest(2026, 1).expect("read");
    let loaded = lookup.expect("should exist");
    assert_eq!(loaded, sealed);
}
1596
/// `to_string_repr` followed by `from_string_repr` must reproduce the original manifest.
#[test]
fn manifest_roundtrip() {
    let original = ShardManifest {
        shard_name: String::from("2026-01.events"),
        event_count: 42,
        byte_len: 12345,
        file_hash: String::from("blake3:abcdef0123456789"),
    };

    let text = original.to_string_repr();
    let decoded = ShardManifest::from_string_repr(&text).expect("parse");

    assert_eq!(decoded, original);
}
1610
/// Reading a manifest that was never written succeeds and yields `None`.
#[test]
fn read_manifest_missing() {
    let (_guard, manager) = setup();
    manager.ensure_dirs().expect("dirs");

    let lookup = manager.read_manifest(2026, 1).expect("read");
    assert!(lookup.is_none());
}
1618
/// A freshly created shard with a single appended event line must produce a manifest whose
/// `event_count` is exactly one.
#[test]
fn manifest_event_count_excludes_comments() {
    let (_guard, manager) = setup();
    manager.ensure_dirs().expect("dirs");
    manager.create_shard(2026, 1).expect("create");
    manager.append_raw(2026, 1, "event1\n").expect("append");

    // Only the appended line may be counted; anything `create_shard` wrote must not be.
    let sealed = manager.write_manifest(2026, 1).expect("manifest");
    assert_eq!(sealed.event_count, 1);
}
1631
/// With no shards present, `rotate_if_needed` returns the current year/month and the shard
/// file for that month exists afterwards.
#[test]
fn rotate_creates_shard_if_none_exist() {
    let (_guard, manager) = setup();
    manager.ensure_dirs().expect("dirs");

    let (year, month) = manager.rotate_if_needed().expect("rotate");
    let today = current_year_month();
    assert_eq!((year, month), today);

    let active_shard = manager.shard_path(year, month);
    assert!(active_shard.exists());
}
1647
/// Rotating right after `init` must report the same year/month that `init` returned.
#[test]
fn rotate_no_op_same_month() {
    let (_guard, manager) = setup();

    let from_init = manager.init().expect("init");
    let from_rotate = manager.rotate_if_needed().expect("rotate");

    assert_eq!(from_init, from_rotate);
}
1656
/// When the only existing shard is from 2025-11, rotation must return the current year/month,
/// leave a manifest file behind for the old shard, and create the current month's shard file.
#[test]
fn rotate_different_month_seals_and_creates() {
    let (_guard, manager) = setup();
    manager.ensure_dirs().expect("dirs");

    // Seed a shard for a month other than the current one.
    manager.create_shard(2025, 11).expect("create");
    manager.append_raw(2025, 11, "old event\n").expect("append");

    let rotated = manager.rotate_if_needed().expect("rotate");
    let (now_year, now_month) = current_year_month();
    assert_eq!(rotated, (now_year, now_month));

    // The old shard now has a manifest on disk.
    let old_manifest = manager.manifest_path(2025, 11);
    assert!(old_manifest.exists());

    // The current month's shard file has been created.
    let active_shard = manager.shard_path(now_year, now_month);
    assert!(active_shard.exists());
}
1677
/// Content of an older shard (2025-06) must be byte-for-byte unchanged after `init` and a
/// subsequent `append` through the manager.
#[test]
fn frozen_shard_not_modified_by_append() {
    let (_guard, manager) = setup();
    manager.ensure_dirs().expect("dirs");

    // Build the older shard and snapshot its content.
    manager.create_shard(2025, 6).expect("create");
    manager.append_raw(2025, 6, "old event\n").expect("append");
    let snapshot_before = manager.read_shard(2025, 6).expect("read");

    manager.init().expect("init");

    // Append through the regular entry point.
    let one_second = Duration::from_secs(1);
    manager
        .append("new event\n", false, one_second)
        .expect("append");

    let snapshot_after = manager.read_shard(2025, 6).expect("read");
    assert_eq!(snapshot_before, snapshot_after);
}
1703
/// `system_time_us` must return a strictly positive value.
#[test]
fn system_time_us_is_positive() {
    let ts = system_time_us();
    let is_positive = ts > 0;
    assert!(is_positive, "system time should be positive: {ts}");
}
1713
/// `system_time_us` must return a value later than the start of 2020.
#[test]
fn system_time_us_is_reasonable() {
    // 2020-01-01T00:00:00Z expressed in microseconds since the Unix epoch.
    const JAN_2020_US: i64 = 1_577_836_800_000_000;

    let ts = system_time_us();
    assert!(ts > JAN_2020_US, "system time too small: {ts}");
}
1721
/// With no shards present, `total_content_len` must be zero.
#[test]
fn total_content_len_empty_repo() {
    let (_guard, manager) = setup();
    manager.ensure_dirs().expect("dirs");

    let measured = manager.total_content_len().expect("len");
    assert_eq!(measured, 0);
}
1733
/// For a single shard, `total_content_len` must equal the length of the full `replay` output.
#[test]
fn total_content_len_single_shard() {
    let (_guard, manager) = setup();
    manager.ensure_dirs().expect("dirs");
    manager.create_shard(2026, 1).expect("create");
    for line in ["line1\n", "line2\n"] {
        manager.append_raw(2026, 1, line).expect("append");
    }

    let replayed = manager.replay().expect("replay");
    let measured = manager.total_content_len().expect("len");

    assert_eq!(measured, replayed.len());
}
1746
/// With two shards, `total_content_len` must still equal the length of the full `replay`
/// output.
#[test]
fn total_content_len_multiple_shards() {
    let (_guard, manager) = setup();
    manager.ensure_dirs().expect("dirs");

    // Create both shards first, then put one event into each.
    manager.create_shard(2026, 1).expect("shard 1");
    manager.create_shard(2026, 2).expect("shard 2");
    manager
        .append_raw(2026, 1, "jan-event\n")
        .expect("append jan");
    manager
        .append_raw(2026, 2, "feb-event\n")
        .expect("append feb");

    let replayed = manager.replay().expect("replay");
    let measured = manager.total_content_len().expect("len");

    assert_eq!(
        measured,
        replayed.len(),
        "total_content_len must match replay len"
    );
}
1760
/// A range whose start equals its end must yield empty content.
#[test]
fn read_content_range_empty_range() {
    let (_guard, manager) = setup();
    manager.ensure_dirs().expect("dirs");
    manager.create_shard(2026, 1).expect("create");
    manager.append_raw(2026, 1, "event\n").expect("append");

    let slice = manager.read_content_range(5, 5).expect("range");
    assert!(slice.is_empty());
}
1775
/// A range covering exactly one appended line inside a single shard must return that line,
/// newline included.
#[test]
fn read_content_range_within_single_shard() {
    let (_guard, manager) = setup();
    manager.ensure_dirs().expect("dirs");
    manager.create_shard(2026, 1).expect("create");

    // Seven bytes: six letters plus the trailing newline.
    let line = "ABCDEF\n";
    manager.append_raw(2026, 1, line).expect("append");

    // Locate the line in the full replay, then request exactly its byte span.
    let replayed = manager.replay().expect("replay");
    let start = replayed.find("ABCDEF").expect("ABCDEF must be in shard");
    let end = start + line.len();

    let slice = manager.read_content_range(start, end).expect("range");
    assert_eq!(slice, "ABCDEF\n");
}
1790
/// Requesting the range `0..replay.len()` across two shards must return the same content as
/// `replay` itself.
#[test]
fn read_content_range_across_shard_boundary() {
    let (_guard, manager) = setup();
    manager.ensure_dirs().expect("dirs");

    manager.create_shard(2026, 1).expect("shard 1");
    manager.create_shard(2026, 2).expect("shard 2");
    manager.append_raw(2026, 1, "jan-last-line\n").expect("jan");
    manager
        .append_raw(2026, 2, "feb-first-line\n")
        .expect("feb");

    let replayed = manager.replay().expect("replay");
    let end = replayed.len();
    let slice = manager.read_content_range(0, end).expect("full range");

    assert_eq!(slice, replayed);
}
1805
/// A range that starts at the end of the content and extends past it must succeed and yield
/// empty content.
#[test]
fn read_content_range_beyond_end() {
    let (_guard, manager) = setup();
    manager.ensure_dirs().expect("dirs");
    manager.create_shard(2026, 1).expect("create");
    manager.append_raw(2026, 1, "event\n").expect("append");

    let replayed = manager.replay().expect("replay");
    let start = replayed.len();
    let end = replayed.len() + 100;

    let slice = manager.read_content_range(start, end).expect("beyond end");
    assert!(slice.is_empty());
}
1820
/// Replaying from offset zero must return the same content as `replay`, together with a total
/// length equal to that content's length.
#[test]
fn replay_from_offset_zero_returns_full_content() {
    let (_guard, manager) = setup();
    manager.ensure_dirs().expect("dirs");
    manager.create_shard(2026, 1).expect("create");
    for (line, failure_msg) in [("event1\n", "e1"), ("event2\n", "e2")] {
        manager.append_raw(2026, 1, line).expect(failure_msg);
    }

    let replayed = manager.replay().expect("full replay");
    let (tail, reported_len) = manager.replay_from_offset(0).expect("from 0");

    assert_eq!(tail, replayed);
    assert_eq!(reported_len, replayed.len());
}
1838
/// Replaying from the byte offset of the third event must return only that event, while the
/// reported total length still covers the whole content.
#[test]
fn replay_from_offset_skips_content_before_cursor() {
    let (_guard, manager) = setup();
    manager.ensure_dirs().expect("dirs");
    manager.create_shard(2026, 1).expect("create");
    let appended = [("event1\n", "e1"), ("event2\n", "e2"), ("event3\n", "e3")];
    for (line, failure_msg) in appended {
        manager.append_raw(2026, 1, line).expect(failure_msg);
    }

    let replayed = manager.replay().expect("full replay");

    // Use the position of the last event in the full replay as the cursor.
    let offset = replayed.find("event3").expect("event3 in content");
    let (tail, reported_len) = manager.replay_from_offset(offset).expect("from cursor");

    assert_eq!(tail, "event3\n");
    assert_eq!(reported_len, replayed.len());
}
1856
/// Replaying from an offset equal to the full content length must return an empty tail and
/// the unchanged total length.
#[test]
fn replay_from_offset_at_end_returns_empty() {
    let (_guard, manager) = setup();
    manager.ensure_dirs().expect("dirs");
    manager.create_shard(2026, 1).expect("create");
    manager.append_raw(2026, 1, "event1\n").expect("e1");

    let replayed = manager.replay().expect("full replay");
    let end_offset = replayed.len();
    let (tail, reported_len) = manager.replay_from_offset(end_offset).expect("at end");

    assert!(tail.is_empty(), "tail should be empty at end of content");
    assert_eq!(reported_len, replayed.len());
}
1869
/// With a January and a February shard, replaying from an offset equal to the January shard's
/// length must return only February's events, while the total length still covers both.
#[test]
fn replay_from_offset_skips_sealed_shards_before_cursor() {
    let (_guard, manager) = setup();
    manager.ensure_dirs().expect("dirs");

    manager.create_shard(2026, 1).expect("jan");
    manager.create_shard(2026, 2).expect("feb");

    // Each entry is (month, raw line, message used if the append fails).
    let appended = [
        (1, "jan-event1\n", "jan e1"),
        (1, "jan-event2\n", "jan e2"),
        (2, "feb-event1\n", "feb e1"),
        (2, "feb-event2\n", "feb e2"),
    ];
    for (month, line, failure_msg) in appended {
        manager.append_raw(2026, month, line).expect(failure_msg);
    }

    let replayed = manager.replay().expect("full replay");

    // The January shard's length is used as the offset at which February's content starts.
    let offset = manager.read_shard(2026, 1).expect("read jan").len();
    let (tail, reported_len) = manager
        .replay_from_offset(offset)
        .expect("from feb start");

    let has_jan = tail.contains("jan-event");
    assert!(!has_jan, "jan events should not appear in tail");
    for feb_event in ["feb-event1", "feb-event2"] {
        assert!(tail.contains(feb_event), "feb events must be in tail");
    }
    assert_eq!(reported_len, replayed.len());
}
1898
/// The total length reported by `replay_from_offset(0)` must agree with `total_content_len`.
#[test]
fn replay_from_offset_total_len_equals_total_content_len() {
    let (_guard, manager) = setup();
    manager.ensure_dirs().expect("dirs");

    manager.create_shard(2026, 1).expect("shard 1");
    manager.create_shard(2026, 2).expect("shard 2");
    for (month, line, failure_msg) in [(1, "event-a\n", "ea"), (2, "event-b\n", "eb")] {
        manager.append_raw(2026, month, line).expect(failure_msg);
    }

    let measured = manager.total_content_len().expect("total_content_len");
    let replay_result = manager.replay_from_offset(0).expect("replay_from_offset");
    let reported_len = replay_result.1;

    assert_eq!(
        measured, reported_len,
        "total_content_len and replay_from_offset total must agree"
    );
}
1915
/// A shard file that holds only the name of another shard (a forwarding pointer) must not
/// contribute any lines to `replay_lines`, while events from the real shards around it do
/// appear.
#[test]
fn replay_lines_skips_forwarding_pointer_shard() {
    let root = tempfile::tempdir().expect("tmpdir");
    let manager = ShardManager::new(root.path());
    manager.ensure_dirs().expect("dirs");

    // January: a real shard with a header line and one event.
    manager.create_shard(2026, 1).expect("create jan");
    manager
        .append_raw(2026, 1, "# bones event log v1\n")
        .expect("header");
    manager
        .append_raw(2026, 1, "real-event-line\n")
        .expect("event");

    // February: written directly to disk, containing only the name of the March shard.
    let pointer_file = root.path().join("events").join("2026-02.events");
    fs::write(&pointer_file, "2026-03.events").expect("write forwarding pointer");

    // March: another real shard with a header line and one event.
    manager.create_shard(2026, 3).expect("create mar");
    manager
        .append_raw(2026, 3, "# bones event log v1\n")
        .expect("mar header");
    manager
        .append_raw(2026, 3, "another-event-line\n")
        .expect("mar event");

    // Keep only the second element of every replayed item.
    let mut replayed: Vec<String> = Vec::new();
    for item in manager.replay_lines().expect("replay_lines") {
        replayed.push(item.expect("line").1);
    }

    let mentions = |needle: &str| replayed.iter().any(|line| line.contains(needle));

    assert!(mentions("real-event-line"), "jan event missing");
    assert!(mentions("another-event-line"), "mar event missing");
    assert!(
        !mentions("2026-03.events"),
        "forwarding pointer content must not appear in replay"
    );
}
1962}