1use std::fs::{self, OpenOptions};
30use std::io::{self, Write as IoWrite};
31use std::path::{Path, PathBuf};
32use std::time::{Duration, SystemTime, UNIX_EPOCH};
33
34use chrono::Datelike;
35
36use crate::event::writer::shard_header;
37use crate::lock::ShardLock;
38
/// Errors returned by shard management operations in this module.
#[derive(Debug, thiserror::Error)]
pub enum ShardError {
    /// A filesystem operation on a shard, manifest, or clock file failed.
    #[error("shard I/O error: {0}")]
    Io(#[from] io::Error),

    /// Acquiring the repository lock failed (see `ShardManager::append`).
    #[error("lock error: {0}")]
    Lock(#[from] crate::lock::LockError),

    /// Creating the directory layout failed (see `ShardManager::ensure_dirs`).
    #[error("failed to initialize .bones directory: {0}")]
    InitFailed(io::Error),

    /// A shard filename could not be interpreted.
    // NOTE(review): not constructed anywhere in the visible part of this file.
    #[error("invalid shard filename: {0}")]
    InvalidShardName(String),

    /// A shard's contents failed validation, e.g. an unexpected header line
    /// (see `validate_shard_header`).
    #[error("corrupted shard {path}: {reason}")]
    CorruptedShard {
        /// Path of the offending shard file.
        path: PathBuf,
        /// Human-readable description of what was wrong.
        reason: String,
    },
}
71
/// One problem found while checking sealed shards against their manifests
/// (see `ShardManager::validate_sealed_shards`).
#[derive(Debug, Clone)]
pub struct ShardIntegrityIssue {
    /// Filename of the affected shard, e.g. `2026-01.events`.
    pub shard_name: String,
    /// Human-readable description of the problem.
    pub problem: String,
}
80
/// Summary of a shard file, stored next to it as a `.manifest` file.
///
/// The on-disk form is a small line-oriented `key: value` text document; see
/// [`ShardManifest::to_string_repr`] and [`ShardManifest::from_string_repr`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ShardManifest {
    /// Filename of the shard this manifest describes (e.g. `2026-01.events`).
    pub shard_name: String,
    /// Number of event lines counted in the shard.
    pub event_count: u64,
    /// Size of the shard file in bytes.
    pub byte_len: u64,
    /// Content hash of the shard file, stored as an opaque string.
    pub file_hash: String,
}

impl ShardManifest {
    /// Renders the manifest as four newline-terminated `key: value` lines, in
    /// the order `shard`, `event_count`, `byte_len`, `file_hash`.
    #[must_use]
    pub fn to_string_repr(&self) -> String {
        let Self {
            shard_name,
            event_count,
            byte_len,
            file_hash,
        } = self;
        format!(
            "shard: {shard_name}\nevent_count: {event_count}\nbyte_len: {byte_len}\nfile_hash: {file_hash}\n"
        )
    }

    /// Parses the text produced by [`ShardManifest::to_string_repr`].
    ///
    /// Lines with unknown keys are ignored. Returns `None` when any of the
    /// four fields is missing or when a numeric field does not parse as `u64`.
    /// If a key appears more than once, the last occurrence wins.
    #[must_use]
    pub fn from_string_repr(s: &str) -> Option<Self> {
        let mut name: Option<String> = None;
        let mut events: Option<u64> = None;
        let mut bytes: Option<u64> = None;
        let mut hash: Option<String> = None;

        // Splitting on the first ": " yields exactly the key the line starts with.
        for (key, value) in s.lines().filter_map(|line| line.split_once(": ")) {
            match key {
                "shard" => name = Some(value.to_owned()),
                "event_count" => events = value.parse().ok(),
                "byte_len" => bytes = value.parse().ok(),
                "file_hash" => hash = Some(value.to_owned()),
                _ => {}
            }
        }

        match (name, events, bytes, hash) {
            (Some(shard_name), Some(event_count), Some(byte_len), Some(file_hash)) => Some(Self {
                shard_name,
                event_count,
                byte_len,
                file_hash,
            }),
            _ => None,
        }
    }
}
138
/// Manages the on-disk event log, stored as one shard file per calendar month.
///
/// Paths used under the root directory (see the path helper methods):
/// - `events/YYYY-MM.events` — shard files
/// - `events/YYYY-MM.manifest` — manifests for shards
/// - `cache/clock` — last issued timestamp
/// - `lock` — lock file path used by `append`
pub struct ShardManager {
    /// Root directory under which all of the above paths are resolved.
    bones_dir: PathBuf,
}
157
158impl ShardManager {
159 #[must_use]
164 pub fn new(bones_dir: impl Into<PathBuf>) -> Self {
165 Self {
166 bones_dir: bones_dir.into(),
167 }
168 }
169
170 #[must_use]
172 pub fn events_dir(&self) -> PathBuf {
173 self.bones_dir.join("events")
174 }
175
176 #[must_use]
178 pub fn lock_path(&self) -> PathBuf {
179 self.bones_dir.join("lock")
180 }
181
182 #[must_use]
184 pub fn clock_path(&self) -> PathBuf {
185 self.bones_dir.join("cache").join("clock")
186 }
187
188 #[must_use]
190 pub fn shard_filename(year: i32, month: u32) -> String {
191 format!("{year:04}-{month:02}.events")
192 }
193
194 #[must_use]
196 pub fn shard_path(&self, year: i32, month: u32) -> PathBuf {
197 self.events_dir().join(Self::shard_filename(year, month))
198 }
199
200 #[must_use]
202 pub fn manifest_path(&self, year: i32, month: u32) -> PathBuf {
203 self.events_dir()
204 .join(format!("{year:04}-{month:02}.manifest"))
205 }
206
207 pub fn ensure_dirs(&self) -> Result<(), ShardError> {
218 fs::create_dir_all(self.events_dir()).map_err(ShardError::InitFailed)?;
219 fs::create_dir_all(self.bones_dir.join("cache")).map_err(ShardError::InitFailed)?;
220 Ok(())
221 }
222
223 pub fn init(&self) -> Result<(i32, u32), ShardError> {
233 self.ensure_dirs()?;
234
235 let shards = self.list_shards()?;
236 if shards.is_empty() {
237 let (year, month) = current_year_month();
238 self.create_shard(year, month)?;
239 Ok((year, month))
240 } else if let Some(&(year, month)) = shards.last() {
241 Ok((year, month))
242 } else {
243 unreachable!("shards is non-empty")
244 }
245 }
246
247 pub fn list_shards(&self) -> Result<Vec<(i32, u32)>, ShardError> {
260 let events_dir = self.events_dir();
261 if !events_dir.exists() {
262 return Ok(Vec::new());
263 }
264
265 let mut shards = Vec::new();
266 for entry in fs::read_dir(&events_dir)? {
267 let entry = entry?;
268 let name = entry.file_name();
269 let name_str = name.to_string_lossy();
270 if let Some(ym) = parse_shard_filename(&name_str) {
271 shards.push(ym);
272 }
273 }
274 shards.sort_unstable();
275 Ok(shards)
276 }
277
278 pub fn active_shard(&self) -> Result<Option<(i32, u32)>, ShardError> {
284 let shards = self.list_shards()?;
285 Ok(shards.last().copied())
286 }
287
288 pub fn create_shard(&self, year: i32, month: u32) -> Result<PathBuf, ShardError> {
301 let path = self.shard_path(year, month);
302 if path.exists() {
303 return Ok(path);
304 }
305
306 let header = shard_header();
307 fs::write(&path, header)?;
308 Ok(path)
309 }
310
311 pub fn rotate_if_needed(&self) -> Result<(i32, u32), ShardError> {
320 let (current_year, current_month) = current_year_month();
321 let active = self.active_shard()?;
322
323 match active {
324 Some((y, m)) if y == current_year && m == current_month => Ok((y, m)),
325 Some((y, m)) => {
326 self.write_manifest(y, m)?;
328 self.create_shard(current_year, current_month)?;
330 Ok((current_year, current_month))
331 }
332 None => {
333 self.create_shard(current_year, current_month)?;
335 Ok((current_year, current_month))
336 }
337 }
338 }
339
340 pub fn write_manifest(&self, year: i32, month: u32) -> Result<ShardManifest, ShardError> {
351 let shard_path = self.shard_path(year, month);
352 let content = fs::read(&shard_path)?;
353 let content_str = String::from_utf8_lossy(&content);
354
355 let event_count = content_str
357 .lines()
358 .filter(|line| !line.is_empty() && !line.starts_with('#') && !line.trim().is_empty())
359 .count() as u64;
360
361 let byte_len = content.len() as u64;
362 let file_hash = format!("blake3:{}", blake3::hash(&content).to_hex());
363
364 let manifest = ShardManifest {
365 shard_name: Self::shard_filename(year, month),
366 event_count,
367 byte_len,
368 file_hash,
369 };
370
371 let manifest_path = self.manifest_path(year, month);
372 fs::write(&manifest_path, manifest.to_string_repr())?;
373
374 Ok(manifest)
375 }
376
377 pub fn read_manifest(
384 &self,
385 year: i32,
386 month: u32,
387 ) -> Result<Option<ShardManifest>, ShardError> {
388 let manifest_path = self.manifest_path(year, month);
389 if !manifest_path.exists() {
390 return Ok(None);
391 }
392 let content = fs::read_to_string(&manifest_path)?;
393 Ok(ShardManifest::from_string_repr(&content))
394 }
395
396 pub fn validate_sealed_shards(&self) -> Result<Vec<ShardIntegrityIssue>, ShardError> {
414 let shards = self.list_shards()?;
415 let mut issues = Vec::new();
416
417 let sealed = if shards.len() > 1 {
419 &shards[..shards.len() - 1]
420 } else {
421 return Ok(issues);
422 };
423
424 for &(year, month) in sealed {
425 let shard_path = self.shard_path(year, month);
426 let shard_name = Self::shard_filename(year, month);
427
428 let Some(manifest) = self.read_manifest(year, month)? else {
429 tracing::warn!(shard = %shard_name, "sealed shard has no manifest");
430 issues.push(ShardIntegrityIssue {
431 shard_name: shard_name.clone(),
432 problem: "sealed shard has no manifest file".into(),
433 });
434 continue;
435 };
436
437 let file_len = fs::metadata(&shard_path)?.len();
438 if file_len != manifest.byte_len {
439 tracing::error!(
440 shard = %shard_name,
441 expected = manifest.byte_len,
442 actual = file_len,
443 "sealed shard byte length mismatch"
444 );
445 issues.push(ShardIntegrityIssue {
446 shard_name,
447 problem: format!(
448 "byte length mismatch: manifest says {} bytes, file is {} bytes",
449 manifest.byte_len, file_len
450 ),
451 });
452 }
453 }
454
455 Ok(issues)
456 }
457
458 pub fn append(
481 &self,
482 line: &str,
483 durable: bool,
484 lock_timeout: Duration,
485 ) -> Result<i64, ShardError> {
486 self.ensure_dirs()?;
487
488 let _lock = ShardLock::acquire(&self.lock_path(), lock_timeout)?;
489
490 let (year, month) = self.rotate_if_needed()?;
492 let shard_path = self.shard_path(year, month);
493
494 if shard_path.exists() {
496 validate_shard_header(&shard_path)?;
497 }
498
499 let ts = self.next_timestamp()?;
501
502 let mut file = OpenOptions::new()
504 .create(true)
505 .append(true)
506 .open(&shard_path)?;
507
508 file.write_all(line.as_bytes())?;
509 file.flush()?;
510
511 if durable {
512 file.sync_data()?;
513 }
514
515 Ok(ts)
516 }
517
518 pub fn append_raw(&self, year: i32, month: u32, line: &str) -> Result<(), ShardError> {
527 let shard_path = self.shard_path(year, month);
528
529 let mut file = OpenOptions::new()
530 .create(true)
531 .append(true)
532 .open(&shard_path)?;
533
534 file.write_all(line.as_bytes())?;
535 file.flush()?;
536 Ok(())
537 }
538
539 pub fn read_clock(&self) -> Result<i64, ShardError> {
552 let path = self.clock_path();
553 if !path.exists() {
554 return Ok(0);
555 }
556 let content = fs::read_to_string(&path)?;
557 Ok(content.trim().parse::<i64>().unwrap_or(0))
558 }
559
560 pub fn next_timestamp(&self) -> Result<i64, ShardError> {
571 let last = self.read_clock()?;
572 let now = system_time_us();
573 let next = std::cmp::max(now, last + 1);
574 self.write_clock(next)?;
575 Ok(next)
576 }
577
578 fn write_clock(&self, value: i64) -> Result<(), ShardError> {
580 let path = self.clock_path();
581 if let Some(parent) = path.parent() {
582 fs::create_dir_all(parent)?;
583 }
584 fs::write(&path, value.to_string())?;
585 Ok(())
586 }
587
588 pub fn recover_torn_writes(&self) -> Result<Option<u64>, ShardError> {
607 let Some(active) = self.active_shard()? else {
608 return Ok(None);
609 };
610
611 let shard_path = self.shard_path(active.0, active.1);
612 recover_shard_torn_write(&shard_path)
613 }
614
615 pub fn replay(&self) -> Result<String, ShardError> {
628 let shards = self.list_shards()?;
629 let mut content = String::new();
630
631 for (year, month) in shards {
632 let path = self.shard_path(year, month);
633 let shard_content = fs::read_to_string(&path)?;
634 content.push_str(&shard_content);
635 }
636
637 Ok(content)
638 }
639
640 pub fn read_shard(&self, year: i32, month: u32) -> Result<String, ShardError> {
646 let path = self.shard_path(year, month);
647 Ok(fs::read_to_string(&path)?)
648 }
649
650 pub fn total_content_len(&self) -> Result<usize, ShardError> {
660 let shards = self.list_shards()?;
661 let mut total = 0usize;
662 for (year, month) in shards {
663 let path = self.shard_path(year, month);
664 let meta = fs::metadata(&path)?;
665 total = total.saturating_add(usize::try_from(meta.len()).unwrap_or(usize::MAX));
666 }
667 Ok(total)
668 }
669
670 pub fn replay_from_offset(&self, offset: usize) -> Result<(String, usize), ShardError> {
687 let shards = self.list_shards()?;
688 let mut cumulative: usize = 0;
689 let mut result = String::new();
690 let mut found_start = false;
691
692 for (year, month) in shards {
693 let path = self.shard_path(year, month);
694 let shard_len = usize::try_from(fs::metadata(&path)?.len()).unwrap_or(usize::MAX);
695
696 let shard_end = cumulative.saturating_add(shard_len);
697
698 if shard_end <= offset {
699 cumulative = shard_end;
701 continue;
702 }
703
704 let shard_content = fs::read_to_string(&path)?;
706
707 if found_start {
708 result.push_str(&shard_content);
709 } else {
710 let within = offset.saturating_sub(cumulative);
712 let within = within.min(shard_content.len());
714 let within = snap_to_char_boundary(&shard_content, within);
716 result.push_str(&shard_content[within..]);
717 found_start = true;
718 }
719
720 cumulative = shard_end;
721 }
722
723 Ok((result, cumulative))
724 }
725
726 pub fn read_content_range(
738 &self,
739 start_offset: usize,
740 end_offset: usize,
741 ) -> Result<String, ShardError> {
742 if start_offset >= end_offset {
743 return Ok(String::new());
744 }
745
746 let shards = self.list_shards()?;
747 let mut cumulative: usize = 0;
748 let mut result = String::new();
749
750 for (year, month) in shards {
751 let path = self.shard_path(year, month);
752 let shard_len = usize::try_from(fs::metadata(&path)?.len()).unwrap_or(usize::MAX);
753 let shard_end = cumulative.saturating_add(shard_len);
754
755 if shard_end <= start_offset {
756 cumulative = shard_end;
758 continue;
759 }
760
761 if cumulative >= end_offset {
762 break;
764 }
765
766 let shard_content = fs::read_to_string(&path)?;
767
768 let within_start = if cumulative < start_offset {
770 (start_offset - cumulative).min(shard_content.len())
771 } else {
772 0
773 };
774 let within_end = if shard_end > end_offset {
775 (end_offset - cumulative).min(shard_content.len())
776 } else {
777 shard_content.len()
778 };
779
780 let within_start = snap_to_char_boundary(&shard_content, within_start);
782 let within_end = snap_to_char_boundary(&shard_content, within_end);
783 if within_start < within_end {
784 result.push_str(&shard_content[within_start..within_end]);
785 }
786 cumulative = shard_end;
787 }
788
789 Ok(result)
790 }
791
792 pub fn event_count(&self) -> Result<u64, ShardError> {
798 let content = self.replay()?;
799 let count = content
800 .lines()
801 .filter(|line| !line.is_empty() && !line.starts_with('#') && !line.trim().is_empty())
802 .count();
803 Ok(count as u64)
804 }
805
806 pub fn replay_lines(
814 &self,
815 ) -> Result<impl Iterator<Item = io::Result<(usize, String)>>, ShardError> {
816 self.replay_lines_from_offset(0)
817 }
818
819 pub fn replay_lines_from_offset(
827 &self,
828 offset: usize,
829 ) -> Result<impl Iterator<Item = io::Result<(usize, String)>>, ShardError> {
830 let shards = self.list_shards()?;
831 let bones_dir = self.bones_dir.clone();
832
833 Ok(ShardLineIterator {
834 shards,
835 current_shard_idx: 0,
836 current_reader: None,
837 cumulative_offset: 0,
838 bones_dir,
839 }
840 .skip_to_offset(offset))
841 }
842
843 pub fn is_empty(&self) -> Result<bool, ShardError> {
849 let shards = self.list_shards()?;
850 Ok(shards.is_empty())
851 }
852}
853
/// Lazy line-by-line reader over a fixed list of shards.
///
/// Yields `(global_byte_offset, line)` pairs, where the offset is a position
/// in the concatenation of all shards in list order.
struct ShardLineIterator {
    /// Shards to read, as `(year, month)`, in iteration order.
    shards: Vec<(i32, u32)>,
    /// Index into `shards` of the shard currently being (or next to be) read.
    current_shard_idx: usize,
    /// Open reader for the current shard; `None` until it is opened and again
    /// after it reaches end-of-file.
    current_reader: Option<io::BufReader<fs::File>>,
    /// Global byte offset of the next line to be yielded.
    cumulative_offset: usize,
    /// Root directory; shard files are resolved under its `events` subdirectory.
    bones_dir: PathBuf,
}
865
866impl ShardLineIterator {
867 fn skip_to_offset(mut self, offset: usize) -> Self {
868 while self.current_shard_idx < self.shards.len() {
870 let (year, month) = self.shards[self.current_shard_idx];
871 let shard_path = self
872 .bones_dir
873 .join("events")
874 .join(ShardManager::shard_filename(year, month));
875 if let Ok(meta) = fs::metadata(shard_path) {
876 let shard_len = usize::try_from(meta.len()).unwrap_or(usize::MAX);
877 if self.cumulative_offset + shard_len <= offset {
878 self.cumulative_offset += shard_len;
879 self.current_shard_idx += 1;
880 continue;
881 }
882 }
883 break;
884 }
885
886 self.cumulative_offset = offset;
891 self
892 }
893}
894
/// Yields `(global_byte_offset, line)` for each line of each shard in order.
///
/// Lines are returned exactly as read by `read_line`, i.e. including the
/// trailing newline when present. Errors are yielded as `Some(Err(..))`.
impl Iterator for ShardLineIterator {
    type Item = io::Result<(usize, String)>;

    fn next(&mut self) -> Option<Self::Item> {
        use std::io::{BufRead, Seek, SeekFrom};

        loop {
            // No open reader: open the shard at `current_shard_idx`, or finish.
            if self.current_reader.is_none() {
                if self.current_shard_idx >= self.shards.len() {
                    return None;
                }

                let (year, month) = self.shards[self.current_shard_idx];
                let shard_path = self
                    .bones_dir
                    .join("events")
                    .join(ShardManager::shard_filename(year, month));

                // Forwarding-pointer files hold no events: skip them, but still
                // count their bytes so global offsets stay aligned with file sizes.
                if is_forwarding_pointer(&shard_path) {
                    tracing::warn!(
                        shard = %shard_path.display(),
                        "skipping legacy forwarding-pointer shard during replay"
                    );
                    if let Ok(meta) = fs::metadata(&shard_path) {
                        self.cumulative_offset += usize::try_from(meta.len()).unwrap_or(0);
                    }
                    self.current_shard_idx += 1;
                    continue;
                }

                // NOTE(review): on this error (and on the open error below) the
                // shard index is not advanced, so a caller that keeps polling
                // receives the same error again — confirm this is intended.
                if let Err(e) = validate_shard_header(&shard_path) {
                    tracing::error!(
                        shard = %shard_path.display(),
                        error = %e,
                        "shard header validation failed"
                    );
                    return Some(Err(io::Error::new(
                        io::ErrorKind::InvalidData,
                        e.to_string(),
                    )));
                }

                let mut file = match fs::File::open(shard_path) {
                    Ok(f) => f,
                    Err(e) => return Some(Err(e)),
                };

                // Global offset at which this shard starts: the sum of the
                // sizes of all earlier shards (re-read from metadata each time).
                let mut cumulative_before = 0;
                for i in 0..self.current_shard_idx {
                    let (y, m) = self.shards[i];
                    let p = self
                        .bones_dir
                        .join("events")
                        .join(ShardManager::shard_filename(y, m));
                    if let Ok(meta) = fs::metadata(p) {
                        cumulative_before += usize::try_from(meta.len()).unwrap_or(usize::MAX);
                    }
                }

                // If the requested start lies inside this shard, seek to it.
                if self.cumulative_offset > cumulative_before {
                    let within = self.cumulative_offset - cumulative_before;
                    if let Err(e) = file.seek(SeekFrom::Start(within as u64)) {
                        return Some(Err(e));
                    }
                }

                self.current_reader = Some(io::BufReader::new(file));
            }

            let reader = self
                .current_reader
                .as_mut()
                .expect("reader was just set above");
            let mut line = String::new();
            // Offset of the line about to be read, captured before advancing.
            let offset = self.cumulative_offset;

            match reader.read_line(&mut line) {
                // End of this shard: drop the reader and loop to open the next.
                Ok(0) => {
                    self.current_reader = None;
                    self.current_shard_idx += 1;
                }
                Ok(n) => {
                    self.cumulative_offset += n;
                    return Some(Ok((offset, line)));
                }
                Err(e) => return Some(Err(e)),
            }
        }
    }
}
991
/// Returns the smallest UTF-8 character boundary in `s` that is `>= offset`.
///
/// Offsets at or past the end of the string yield `s.len()`. The result is
/// therefore always a valid index for slicing `s`.
const fn snap_to_char_boundary(s: &str, offset: usize) -> usize {
    let len = s.len();
    let mut idx = if offset < len { offset } else { len };
    loop {
        // `len` itself is always a boundary, so this terminates.
        if idx == len || s.is_char_boundary(idx) {
            return idx;
        }
        idx += 1;
    }
}
1012
1013#[must_use]
1015fn current_year_month() -> (i32, u32) {
1016 let now = chrono::Utc::now();
1017 (now.year(), now.month())
1018}
1019
/// Returns the current wall-clock time as microseconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch. A value
/// too large for `i64` saturates at `i64::MAX` instead of silently wrapping.
#[must_use]
fn system_time_us() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| {
            // `as_micros` is a `u128`; convert with a check instead of `as`.
            i64::try_from(elapsed.as_micros()).unwrap_or(i64::MAX)
        })
}
1029
1030fn is_forwarding_pointer(path: &Path) -> bool {
1037 let Ok(meta) = fs::metadata(path) else {
1038 return false;
1039 };
1040 if meta.len() > 30 {
1041 return false;
1042 }
1043 let Ok(content) = fs::read_to_string(path) else {
1044 return false;
1045 };
1046 parse_shard_filename(content.trim()).is_some()
1047}
1048
1049pub fn validate_shard_header(path: &Path) -> Result<(), ShardError> {
1060 use crate::event::writer::SHARD_HEADER;
1061 use std::io::{BufRead, BufReader};
1062
1063 if is_forwarding_pointer(path) {
1065 return Ok(());
1066 }
1067
1068 let file = fs::File::open(path)?;
1069 let mut reader = BufReader::new(file);
1070 let mut first_line = String::new();
1071 let n = reader.read_line(&mut first_line)?;
1072
1073 if n == 0 {
1075 return Ok(());
1076 }
1077
1078 let trimmed = first_line.trim_end();
1079 if trimmed != SHARD_HEADER {
1080 return Err(ShardError::CorruptedShard {
1081 path: path.to_path_buf(),
1082 reason: format!(
1083 "expected header '{}', found '{}'",
1084 SHARD_HEADER,
1085 &trimmed[..trimmed.len().min(80)]
1086 ),
1087 });
1088 }
1089
1090 Ok(())
1091}
1092
/// Parses a canonical shard filename (`YYYY-MM.events`) into `(year, month)`.
///
/// Returns `None` for anything that is not exactly the name
/// `ShardManager::shard_filename` would produce for the parsed values. That
/// includes names without the `.events` suffix, `current.events` (it has no
/// `-` separator), months outside `1..=12`, and non-canonical spellings such
/// as `2026-1.events` or `+2026-01.events`.
///
/// The canonical check matters because callers rebuild the path from the
/// returned `(year, month)`. Accepting `2026-1.events` would make them look
/// for `2026-01.events`, a file that may not exist.
fn parse_shard_filename(name: &str) -> Option<(i32, u32)> {
    let stem = name.strip_suffix(".events")?;
    let (year_text, month_text) = stem.split_once('-')?;

    let year: i32 = year_text.parse().ok()?;
    let month: u32 = month_text.parse().ok()?;
    if !(1..=12).contains(&month) {
        return None;
    }

    // Integer parsing tolerates a leading '+' and missing or extra zero
    // padding; only the exact canonical rendering is accepted.
    let canonical = format!("{year:04}-{month:02}.events");
    (canonical == name).then_some((year, month))
}
1108
1109fn recover_shard_torn_write(path: &Path) -> Result<Option<u64>, ShardError> {
1114 let metadata = fs::metadata(path)?;
1115 let file_len = metadata.len();
1116 if file_len == 0 {
1117 return Ok(None);
1118 }
1119
1120 let content = fs::read(path)?;
1121
1122 let last_newline = content.iter().rposition(|&b| b == b'\n');
1124
1125 if let Some(pos) = last_newline {
1126 let expected_len = (pos + 1) as u64;
1127 if expected_len < file_len {
1128 let truncated = file_len - expected_len;
1130 let file = OpenOptions::new().write(true).open(path)?;
1131 file.set_len(expected_len)?;
1132 Ok(Some(truncated))
1133 } else {
1134 Ok(None)
1136 }
1137 } else {
1138 let file = OpenOptions::new().write(true).open(path)?;
1141 file.set_len(0)?;
1142 Ok(Some(file_len))
1143 }
1144}
1145
1146#[cfg(test)]
1151mod tests {
1152 use super::*;
1153 use tempfile::TempDir;
1154
1155 #[test]
1156 fn snap_to_char_boundary_ascii() {
1157 let s = "hello world";
1158 assert_eq!(snap_to_char_boundary(s, 0), 0);
1159 assert_eq!(snap_to_char_boundary(s, 5), 5);
1160 assert_eq!(snap_to_char_boundary(s, 11), 11); assert_eq!(snap_to_char_boundary(s, 100), 11); }
1163
1164 #[test]
1165 fn snap_to_char_boundary_emoji() {
1166 let s = "ab✅cd🎉ef";
1168 assert_eq!(snap_to_char_boundary(s, 2), 2); assert_eq!(snap_to_char_boundary(s, 3), 5); assert_eq!(snap_to_char_boundary(s, 4), 5); assert_eq!(snap_to_char_boundary(s, 5), 5); assert_eq!(snap_to_char_boundary(s, 8), 11); assert_eq!(snap_to_char_boundary(s, 9), 11);
1175 assert_eq!(snap_to_char_boundary(s, 10), 11);
1176 }
1177
1178 fn setup() -> (TempDir, ShardManager) {
1179 let tmp = TempDir::new().expect("tempdir");
1180 let bones_dir = tmp.path().join(".bones");
1181 let mgr = ShardManager::new(&bones_dir);
1182 (tmp, mgr)
1183 }
1184
1185 #[test]
1190 fn parse_valid_shard_filenames() {
1191 assert_eq!(parse_shard_filename("2026-01.events"), Some((2026, 1)));
1192 assert_eq!(parse_shard_filename("2026-12.events"), Some((2026, 12)));
1193 assert_eq!(parse_shard_filename("1999-06.events"), Some((1999, 6)));
1194 }
1195
1196 #[test]
1197 fn parse_invalid_shard_filenames() {
1198 assert_eq!(parse_shard_filename("current.events"), None);
1199 assert_eq!(parse_shard_filename("2026-13.events"), None); assert_eq!(parse_shard_filename("2026-00.events"), None); assert_eq!(parse_shard_filename("not-a-shard.txt"), None);
1202 assert_eq!(parse_shard_filename("2026-01.manifest"), None);
1203 assert_eq!(parse_shard_filename(""), None);
1204 }
1205
1206 #[test]
1211 fn shard_manager_paths() {
1212 let mgr = ShardManager::new("/repo/.bones");
1213 assert_eq!(mgr.events_dir(), PathBuf::from("/repo/.bones/events"));
1214 assert_eq!(mgr.lock_path(), PathBuf::from("/repo/.bones/lock"));
1215 assert_eq!(mgr.clock_path(), PathBuf::from("/repo/.bones/cache/clock"));
1216 assert_eq!(
1217 mgr.shard_path(2026, 2),
1218 PathBuf::from("/repo/.bones/events/2026-02.events")
1219 );
1220 assert_eq!(
1221 mgr.manifest_path(2026, 1),
1222 PathBuf::from("/repo/.bones/events/2026-01.manifest")
1223 );
1224 }
1225
1226 #[test]
1227 fn shard_filename_format() {
1228 assert_eq!(ShardManager::shard_filename(2026, 1), "2026-01.events");
1229 assert_eq!(ShardManager::shard_filename(2026, 12), "2026-12.events");
1230 assert_eq!(ShardManager::shard_filename(1999, 6), "1999-06.events");
1231 }
1232
1233 #[test]
1238 fn ensure_dirs_creates_directories() {
1239 let (_tmp, mgr) = setup();
1240 mgr.ensure_dirs().expect("should create dirs");
1241 assert!(mgr.events_dir().exists());
1242 assert!(mgr.bones_dir.join("cache").exists());
1243 }
1244
1245 #[test]
1246 fn ensure_dirs_is_idempotent() {
1247 let (_tmp, mgr) = setup();
1248 mgr.ensure_dirs().expect("first");
1249 mgr.ensure_dirs().expect("second");
1250 assert!(mgr.events_dir().exists());
1251 }
1252
1253 #[test]
1254 fn init_creates_first_shard() {
1255 let (_tmp, mgr) = setup();
1256 let (year, month) = mgr.init().expect("init");
1257
1258 let (expected_year, expected_month) = current_year_month();
1259 assert_eq!(year, expected_year);
1260 assert_eq!(month, expected_month);
1261
1262 let shard_path = mgr.shard_path(year, month);
1264 assert!(shard_path.exists());
1265 let content = fs::read_to_string(&shard_path).expect("read");
1266 assert!(content.starts_with("# bones event log v1"));
1267 }
1268
1269 #[test]
1270 fn init_is_idempotent() {
1271 let (_tmp, mgr) = setup();
1272 let first = mgr.init().expect("first");
1273 let second = mgr.init().expect("second");
1274 assert_eq!(first, second);
1275 }
1276
1277 #[test]
1282 fn list_shards_empty() {
1283 let (_tmp, mgr) = setup();
1284 mgr.ensure_dirs().expect("dirs");
1285 let shards = mgr.list_shards().expect("list");
1286 assert!(shards.is_empty());
1287 }
1288
1289 #[test]
1290 fn list_shards_returns_sorted() {
1291 let (_tmp, mgr) = setup();
1292 mgr.ensure_dirs().expect("dirs");
1293
1294 mgr.create_shard(2026, 3).expect("create");
1296 mgr.create_shard(2026, 1).expect("create");
1297 mgr.create_shard(2026, 2).expect("create");
1298
1299 let shards = mgr.list_shards().expect("list");
1300 assert_eq!(shards, vec![(2026, 1), (2026, 2), (2026, 3)]);
1301 }
1302
1303 #[test]
1304 fn list_shards_skips_non_shard_files() {
1305 let (_tmp, mgr) = setup();
1306 mgr.ensure_dirs().expect("dirs");
1307 mgr.create_shard(2026, 1).expect("create");
1308
1309 fs::write(mgr.events_dir().join("readme.txt"), "hi").expect("write");
1311 fs::write(mgr.events_dir().join("2026-01.manifest"), "manifest").expect("write");
1312
1313 let shards = mgr.list_shards().expect("list");
1314 assert_eq!(shards, vec![(2026, 1)]);
1315 }
1316
1317 #[test]
1318 fn list_shards_no_events_dir() {
1319 let (_tmp, mgr) = setup();
1320 let shards = mgr.list_shards().expect("list");
1322 assert!(shards.is_empty());
1323 }
1324
1325 #[test]
1330 fn create_shard_writes_header() {
1331 let (_tmp, mgr) = setup();
1332 mgr.ensure_dirs().expect("dirs");
1333 let path = mgr.create_shard(2026, 2).expect("create");
1334
1335 let content = fs::read_to_string(&path).expect("read");
1336 assert!(content.starts_with("# bones event log v1"));
1337 assert!(content.contains("# fields:"));
1338 assert_eq!(content.lines().count(), 2);
1339 }
1340
1341 #[test]
1342 fn create_shard_idempotent() {
1343 let (_tmp, mgr) = setup();
1344 mgr.ensure_dirs().expect("dirs");
1345 let p1 = mgr.create_shard(2026, 2).expect("first");
1346 fs::write(&p1, "modified").expect("write");
1348 let p2 = mgr.create_shard(2026, 2).expect("second");
1350 assert_eq!(p1, p2);
1351 let content = fs::read_to_string(&p2).expect("read");
1352 assert_eq!(content, "modified");
1353 }
1354
1355 #[test]
1360 fn clock_starts_at_zero() {
1361 let (_tmp, mgr) = setup();
1362 mgr.ensure_dirs().expect("dirs");
1363 let ts = mgr.read_clock().expect("read");
1364 assert_eq!(ts, 0);
1365 }
1366
1367 #[test]
1368 fn clock_is_monotonic() {
1369 let (_tmp, mgr) = setup();
1370 mgr.ensure_dirs().expect("dirs");
1371 let t1 = mgr.next_timestamp().expect("t1");
1372 let t2 = mgr.next_timestamp().expect("t2");
1373 let t3 = mgr.next_timestamp().expect("t3");
1374 assert!(t2 > t1);
1375 assert!(t3 > t2);
1376 }
1377
1378 #[test]
1379 fn clock_reads_back_written_value() {
1380 let (_tmp, mgr) = setup();
1381 mgr.ensure_dirs().expect("dirs");
1382 mgr.write_clock(42_000_000).expect("write");
1383 let ts = mgr.read_clock().expect("read");
1384 assert_eq!(ts, 42_000_000);
1385 }
1386
1387 #[test]
1388 fn clock_never_goes_backward() {
1389 let (_tmp, mgr) = setup();
1390 mgr.ensure_dirs().expect("dirs");
1391
1392 let future = system_time_us() + 10_000_000;
1394 mgr.write_clock(future).expect("write");
1395
1396 let next = mgr.next_timestamp().expect("next");
1397 assert!(next > future, "clock should advance past future value");
1398 }
1399
1400 #[test]
1405 fn append_raw_adds_line() {
1406 let (_tmp, mgr) = setup();
1407 mgr.ensure_dirs().expect("dirs");
1408 mgr.create_shard(2026, 2).expect("create");
1409
1410 mgr.append_raw(2026, 2, "event line 1\n").expect("append");
1411 mgr.append_raw(2026, 2, "event line 2\n").expect("append");
1412
1413 let content = mgr.read_shard(2026, 2).expect("read");
1414 assert!(content.contains("event line 1"));
1415 assert!(content.contains("event line 2"));
1416 }
1417
1418 #[test]
1419 fn append_with_lock() {
1420 let (_tmp, mgr) = setup();
1421 mgr.init().expect("init");
1422
1423 let _ts = mgr
1424 .append("test event line\n", false, Duration::from_secs(1))
1425 .expect("append");
1426
1427 let content = mgr.replay().expect("replay");
1428 assert!(content.contains("test event line"));
1429 }
1430
1431 #[test]
1432 fn append_returns_monotonic_timestamps() {
1433 let (_tmp, mgr) = setup();
1434 mgr.init().expect("init");
1435
1436 let t1 = mgr
1437 .append("line1\n", false, Duration::from_secs(1))
1438 .expect("t1");
1439 let t2 = mgr
1440 .append("line2\n", false, Duration::from_secs(1))
1441 .expect("t2");
1442
1443 assert!(t2 > t1);
1444 }
1445
1446 #[test]
1451 fn recover_clean_file() {
1452 let (_tmp, mgr) = setup();
1453 mgr.init().expect("init");
1454
1455 let (y, m) = current_year_month();
1456 mgr.append_raw(y, m, "complete line\n").expect("append");
1457
1458 let recovered = mgr.recover_torn_writes().expect("recover");
1459 assert_eq!(recovered, None);
1460 }
1461
1462 #[test]
1463 fn recover_torn_write_truncates() {
1464 let (_tmp, mgr) = setup();
1465 let (y, m) = mgr.init().expect("init");
1466 let shard_path = mgr.shard_path(y, m);
1467
1468 {
1470 let mut f = OpenOptions::new()
1471 .append(true)
1472 .open(&shard_path)
1473 .expect("open");
1474 f.write_all(b"complete line\npartial line without newline")
1475 .expect("write");
1476 f.flush().expect("flush");
1477 }
1478
1479 let recovered = mgr.recover_torn_writes().expect("recover");
1480 assert!(recovered.is_some());
1481
1482 let truncated = recovered.expect("checked is_some");
1483 assert_eq!(truncated, "partial line without newline".len() as u64);
1484
1485 let content = fs::read_to_string(&shard_path).expect("read");
1487 assert!(content.ends_with('\n'));
1488 assert!(content.contains("complete line"));
1489 assert!(!content.contains("partial line without newline"));
1490 }
1491
1492 #[test]
1493 fn recover_no_newline_at_all() {
1494 let (_tmp, mgr) = setup();
1495 let (y, m) = mgr.init().expect("init");
1496 let shard_path = mgr.shard_path(y, m);
1497
1498 fs::write(&shard_path, "no newlines here").expect("write");
1500
1501 let recovered = mgr.recover_torn_writes().expect("recover");
1502 assert_eq!(recovered, Some("no newlines here".len() as u64));
1503
1504 let content = fs::read_to_string(&shard_path).expect("read");
1506 assert!(content.is_empty());
1507 }
1508
1509 #[test]
1510 fn recover_empty_file() {
1511 let (_tmp, mgr) = setup();
1512 let (y, m) = mgr.init().expect("init");
1513 let shard_path = mgr.shard_path(y, m);
1514
1515 fs::write(&shard_path, "").expect("write");
1517
1518 let recovered = mgr.recover_torn_writes().expect("recover");
1519 assert_eq!(recovered, None);
1520 }
1521
1522 #[test]
1523 fn recover_no_active_shard() {
1524 let (_tmp, mgr) = setup();
1525 mgr.ensure_dirs().expect("dirs");
1526
1527 let recovered = mgr.recover_torn_writes().expect("recover");
1528 assert_eq!(recovered, None);
1529 }
1530
1531 #[test]
1536 fn replay_empty_repo() {
1537 let (_tmp, mgr) = setup();
1538 mgr.ensure_dirs().expect("dirs");
1539 let content = mgr.replay().expect("replay");
1540 assert!(content.is_empty());
1541 }
1542
1543 #[test]
1544 fn replay_single_shard() {
1545 let (_tmp, mgr) = setup();
1546 mgr.ensure_dirs().expect("dirs");
1547 mgr.create_shard(2026, 1).expect("create");
1548 mgr.append_raw(2026, 1, "event-a\n").expect("append");
1549
1550 let content = mgr.replay().expect("replay");
1551 assert!(content.contains("event-a"));
1552 }
1553
1554 #[test]
1555 fn replay_multiple_shards_in_order() {
1556 let (_tmp, mgr) = setup();
1557 mgr.ensure_dirs().expect("dirs");
1558
1559 mgr.create_shard(2026, 1).expect("create");
1560 mgr.create_shard(2026, 2).expect("create");
1561 mgr.create_shard(2026, 3).expect("create");
1562
1563 mgr.append_raw(2026, 1, "event-jan\n").expect("append");
1564 mgr.append_raw(2026, 2, "event-feb\n").expect("append");
1565 mgr.append_raw(2026, 3, "event-mar\n").expect("append");
1566
1567 let content = mgr.replay().expect("replay");
1568
1569 let jan_pos = content.find("event-jan").expect("jan");
1571 let feb_pos = content.find("event-feb").expect("feb");
1572 let mar_pos = content.find("event-mar").expect("mar");
1573 assert!(jan_pos < feb_pos);
1574 assert!(feb_pos < mar_pos);
1575 }
1576
1577 #[test]
1582 fn event_count_empty() {
1583 let (_tmp, mgr) = setup();
1584 mgr.ensure_dirs().expect("dirs");
1585 assert_eq!(mgr.event_count().expect("count"), 0);
1586 }
1587
/// `event_count` must count only the two event lines; the comment line and
/// the blank line written alongside them must not contribute.
#[test]
fn event_count_excludes_comments_and_blanks() {
    let (_tmp, mgr) = setup();
    mgr.ensure_dirs().expect("dirs");
    mgr.create_shard(2026, 1).expect("create");
    mgr.append_raw(2026, 1, "event1\n").expect("append");
    // Write a comment line explicitly so the "excludes comments" claim does
    // not depend on whatever `create_shard` puts into a fresh shard.
    // NOTE(review): assumes comment lines are `#`-prefixed, like the
    // "# bones event log v1" line used elsewhere in this module — confirm.
    mgr.append_raw(2026, 1, "# explicit comment\n")
        .expect("comment");
    mgr.append_raw(2026, 1, "event2\n").expect("append");
    mgr.append_raw(2026, 1, "\n").expect("blank");

    assert_eq!(mgr.event_count().expect("count"), 2);
}
1600
/// After `ensure_dirs` alone, the manager reports itself as empty.
#[test]
fn is_empty_no_shards() {
    let (_tmp, mgr) = setup();
    mgr.ensure_dirs().expect("dirs");

    let empty = mgr.is_empty().expect("empty");
    assert!(empty);
}
1611
/// After `init`, the manager no longer reports itself as empty.
#[test]
fn is_empty_with_shards() {
    let (_tmp, mgr) = setup();
    mgr.init().expect("init");

    let empty = mgr.is_empty().expect("empty");
    assert!(!empty);
}
1618
/// Writing a manifest for a shard with two events produces the expected
/// fields, and reading it back returns an equal manifest.
#[test]
fn write_and_read_manifest() {
    let (_tmp, mgr) = setup();
    mgr.ensure_dirs().expect("dirs");
    mgr.create_shard(2026, 1).expect("create");
    for line in ["event-line-1\n", "event-line-2\n"] {
        mgr.append_raw(2026, 1, line).expect("append");
    }

    let written = mgr.write_manifest(2026, 1).expect("write manifest");
    assert_eq!(written.shard_name, "2026-01.events");
    assert_eq!(written.event_count, 2);
    assert_ne!(written.byte_len, 0);
    assert!(written.file_hash.starts_with("blake3:"));

    // The manifest read back must match what `write_manifest` returned.
    let reread = mgr.read_manifest(2026, 1).expect("read");
    let reread = reread.expect("should exist");
    assert_eq!(reread, written);
}
1643
/// `to_string_repr` followed by `from_string_repr` reproduces the original
/// manifest.
#[test]
fn manifest_roundtrip() {
    let original = ShardManifest {
        shard_name: String::from("2026-01.events"),
        event_count: 42,
        byte_len: 12345,
        file_hash: String::from("blake3:abcdef0123456789"),
    };

    let reparsed = ShardManifest::from_string_repr(&original.to_string_repr()).expect("parse");
    assert_eq!(reparsed, original);
}
1657
/// Reading a manifest that was never written succeeds with `None`.
#[test]
fn read_manifest_missing() {
    let (_tmp, mgr) = setup();
    mgr.ensure_dirs().expect("dirs");

    assert!(mgr.read_manifest(2026, 1).expect("read").is_none());
}
1665
/// The `event_count` recorded in a manifest must cover event lines only: one
/// event plus one comment line must yield a count of 1.
#[test]
fn manifest_event_count_excludes_comments() {
    let (_tmp, mgr) = setup();
    mgr.ensure_dirs().expect("dirs");
    mgr.create_shard(2026, 1).expect("create");
    mgr.append_raw(2026, 1, "event1\n").expect("append");
    // Write a comment line explicitly so the test exercises comment exclusion
    // itself instead of relying on the contents of a freshly created shard.
    // NOTE(review): assumes comment lines are `#`-prefixed, like the
    // "# bones event log v1" line used elsewhere in this module — confirm.
    mgr.append_raw(2026, 1, "# explicit comment\n")
        .expect("comment");

    let manifest = mgr.write_manifest(2026, 1).expect("manifest");
    assert_eq!(manifest.event_count, 1);
}
1678
/// `rotate_if_needed` on a repository without shards must return the current
/// `(year, month)` and leave a shard file at the corresponding path.
#[test]
fn rotate_creates_shard_if_none_exist() {
    let (_tmp, mgr) = setup();
    mgr.ensure_dirs().expect("dirs");

    // Sample the clock on both sides of the call: if the month rolls over
    // while the test runs, either reading is a legitimate "current" month.
    let before = current_year_month();
    let (y, m) = mgr.rotate_if_needed().expect("rotate");
    let after = current_year_month();
    assert!(
        (y, m) == before || (y, m) == after,
        "rotated to {:?}, expected {before:?} or {after:?}",
        (y, m)
    );

    assert!(mgr.shard_path(y, m).exists());
}
1694
/// Calling `rotate_if_needed` right after `init` returns the same
/// `(year, month)` pair that `init` returned.
#[test]
fn rotate_no_op_same_month() {
    let (_tmp, mgr) = setup();

    let initial = mgr.init().expect("init");
    let rotated = mgr.rotate_if_needed().expect("rotate");

    assert_eq!(initial, rotated);
}
1703
/// When the only existing shard belongs to an older month (2025-11),
/// `rotate_if_needed` must return the current `(year, month)`, leave a
/// manifest file for the old shard, and leave a shard file for the new month.
#[test]
fn rotate_different_month_seals_and_creates() {
    let (_tmp, mgr) = setup();
    mgr.ensure_dirs().expect("dirs");

    mgr.create_shard(2025, 11).expect("create");
    mgr.append_raw(2025, 11, "old event\n").expect("append");

    // Sample the clock on both sides of the call: if the month rolls over
    // while the test runs, either reading is a legitimate "current" month.
    let before = current_year_month();
    let (y, m) = mgr.rotate_if_needed().expect("rotate");
    let after = current_year_month();
    assert!(
        (y, m) == before || (y, m) == after,
        "rotated to {:?}, expected {before:?} or {after:?}",
        (y, m)
    );

    // The old shard must have a manifest on disk after rotation.
    assert!(mgr.manifest_path(2025, 11).exists());

    // The shard for the month that rotation reported must exist.
    assert!(mgr.shard_path(y, m).exists());
}
1724
/// Appending through `append` after `init` must leave the content of the
/// pre-existing 2025-06 shard unchanged.
#[test]
fn frozen_shard_not_modified_by_append() {
    let (_tmp, mgr) = setup();
    mgr.ensure_dirs().expect("dirs");

    mgr.create_shard(2025, 6).expect("create");
    mgr.append_raw(2025, 6, "old event\n").expect("append");
    let snapshot_before = mgr.read_shard(2025, 6).expect("read");

    mgr.init().expect("init");

    let one_second = Duration::from_secs(1);
    mgr.append("new event\n", false, one_second)
        .expect("append");

    // Re-read the old shard and compare against the earlier snapshot.
    let snapshot_after = mgr.read_shard(2025, 6).expect("read");
    assert_eq!(snapshot_before, snapshot_after);
}
1750
/// `system_time_us` returns a strictly positive value.
#[test]
fn system_time_us_is_positive() {
    let ts = system_time_us();
    assert!(ts.is_positive(), "system time should be positive: {ts}");
}
1760
/// `system_time_us` returns a value later than 2020-01-01T00:00:00Z expressed
/// in microseconds since the Unix epoch.
#[test]
fn system_time_us_is_reasonable() {
    // 1_577_836_800 seconds (2020-01-01T00:00:00Z) scaled to microseconds.
    const JAN_2020_US: i64 = 1_577_836_800_000_000;

    let ts = system_time_us();
    assert!(ts > JAN_2020_US, "system time too small: {ts}");
}
1768
/// With no shard created by the test, the total content length is zero.
#[test]
fn total_content_len_empty_repo() {
    let (_tmp, mgr) = setup();
    mgr.ensure_dirs().expect("dirs");

    assert_eq!(mgr.total_content_len().expect("len"), 0);
}
1780
/// For a single shard, `total_content_len` equals the length of the replayed
/// content.
#[test]
fn total_content_len_single_shard() {
    let (_tmp, mgr) = setup();
    mgr.ensure_dirs().expect("dirs");
    mgr.create_shard(2026, 1).expect("create");
    for line in ["line1\n", "line2\n"] {
        mgr.append_raw(2026, 1, line).expect("append");
    }

    let replayed_len = mgr.replay().expect("replay").len();
    let reported = mgr.total_content_len().expect("len");
    assert_eq!(reported, replayed_len);
}
1793
/// Across two shards, `total_content_len` equals the length of the replayed
/// content.
#[test]
fn total_content_len_multiple_shards() {
    let (_tmp, mgr) = setup();
    mgr.ensure_dirs().expect("dirs");

    // Each tuple carries the month plus the panic message used on failure.
    for (month, what) in [(1, "shard 1"), (2, "shard 2")] {
        mgr.create_shard(2026, month).expect(what);
    }
    for (month, line, what) in [
        (1, "jan-event\n", "append jan"),
        (2, "feb-event\n", "append feb"),
    ] {
        mgr.append_raw(2026, month, line).expect(what);
    }

    let replayed = mgr.replay().expect("replay");
    let reported = mgr.total_content_len().expect("len");
    assert_eq!(
        reported,
        replayed.len(),
        "total_content_len must match replay len"
    );
}
1807
/// A range whose start equals its end yields empty content.
#[test]
fn read_content_range_empty_range() {
    let (_tmp, mgr) = setup();
    mgr.ensure_dirs().expect("dirs");
    mgr.create_shard(2026, 1).expect("create");
    mgr.append_raw(2026, 1, "event\n").expect("append");

    let slice = mgr.read_content_range(5, 5).expect("range");
    assert!(slice.is_empty());
}
1822
/// Reading the byte range that covers one appended line returns exactly that
/// line, newline included.
#[test]
fn read_content_range_within_single_shard() {
    let payload = "ABCDEF\n";

    let (_tmp, mgr) = setup();
    mgr.ensure_dirs().expect("dirs");
    mgr.create_shard(2026, 1).expect("create");
    mgr.append_raw(2026, 1, payload).expect("append");

    // Locate the payload inside the replayed content; the range end is the
    // start plus the payload length (7 bytes).
    let replayed = mgr.replay().expect("replay");
    let start = replayed.find("ABCDEF").expect("ABCDEF must be in shard");
    let end = start + payload.len();

    let range = mgr.read_content_range(start, end).expect("range");
    assert_eq!(range, payload);
}
1837
/// A range spanning `0..replay().len()` over two shards returns the same
/// content as a full replay.
#[test]
fn read_content_range_across_shard_boundary() {
    let (_tmp, mgr) = setup();
    mgr.ensure_dirs().expect("dirs");

    for (month, what) in [(1, "shard 1"), (2, "shard 2")] {
        mgr.create_shard(2026, month).expect(what);
    }
    mgr.append_raw(2026, 1, "jan-last-line\n").expect("jan");
    mgr.append_raw(2026, 2, "feb-first-line\n").expect("feb");

    let replayed = mgr.replay().expect("replay");
    let whole = mgr
        .read_content_range(0, replayed.len())
        .expect("full range");
    assert_eq!(whole, replayed);
}
1852
/// A range that starts at the end of the content and extends past it yields
/// empty content rather than an error.
#[test]
fn read_content_range_beyond_end() {
    let (_tmp, mgr) = setup();
    mgr.ensure_dirs().expect("dirs");
    mgr.create_shard(2026, 1).expect("create");
    mgr.append_raw(2026, 1, "event\n").expect("append");

    let end = mgr.replay().expect("replay").len();
    let past_end = mgr.read_content_range(end, end + 100).expect("beyond end");
    assert!(past_end.is_empty());
}
1867
/// `replay_from_offset(0)` returns the same content as `replay`, together
/// with that content's length.
#[test]
fn replay_from_offset_zero_returns_full_content() {
    let (_tmp, mgr) = setup();
    mgr.ensure_dirs().expect("dirs");
    mgr.create_shard(2026, 1).expect("create");
    for (line, what) in [("event1\n", "e1"), ("event2\n", "e2")] {
        mgr.append_raw(2026, 1, line).expect(what);
    }

    let full = mgr.replay().expect("full replay");
    let from_zero = mgr.replay_from_offset(0).expect("from 0");
    assert_eq!(from_zero.0, full);
    assert_eq!(from_zero.1, full.len());
}
1885
/// Replaying from the offset where the third event starts returns only that
/// event, while the reported total length still covers the full content.
#[test]
fn replay_from_offset_skips_content_before_cursor() {
    let (_tmp, mgr) = setup();
    mgr.ensure_dirs().expect("dirs");
    mgr.create_shard(2026, 1).expect("create");
    for (line, what) in [("event1\n", "e1"), ("event2\n", "e2"), ("event3\n", "e3")] {
        mgr.append_raw(2026, 1, line).expect(what);
    }

    let full = mgr.replay().expect("full replay");

    // Use the position of the last event as the cursor.
    let cursor = full.find("event3").expect("event3 in content");
    let (tail, total_len) = mgr.replay_from_offset(cursor).expect("from cursor");
    assert_eq!(tail, "event3\n");
    assert_eq!(total_len, full.len());
}
1903
/// Replaying from an offset equal to the content length returns an empty
/// tail and the full content length.
#[test]
fn replay_from_offset_at_end_returns_empty() {
    let (_tmp, mgr) = setup();
    mgr.ensure_dirs().expect("dirs");
    mgr.create_shard(2026, 1).expect("create");
    mgr.append_raw(2026, 1, "event1\n").expect("e1");

    let full_len = mgr.replay().expect("full replay").len();
    let (tail, total_len) = mgr.replay_from_offset(full_len).expect("at end");
    assert!(tail.is_empty(), "tail should be empty at end of content");
    assert_eq!(total_len, full_len);
}
1916
/// With the cursor placed at the length of the January shard, the tail must
/// contain both February events and none of the January ones, and the
/// reported total length must match the full replay.
#[test]
fn replay_from_offset_skips_sealed_shards_before_cursor() {
    let (_tmp, mgr) = setup();
    mgr.ensure_dirs().expect("dirs");

    mgr.create_shard(2026, 1).expect("jan");
    mgr.create_shard(2026, 2).expect("feb");
    for (month, line, what) in [
        (1, "jan-event1\n", "jan e1"),
        (1, "jan-event2\n", "jan e2"),
        (2, "feb-event1\n", "feb e1"),
        (2, "feb-event2\n", "feb e2"),
    ] {
        mgr.append_raw(2026, month, line).expect(what);
    }

    let full = mgr.replay().expect("full replay");
    let cursor = mgr.read_shard(2026, 1).expect("read jan").len();

    let (tail, total_len) = mgr.replay_from_offset(cursor).expect("from feb start");
    assert!(
        !tail.contains("jan-event"),
        "jan events should not appear in tail"
    );
    for needle in ["feb-event1", "feb-event2"] {
        assert!(tail.contains(needle), "feb events must be in tail");
    }
    assert_eq!(total_len, full.len());
}
1945
/// The total length reported by `replay_from_offset(0)` equals the value
/// returned by `total_content_len`.
#[test]
fn replay_from_offset_total_len_equals_total_content_len() {
    let (_tmp, mgr) = setup();
    mgr.ensure_dirs().expect("dirs");

    for (month, what) in [(1, "shard 1"), (2, "shard 2")] {
        mgr.create_shard(2026, month).expect(what);
    }
    mgr.append_raw(2026, 1, "event-a\n").expect("ea");
    mgr.append_raw(2026, 2, "event-b\n").expect("eb");

    let total = mgr.total_content_len().expect("total_content_len");
    let replay_total = mgr.replay_from_offset(0).expect("replay_from_offset").1;
    assert_eq!(
        total, replay_total,
        "total_content_len and replay_from_offset total must agree"
    );
}
1962
/// A shard file whose entire content is the name of another shard
/// ("2026-03.events") must not leak that text into `replay_lines`, while the
/// events of the surrounding January and March shards are still yielded.
#[test]
fn replay_lines_skips_forwarding_pointer_shard() {
    let dir = tempfile::tempdir().expect("tmpdir");
    let mgr = ShardManager::new(dir.path());
    mgr.ensure_dirs().expect("dirs");

    // January: a regular shard with a header-style line and one event.
    mgr.create_shard(2026, 1).expect("create jan");
    mgr.append_raw(2026, 1, "# bones event log v1\n")
        .expect("header");
    mgr.append_raw(2026, 1, "real-event-line\n").expect("event");

    // February: written directly to disk, holding only the March shard name.
    let mut feb_path = dir.path().join("events");
    feb_path.push("2026-02.events");
    fs::write(&feb_path, "2026-03.events").expect("write forwarding pointer");

    // March: another regular shard.
    mgr.create_shard(2026, 3).expect("create mar");
    mgr.append_raw(2026, 3, "# bones event log v1\n")
        .expect("mar header");
    mgr.append_raw(2026, 3, "another-event-line\n")
        .expect("mar event");

    // Keep only the second tuple field of every yielded item.
    let mut lines: Vec<String> = Vec::new();
    for entry in mgr.replay_lines().expect("replay_lines") {
        lines.push(entry.expect("line").1);
    }
    let seen = |needle: &str| lines.iter().any(|l| l.contains(needle));

    assert!(seen("real-event-line"), "jan event missing");
    assert!(seen("another-event-line"), "mar event missing");
    assert!(
        !seen("2026-03.events"),
        "forwarding pointer content must not appear in replay"
    );
}
2009}