1use std::fs::{self, OpenOptions};
30use std::io::{self, Write as IoWrite};
31use std::path::{Path, PathBuf};
32use std::time::{Duration, SystemTime, UNIX_EPOCH};
33
34use chrono::Datelike;
35
36use crate::event::writer::shard_header;
37use crate::lock::ShardLock;
38
39#[derive(Debug, thiserror::Error)]
45pub enum ShardError {
46 #[error("shard I/O error: {0}")]
48 Io(#[from] io::Error),
49
50 #[error("lock error: {0}")]
52 Lock(#[from] crate::lock::LockError),
53
54 #[error("failed to initialize .bones directory: {0}")]
56 InitFailed(io::Error),
57
58 #[error("invalid shard filename: {0}")]
60 InvalidShardName(String),
61
62 #[error("corrupted shard {path}: {reason}")]
64 CorruptedShard {
65 path: PathBuf,
67 reason: String,
69 },
70}
71
72#[derive(Debug, Clone)]
74pub struct ShardIntegrityIssue {
75 pub shard_name: String,
77 pub problem: String,
79}
80
81#[derive(Debug, Clone, PartialEq, Eq)]
87pub struct ShardManifest {
88 pub shard_name: String,
90 pub event_count: u64,
92 pub byte_len: u64,
94 pub file_hash: String,
96}
97
98impl ShardManifest {
99 #[must_use]
101 pub fn to_string_repr(&self) -> String {
102 format!(
103 "shard: {}\nevent_count: {}\nbyte_len: {}\nfile_hash: {}\n",
104 self.shard_name, self.event_count, self.byte_len, self.file_hash
105 )
106 }
107
108 #[must_use]
112 pub fn from_string_repr(s: &str) -> Option<Self> {
113 let mut shard_name = None;
114 let mut event_count = None;
115 let mut byte_len = None;
116 let mut file_hash = None;
117
118 for line in s.lines() {
119 if let Some(val) = line.strip_prefix("shard: ") {
120 shard_name = Some(val.to_string());
121 } else if let Some(val) = line.strip_prefix("event_count: ") {
122 event_count = val.parse().ok();
123 } else if let Some(val) = line.strip_prefix("byte_len: ") {
124 byte_len = val.parse().ok();
125 } else if let Some(val) = line.strip_prefix("file_hash: ") {
126 file_hash = Some(val.to_string());
127 }
128 }
129
130 Some(Self {
131 shard_name: shard_name?,
132 event_count: event_count?,
133 byte_len: byte_len?,
134 file_hash: file_hash?,
135 })
136 }
137}
138
139pub struct ShardManager {
154 bones_dir: PathBuf,
156}
157
158impl ShardManager {
159 #[must_use]
164 pub fn new(bones_dir: impl Into<PathBuf>) -> Self {
165 Self {
166 bones_dir: bones_dir.into(),
167 }
168 }
169
170 #[must_use]
172 pub fn events_dir(&self) -> PathBuf {
173 self.bones_dir.join("events")
174 }
175
176 #[must_use]
178 pub fn lock_path(&self) -> PathBuf {
179 self.bones_dir.join("lock")
180 }
181
182 #[must_use]
184 pub fn clock_path(&self) -> PathBuf {
185 self.bones_dir.join("cache").join("clock")
186 }
187
188 #[must_use]
190 pub fn shard_filename(year: i32, month: u32) -> String {
191 format!("{year:04}-{month:02}.events")
192 }
193
194 #[must_use]
196 pub fn shard_path(&self, year: i32, month: u32) -> PathBuf {
197 self.events_dir().join(Self::shard_filename(year, month))
198 }
199
200 #[must_use]
202 pub fn manifest_path(&self, year: i32, month: u32) -> PathBuf {
203 self.events_dir()
204 .join(format!("{year:04}-{month:02}.manifest"))
205 }
206
207 pub fn ensure_dirs(&self) -> Result<(), ShardError> {
218 fs::create_dir_all(self.events_dir()).map_err(ShardError::InitFailed)?;
219 fs::create_dir_all(self.bones_dir.join("cache")).map_err(ShardError::InitFailed)?;
220 Ok(())
221 }
222
223 pub fn init(&self) -> Result<(i32, u32), ShardError> {
233 self.ensure_dirs()?;
234
235 let shards = self.list_shards()?;
236 if shards.is_empty() {
237 let (year, month) = current_year_month();
238 self.create_shard(year, month)?;
239 Ok((year, month))
240 } else if let Some(&(year, month)) = shards.last() {
241 Ok((year, month))
242 } else {
243 unreachable!("shards is non-empty")
244 }
245 }
246
247 pub fn list_shards(&self) -> Result<Vec<(i32, u32)>, ShardError> {
260 let events_dir = self.events_dir();
261 if !events_dir.exists() {
262 return Ok(Vec::new());
263 }
264
265 let mut shards = Vec::new();
266 for entry in fs::read_dir(&events_dir)? {
267 let entry = entry?;
268 let name = entry.file_name();
269 let name_str = name.to_string_lossy();
270 if let Some(ym) = parse_shard_filename(&name_str) {
271 shards.push(ym);
272 }
273 }
274 shards.sort_unstable();
275 Ok(shards)
276 }
277
278 pub fn active_shard(&self) -> Result<Option<(i32, u32)>, ShardError> {
284 let shards = self.list_shards()?;
285 Ok(shards.last().copied())
286 }
287
288 pub fn create_shard(&self, year: i32, month: u32) -> Result<PathBuf, ShardError> {
301 let path = self.shard_path(year, month);
302 if path.exists() {
303 return Ok(path);
304 }
305
306 let header = shard_header();
307 fs::write(&path, header)?;
308 Ok(path)
309 }
310
311 pub fn rotate_if_needed(&self) -> Result<(i32, u32), ShardError> {
320 let (current_year, current_month) = current_year_month();
321 let active = self.active_shard()?;
322
323 match active {
324 Some((y, m)) if y == current_year && m == current_month => Ok((y, m)),
325 Some((y, m)) => {
326 self.write_manifest(y, m)?;
328 self.create_shard(current_year, current_month)?;
330 Ok((current_year, current_month))
331 }
332 None => {
333 self.create_shard(current_year, current_month)?;
335 Ok((current_year, current_month))
336 }
337 }
338 }
339
340 pub fn write_manifest(&self, year: i32, month: u32) -> Result<ShardManifest, ShardError> {
351 let shard_path = self.shard_path(year, month);
352 let content = fs::read(&shard_path)?;
353 let content_str = String::from_utf8_lossy(&content);
354
355 let event_count = content_str
357 .lines()
358 .filter(|line| !line.is_empty() && !line.starts_with('#') && !line.trim().is_empty())
359 .count() as u64;
360
361 let byte_len = content.len() as u64;
362 let file_hash = format!("blake3:{}", blake3::hash(&content).to_hex());
363
364 let manifest = ShardManifest {
365 shard_name: Self::shard_filename(year, month),
366 event_count,
367 byte_len,
368 file_hash,
369 };
370
371 let manifest_path = self.manifest_path(year, month);
372 fs::write(&manifest_path, manifest.to_string_repr())?;
373
374 Ok(manifest)
375 }
376
377 pub fn read_manifest(
384 &self,
385 year: i32,
386 month: u32,
387 ) -> Result<Option<ShardManifest>, ShardError> {
388 let manifest_path = self.manifest_path(year, month);
389 if !manifest_path.exists() {
390 return Ok(None);
391 }
392 let content = fs::read_to_string(&manifest_path)?;
393 Ok(ShardManifest::from_string_repr(&content))
394 }
395
396 pub fn validate_sealed_shards(&self) -> Result<Vec<ShardIntegrityIssue>, ShardError> {
414 let shards = self.list_shards()?;
415 let mut issues = Vec::new();
416
417 let sealed = if shards.len() > 1 {
419 &shards[..shards.len() - 1]
420 } else {
421 return Ok(issues);
422 };
423
424 for &(year, month) in sealed {
425 let shard_path = self.shard_path(year, month);
426 let shard_name = Self::shard_filename(year, month);
427
428 let Some(manifest) = self.read_manifest(year, month)? else {
429 tracing::warn!(shard = %shard_name, "sealed shard has no manifest");
430 issues.push(ShardIntegrityIssue {
431 shard_name: shard_name.clone(),
432 problem: "sealed shard has no manifest file".into(),
433 });
434 continue;
435 };
436
437 let file_len = fs::metadata(&shard_path)?.len();
438 if file_len != manifest.byte_len {
439 tracing::error!(
440 shard = %shard_name,
441 expected = manifest.byte_len,
442 actual = file_len,
443 "sealed shard byte length mismatch"
444 );
445 issues.push(ShardIntegrityIssue {
446 shard_name,
447 problem: format!(
448 "byte length mismatch: manifest says {} bytes, file is {} bytes",
449 manifest.byte_len, file_len
450 ),
451 });
452 }
453 }
454
455 Ok(issues)
456 }
457
458 pub fn append(
481 &self,
482 line: &str,
483 durable: bool,
484 lock_timeout: Duration,
485 ) -> Result<i64, ShardError> {
486 self.ensure_dirs()?;
487
488 let _lock = ShardLock::acquire(&self.lock_path(), lock_timeout)?;
489
490 let (year, month) = self.rotate_if_needed()?;
492 let shard_path = self.shard_path(year, month);
493
494 if shard_path.exists() {
496 validate_shard_header(&shard_path)?;
497 }
498
499 let ts = self.next_timestamp()?;
501
502 let mut file = OpenOptions::new()
504 .create(true)
505 .append(true)
506 .open(&shard_path)?;
507
508 file.write_all(line.as_bytes())?;
509 file.flush()?;
510
511 if durable {
512 file.sync_data()?;
513 }
514
515 Ok(ts)
516 }
517
518 pub fn append_raw(&self, year: i32, month: u32, line: &str) -> Result<(), ShardError> {
527 let shard_path = self.shard_path(year, month);
528
529 let mut file = OpenOptions::new()
530 .create(true)
531 .append(true)
532 .open(&shard_path)?;
533
534 file.write_all(line.as_bytes())?;
535 file.flush()?;
536 Ok(())
537 }
538
539 pub fn read_clock(&self) -> Result<i64, ShardError> {
552 let path = self.clock_path();
553 if !path.exists() {
554 return Ok(0);
555 }
556 let content = fs::read_to_string(&path)?;
557 Ok(content.trim().parse::<i64>().unwrap_or(0))
558 }
559
560 pub fn next_timestamp(&self) -> Result<i64, ShardError> {
571 let last = self.read_clock()?;
572 let now = system_time_us();
573 let next = std::cmp::max(now, last + 1);
574 self.write_clock(next)?;
575 Ok(next)
576 }
577
578 fn write_clock(&self, value: i64) -> Result<(), ShardError> {
580 let path = self.clock_path();
581 if let Some(parent) = path.parent() {
582 fs::create_dir_all(parent)?;
583 }
584 fs::write(&path, value.to_string())?;
585 Ok(())
586 }
587
588 pub fn recover_torn_writes(&self) -> Result<Option<u64>, ShardError> {
607 let Some(active) = self.active_shard()? else {
608 return Ok(None);
609 };
610
611 let shard_path = self.shard_path(active.0, active.1);
612 recover_shard_torn_write(&shard_path)
613 }
614
615 pub fn replay(&self) -> Result<String, ShardError> {
628 let shards = self.list_shards()?;
629 let mut content = String::new();
630
631 for (year, month) in shards {
632 let path = self.shard_path(year, month);
633 let shard_content = fs::read_to_string(&path)?;
634 content.push_str(&shard_content);
635 }
636
637 Ok(content)
638 }
639
640 pub fn read_shard(&self, year: i32, month: u32) -> Result<String, ShardError> {
646 let path = self.shard_path(year, month);
647 Ok(fs::read_to_string(&path)?)
648 }
649
650 pub fn total_content_len(&self) -> Result<usize, ShardError> {
660 let shards = self.list_shards()?;
661 let mut total = 0usize;
662 for (year, month) in shards {
663 let path = self.shard_path(year, month);
664 let meta = fs::metadata(&path)?;
665 total = total.saturating_add(usize::try_from(meta.len()).unwrap_or(usize::MAX));
666 }
667 Ok(total)
668 }
669
670 pub fn replay_from_offset(&self, offset: usize) -> Result<(String, usize), ShardError> {
687 let shards = self.list_shards()?;
688 let mut cumulative: usize = 0;
689 let mut result = String::new();
690 let mut found_start = false;
691
692 for (year, month) in shards {
693 let path = self.shard_path(year, month);
694 let shard_len = usize::try_from(fs::metadata(&path)?.len()).unwrap_or(usize::MAX);
695
696 let shard_end = cumulative.saturating_add(shard_len);
697
698 if shard_end <= offset {
699 cumulative = shard_end;
701 continue;
702 }
703
704 let shard_content = fs::read_to_string(&path)?;
706
707 if found_start {
708 result.push_str(&shard_content);
709 } else {
710 let within = offset.saturating_sub(cumulative);
712 let within = within.min(shard_content.len());
714 let within = snap_to_char_boundary(&shard_content, within);
716 result.push_str(&shard_content[within..]);
717 found_start = true;
718 }
719
720 cumulative = shard_end;
721 }
722
723 Ok((result, cumulative))
724 }
725
726 pub fn read_content_range(
738 &self,
739 start_offset: usize,
740 end_offset: usize,
741 ) -> Result<String, ShardError> {
742 if start_offset >= end_offset {
743 return Ok(String::new());
744 }
745
746 let shards = self.list_shards()?;
747 let mut cumulative: usize = 0;
748 let mut result = String::new();
749
750 for (year, month) in shards {
751 let path = self.shard_path(year, month);
752 let shard_len = usize::try_from(fs::metadata(&path)?.len()).unwrap_or(usize::MAX);
753 let shard_end = cumulative.saturating_add(shard_len);
754
755 if shard_end <= start_offset {
756 cumulative = shard_end;
758 continue;
759 }
760
761 if cumulative >= end_offset {
762 break;
764 }
765
766 let shard_content = fs::read_to_string(&path)?;
767
768 let within_start = if cumulative < start_offset {
770 (start_offset - cumulative).min(shard_content.len())
771 } else {
772 0
773 };
774 let within_end = if shard_end > end_offset {
775 (end_offset - cumulative).min(shard_content.len())
776 } else {
777 shard_content.len()
778 };
779
780 let within_start = snap_to_char_boundary(&shard_content, within_start);
782 let within_end = snap_to_char_boundary(&shard_content, within_end);
783 if within_start < within_end {
784 result.push_str(&shard_content[within_start..within_end]);
785 }
786 cumulative = shard_end;
787 }
788
789 Ok(result)
790 }
791
792 pub fn event_count(&self) -> Result<u64, ShardError> {
798 let content = self.replay()?;
799 let count = content
800 .lines()
801 .filter(|line| !line.is_empty() && !line.starts_with('#') && !line.trim().is_empty())
802 .count();
803 Ok(count as u64)
804 }
805
806 pub fn replay_lines(
814 &self,
815 ) -> Result<impl Iterator<Item = io::Result<(usize, String)>>, ShardError> {
816 self.replay_lines_from_offset(0)
817 }
818
819 pub fn replay_lines_from_offset(
827 &self,
828 offset: usize,
829 ) -> Result<impl Iterator<Item = io::Result<(usize, String)>>, ShardError> {
830 let shards = self.list_shards()?;
831 let bones_dir = self.bones_dir.clone();
832
833 Ok(ShardLineIterator {
834 shards,
835 current_shard_idx: 0,
836 current_reader: None,
837 cumulative_offset: 0,
838 bones_dir,
839 }
840 .skip_to_offset(offset))
841 }
842
843 pub fn is_empty(&self) -> Result<bool, ShardError> {
849 let shards = self.list_shards()?;
850 Ok(shards.is_empty())
851 }
852}
853
854struct ShardLineIterator {
859 shards: Vec<(i32, u32)>,
860 current_shard_idx: usize,
861 current_reader: Option<io::BufReader<fs::File>>,
862 cumulative_offset: usize,
863 bones_dir: PathBuf,
864}
865
866impl ShardLineIterator {
867 fn skip_to_offset(mut self, offset: usize) -> Self {
868 while self.current_shard_idx < self.shards.len() {
870 let (year, month) = self.shards[self.current_shard_idx];
871 let shard_path = self
872 .bones_dir
873 .join("events")
874 .join(ShardManager::shard_filename(year, month));
875 if let Ok(meta) = fs::metadata(shard_path) {
876 let shard_len = usize::try_from(meta.len()).unwrap_or(usize::MAX);
877 if self.cumulative_offset + shard_len <= offset {
878 self.cumulative_offset += shard_len;
879 self.current_shard_idx += 1;
880 continue;
881 }
882 }
883 break;
884 }
885
886 self.cumulative_offset = offset;
891 self
892 }
893}
894
895impl Iterator for ShardLineIterator {
896 type Item = io::Result<(usize, String)>;
897
898 fn next(&mut self) -> Option<Self::Item> {
899 use std::io::{BufRead, Seek, SeekFrom};
900
901 loop {
902 if self.current_reader.is_none() {
903 if self.current_shard_idx >= self.shards.len() {
904 return None;
905 }
906
907 let (year, month) = self.shards[self.current_shard_idx];
908 let shard_path = self
909 .bones_dir
910 .join("events")
911 .join(ShardManager::shard_filename(year, month));
912
913 if is_forwarding_pointer(&shard_path) {
917 tracing::warn!(
918 shard = %shard_path.display(),
919 "skipping legacy forwarding-pointer shard during replay"
920 );
921 if let Ok(meta) = fs::metadata(&shard_path) {
922 self.cumulative_offset += usize::try_from(meta.len()).unwrap_or(0);
923 }
924 self.current_shard_idx += 1;
925 continue;
926 }
927
928 if let Err(e) = validate_shard_header(&shard_path) {
930 tracing::error!(
931 shard = %shard_path.display(),
932 error = %e,
933 "shard header validation failed"
934 );
935 return Some(Err(io::Error::new(
936 io::ErrorKind::InvalidData,
937 e.to_string(),
938 )));
939 }
940
941 let mut file = match fs::File::open(shard_path) {
942 Ok(f) => f,
943 Err(e) => return Some(Err(e)),
944 };
945
946 let mut cumulative_before = 0;
948 for i in 0..self.current_shard_idx {
949 let (y, m) = self.shards[i];
950 let p = self
951 .bones_dir
952 .join("events")
953 .join(ShardManager::shard_filename(y, m));
954 if let Ok(meta) = fs::metadata(p) {
955 cumulative_before += usize::try_from(meta.len()).unwrap_or(usize::MAX);
956 }
957 }
958
959 if self.cumulative_offset > cumulative_before {
960 let within = self.cumulative_offset - cumulative_before;
961 if let Err(e) = file.seek(SeekFrom::Start(within as u64)) {
962 return Some(Err(e));
963 }
964 }
965
966 self.current_reader = Some(io::BufReader::new(file));
967 }
968
969 let reader = self
970 .current_reader
971 .as_mut()
972 .expect("reader was just set above");
973 let mut line = String::new();
974 let offset = self.cumulative_offset;
975
976 match reader.read_line(&mut line) {
977 Ok(0) => {
978 self.current_reader = None;
980 self.current_shard_idx += 1;
981 }
982 Ok(n) => {
983 self.cumulative_offset += n;
984 return Some(Ok((offset, line)));
985 }
986 Err(e) => return Some(Err(e)),
987 }
988 }
989 }
990}
991
992const fn snap_to_char_boundary(s: &str, offset: usize) -> usize {
1002 if offset >= s.len() {
1003 return s.len();
1004 }
1005 let mut pos = offset;
1007 while pos < s.len() && !s.is_char_boundary(pos) {
1008 pos += 1;
1009 }
1010 pos
1011}
1012
1013#[must_use]
1015fn current_year_month() -> (i32, u32) {
1016 let now = chrono::Utc::now();
1017 (now.year(), now.month())
1018}
1019
1020#[allow(clippy::cast_possible_truncation)]
1022#[must_use]
1023fn system_time_us() -> i64 {
1024 SystemTime::now()
1025 .duration_since(UNIX_EPOCH)
1026 .map_or(0, |d| d.as_micros() as i64)
1027}
1028
1029fn is_forwarding_pointer(path: &Path) -> bool {
1036 let Ok(meta) = fs::metadata(path) else {
1037 return false;
1038 };
1039 if meta.len() > 30 {
1040 return false;
1041 }
1042 let Ok(content) = fs::read_to_string(path) else {
1043 return false;
1044 };
1045 parse_shard_filename(content.trim()).is_some()
1046}
1047
1048pub fn validate_shard_header(path: &Path) -> Result<(), ShardError> {
1059 use crate::event::writer::SHARD_HEADER;
1060 use std::io::{BufRead, BufReader};
1061
1062 if is_forwarding_pointer(path) {
1064 return Ok(());
1065 }
1066
1067 let file = fs::File::open(path)?;
1068 let mut reader = BufReader::new(file);
1069 let mut first_line = String::new();
1070 let n = reader.read_line(&mut first_line)?;
1071
1072 if n == 0 {
1074 return Ok(());
1075 }
1076
1077 let trimmed = first_line.trim_end();
1078 if trimmed != SHARD_HEADER {
1079 return Err(ShardError::CorruptedShard {
1080 path: path.to_path_buf(),
1081 reason: format!(
1082 "expected header '{}', found '{}'",
1083 SHARD_HEADER,
1084 trimmed.chars().take(80).collect::<String>()
1085 ),
1086 });
1087 }
1088
1089 Ok(())
1090}
1091
1092fn parse_shard_filename(name: &str) -> Option<(i32, u32)> {
1094 let stem = name.strip_suffix(".events")?;
1095 if stem == "current" {
1097 return None;
1098 }
1099 let (year_str, month_str) = stem.split_once('-')?;
1100 let year: i32 = year_str.parse().ok()?;
1101 let month: u32 = month_str.parse().ok()?;
1102 if !(1..=12).contains(&month) {
1103 return None;
1104 }
1105 Some((year, month))
1106}
1107
1108fn recover_shard_torn_write(path: &Path) -> Result<Option<u64>, ShardError> {
1113 let metadata = fs::metadata(path)?;
1114 let file_len = metadata.len();
1115 if file_len == 0 {
1116 return Ok(None);
1117 }
1118
1119 let content = fs::read(path)?;
1120
1121 let last_newline = content.iter().rposition(|&b| b == b'\n');
1123
1124 if let Some(pos) = last_newline {
1125 let expected_len = (pos + 1) as u64;
1126 if expected_len < file_len {
1127 let truncated = file_len - expected_len;
1129 let file = OpenOptions::new().write(true).open(path)?;
1130 file.set_len(expected_len)?;
1131 Ok(Some(truncated))
1132 } else {
1133 Ok(None)
1135 }
1136 } else {
1137 let file = OpenOptions::new().write(true).open(path)?;
1140 file.set_len(0)?;
1141 Ok(Some(file_len))
1142 }
1143}
1144
1145#[cfg(test)]
1150mod tests {
1151 use super::*;
1152 use tempfile::TempDir;
1153
1154 #[test]
1155 fn snap_to_char_boundary_ascii() {
1156 let s = "hello world";
1157 assert_eq!(snap_to_char_boundary(s, 0), 0);
1158 assert_eq!(snap_to_char_boundary(s, 5), 5);
1159 assert_eq!(snap_to_char_boundary(s, 11), 11); assert_eq!(snap_to_char_boundary(s, 100), 11); }
1162
1163 #[test]
1164 fn snap_to_char_boundary_emoji() {
1165 let s = "ab✅cd🎉ef";
1167 assert_eq!(snap_to_char_boundary(s, 2), 2); assert_eq!(snap_to_char_boundary(s, 3), 5); assert_eq!(snap_to_char_boundary(s, 4), 5); assert_eq!(snap_to_char_boundary(s, 5), 5); assert_eq!(snap_to_char_boundary(s, 8), 11); assert_eq!(snap_to_char_boundary(s, 9), 11);
1174 assert_eq!(snap_to_char_boundary(s, 10), 11);
1175 }
1176
1177 fn setup() -> (TempDir, ShardManager) {
1178 let tmp = TempDir::new().expect("tempdir");
1179 let bones_dir = tmp.path().join(".bones");
1180 let mgr = ShardManager::new(&bones_dir);
1181 (tmp, mgr)
1182 }
1183
1184 #[test]
1189 fn parse_valid_shard_filenames() {
1190 assert_eq!(parse_shard_filename("2026-01.events"), Some((2026, 1)));
1191 assert_eq!(parse_shard_filename("2026-12.events"), Some((2026, 12)));
1192 assert_eq!(parse_shard_filename("1999-06.events"), Some((1999, 6)));
1193 }
1194
1195 #[test]
1196 fn parse_invalid_shard_filenames() {
1197 assert_eq!(parse_shard_filename("current.events"), None);
1198 assert_eq!(parse_shard_filename("2026-13.events"), None); assert_eq!(parse_shard_filename("2026-00.events"), None); assert_eq!(parse_shard_filename("not-a-shard.txt"), None);
1201 assert_eq!(parse_shard_filename("2026-01.manifest"), None);
1202 assert_eq!(parse_shard_filename(""), None);
1203 }
1204
1205 #[test]
1210 fn shard_manager_paths() {
1211 let mgr = ShardManager::new("/repo/.bones");
1212 assert_eq!(mgr.events_dir(), PathBuf::from("/repo/.bones/events"));
1213 assert_eq!(mgr.lock_path(), PathBuf::from("/repo/.bones/lock"));
1214 assert_eq!(mgr.clock_path(), PathBuf::from("/repo/.bones/cache/clock"));
1215 assert_eq!(
1216 mgr.shard_path(2026, 2),
1217 PathBuf::from("/repo/.bones/events/2026-02.events")
1218 );
1219 assert_eq!(
1220 mgr.manifest_path(2026, 1),
1221 PathBuf::from("/repo/.bones/events/2026-01.manifest")
1222 );
1223 }
1224
1225 #[test]
1226 fn shard_filename_format() {
1227 assert_eq!(ShardManager::shard_filename(2026, 1), "2026-01.events");
1228 assert_eq!(ShardManager::shard_filename(2026, 12), "2026-12.events");
1229 assert_eq!(ShardManager::shard_filename(1999, 6), "1999-06.events");
1230 }
1231
1232 #[test]
1237 fn ensure_dirs_creates_directories() {
1238 let (_tmp, mgr) = setup();
1239 mgr.ensure_dirs().expect("should create dirs");
1240 assert!(mgr.events_dir().exists());
1241 assert!(mgr.bones_dir.join("cache").exists());
1242 }
1243
1244 #[test]
1245 fn ensure_dirs_is_idempotent() {
1246 let (_tmp, mgr) = setup();
1247 mgr.ensure_dirs().expect("first");
1248 mgr.ensure_dirs().expect("second");
1249 assert!(mgr.events_dir().exists());
1250 }
1251
1252 #[test]
1253 fn init_creates_first_shard() {
1254 let (_tmp, mgr) = setup();
1255 let (year, month) = mgr.init().expect("init");
1256
1257 let (expected_year, expected_month) = current_year_month();
1258 assert_eq!(year, expected_year);
1259 assert_eq!(month, expected_month);
1260
1261 let shard_path = mgr.shard_path(year, month);
1263 assert!(shard_path.exists());
1264 let content = fs::read_to_string(&shard_path).expect("read");
1265 assert!(content.starts_with("# bones event log v1"));
1266 }
1267
1268 #[test]
1269 fn init_is_idempotent() {
1270 let (_tmp, mgr) = setup();
1271 let first = mgr.init().expect("first");
1272 let second = mgr.init().expect("second");
1273 assert_eq!(first, second);
1274 }
1275
1276 #[test]
1277 fn validate_shard_header_reports_unicode_without_panicking() {
1278 let (_tmp, mgr) = setup();
1279 mgr.ensure_dirs().expect("dirs");
1280 let path = mgr.shard_path(2026, 4);
1281 fs::write(&path, format!("{}\n", "é".repeat(120))).expect("write shard");
1282
1283 let err = validate_shard_header(&path).expect_err("invalid header");
1284 assert!(err.to_string().contains("expected header"));
1285 }
1286
1287 #[test]
1292 fn list_shards_empty() {
1293 let (_tmp, mgr) = setup();
1294 mgr.ensure_dirs().expect("dirs");
1295 let shards = mgr.list_shards().expect("list");
1296 assert!(shards.is_empty());
1297 }
1298
1299 #[test]
1300 fn list_shards_returns_sorted() {
1301 let (_tmp, mgr) = setup();
1302 mgr.ensure_dirs().expect("dirs");
1303
1304 mgr.create_shard(2026, 3).expect("create");
1306 mgr.create_shard(2026, 1).expect("create");
1307 mgr.create_shard(2026, 2).expect("create");
1308
1309 let shards = mgr.list_shards().expect("list");
1310 assert_eq!(shards, vec![(2026, 1), (2026, 2), (2026, 3)]);
1311 }
1312
1313 #[test]
1314 fn list_shards_skips_non_shard_files() {
1315 let (_tmp, mgr) = setup();
1316 mgr.ensure_dirs().expect("dirs");
1317 mgr.create_shard(2026, 1).expect("create");
1318
1319 fs::write(mgr.events_dir().join("readme.txt"), "hi").expect("write");
1321 fs::write(mgr.events_dir().join("2026-01.manifest"), "manifest").expect("write");
1322
1323 let shards = mgr.list_shards().expect("list");
1324 assert_eq!(shards, vec![(2026, 1)]);
1325 }
1326
1327 #[test]
1328 fn list_shards_no_events_dir() {
1329 let (_tmp, mgr) = setup();
1330 let shards = mgr.list_shards().expect("list");
1332 assert!(shards.is_empty());
1333 }
1334
1335 #[test]
1340 fn create_shard_writes_header() {
1341 let (_tmp, mgr) = setup();
1342 mgr.ensure_dirs().expect("dirs");
1343 let path = mgr.create_shard(2026, 2).expect("create");
1344
1345 let content = fs::read_to_string(&path).expect("read");
1346 assert!(content.starts_with("# bones event log v1"));
1347 assert!(content.contains("# fields:"));
1348 assert_eq!(content.lines().count(), 2);
1349 }
1350
1351 #[test]
1352 fn create_shard_idempotent() {
1353 let (_tmp, mgr) = setup();
1354 mgr.ensure_dirs().expect("dirs");
1355 let p1 = mgr.create_shard(2026, 2).expect("first");
1356 fs::write(&p1, "modified").expect("write");
1358 let p2 = mgr.create_shard(2026, 2).expect("second");
1360 assert_eq!(p1, p2);
1361 let content = fs::read_to_string(&p2).expect("read");
1362 assert_eq!(content, "modified");
1363 }
1364
1365 #[test]
1370 fn clock_starts_at_zero() {
1371 let (_tmp, mgr) = setup();
1372 mgr.ensure_dirs().expect("dirs");
1373 let ts = mgr.read_clock().expect("read");
1374 assert_eq!(ts, 0);
1375 }
1376
1377 #[test]
1378 fn clock_is_monotonic() {
1379 let (_tmp, mgr) = setup();
1380 mgr.ensure_dirs().expect("dirs");
1381 let t1 = mgr.next_timestamp().expect("t1");
1382 let t2 = mgr.next_timestamp().expect("t2");
1383 let t3 = mgr.next_timestamp().expect("t3");
1384 assert!(t2 > t1);
1385 assert!(t3 > t2);
1386 }
1387
1388 #[test]
1389 fn clock_reads_back_written_value() {
1390 let (_tmp, mgr) = setup();
1391 mgr.ensure_dirs().expect("dirs");
1392 mgr.write_clock(42_000_000).expect("write");
1393 let ts = mgr.read_clock().expect("read");
1394 assert_eq!(ts, 42_000_000);
1395 }
1396
1397 #[test]
1398 fn clock_never_goes_backward() {
1399 let (_tmp, mgr) = setup();
1400 mgr.ensure_dirs().expect("dirs");
1401
1402 let future = system_time_us() + 10_000_000;
1404 mgr.write_clock(future).expect("write");
1405
1406 let next = mgr.next_timestamp().expect("next");
1407 assert!(next > future, "clock should advance past future value");
1408 }
1409
1410 #[test]
1415 fn append_raw_adds_line() {
1416 let (_tmp, mgr) = setup();
1417 mgr.ensure_dirs().expect("dirs");
1418 mgr.create_shard(2026, 2).expect("create");
1419
1420 mgr.append_raw(2026, 2, "event line 1\n").expect("append");
1421 mgr.append_raw(2026, 2, "event line 2\n").expect("append");
1422
1423 let content = mgr.read_shard(2026, 2).expect("read");
1424 assert!(content.contains("event line 1"));
1425 assert!(content.contains("event line 2"));
1426 }
1427
1428 #[test]
1429 fn append_with_lock() {
1430 let (_tmp, mgr) = setup();
1431 mgr.init().expect("init");
1432
1433 let _ts = mgr
1434 .append("test event line\n", false, Duration::from_secs(1))
1435 .expect("append");
1436
1437 let content = mgr.replay().expect("replay");
1438 assert!(content.contains("test event line"));
1439 }
1440
1441 #[test]
1442 fn append_returns_monotonic_timestamps() {
1443 let (_tmp, mgr) = setup();
1444 mgr.init().expect("init");
1445
1446 let t1 = mgr
1447 .append("line1\n", false, Duration::from_secs(1))
1448 .expect("t1");
1449 let t2 = mgr
1450 .append("line2\n", false, Duration::from_secs(1))
1451 .expect("t2");
1452
1453 assert!(t2 > t1);
1454 }
1455
1456 #[test]
1461 fn recover_clean_file() {
1462 let (_tmp, mgr) = setup();
1463 mgr.init().expect("init");
1464
1465 let (y, m) = current_year_month();
1466 mgr.append_raw(y, m, "complete line\n").expect("append");
1467
1468 let recovered = mgr.recover_torn_writes().expect("recover");
1469 assert_eq!(recovered, None);
1470 }
1471
1472 #[test]
1473 fn recover_torn_write_truncates() {
1474 let (_tmp, mgr) = setup();
1475 let (y, m) = mgr.init().expect("init");
1476 let shard_path = mgr.shard_path(y, m);
1477
1478 {
1480 let mut f = OpenOptions::new()
1481 .append(true)
1482 .open(&shard_path)
1483 .expect("open");
1484 f.write_all(b"complete line\npartial line without newline")
1485 .expect("write");
1486 f.flush().expect("flush");
1487 }
1488
1489 let recovered = mgr.recover_torn_writes().expect("recover");
1490 assert!(recovered.is_some());
1491
1492 let truncated = recovered.expect("checked is_some");
1493 assert_eq!(truncated, "partial line without newline".len() as u64);
1494
1495 let content = fs::read_to_string(&shard_path).expect("read");
1497 assert!(content.ends_with('\n'));
1498 assert!(content.contains("complete line"));
1499 assert!(!content.contains("partial line without newline"));
1500 }
1501
1502 #[test]
1503 fn recover_no_newline_at_all() {
1504 let (_tmp, mgr) = setup();
1505 let (y, m) = mgr.init().expect("init");
1506 let shard_path = mgr.shard_path(y, m);
1507
1508 fs::write(&shard_path, "no newlines here").expect("write");
1510
1511 let recovered = mgr.recover_torn_writes().expect("recover");
1512 assert_eq!(recovered, Some("no newlines here".len() as u64));
1513
1514 let content = fs::read_to_string(&shard_path).expect("read");
1516 assert!(content.is_empty());
1517 }
1518
1519 #[test]
1520 fn recover_empty_file() {
1521 let (_tmp, mgr) = setup();
1522 let (y, m) = mgr.init().expect("init");
1523 let shard_path = mgr.shard_path(y, m);
1524
1525 fs::write(&shard_path, "").expect("write");
1527
1528 let recovered = mgr.recover_torn_writes().expect("recover");
1529 assert_eq!(recovered, None);
1530 }
1531
1532 #[test]
1533 fn recover_no_active_shard() {
1534 let (_tmp, mgr) = setup();
1535 mgr.ensure_dirs().expect("dirs");
1536
1537 let recovered = mgr.recover_torn_writes().expect("recover");
1538 assert_eq!(recovered, None);
1539 }
1540
1541 #[test]
1546 fn replay_empty_repo() {
1547 let (_tmp, mgr) = setup();
1548 mgr.ensure_dirs().expect("dirs");
1549 let content = mgr.replay().expect("replay");
1550 assert!(content.is_empty());
1551 }
1552
1553 #[test]
1554 fn replay_single_shard() {
1555 let (_tmp, mgr) = setup();
1556 mgr.ensure_dirs().expect("dirs");
1557 mgr.create_shard(2026, 1).expect("create");
1558 mgr.append_raw(2026, 1, "event-a\n").expect("append");
1559
1560 let content = mgr.replay().expect("replay");
1561 assert!(content.contains("event-a"));
1562 }
1563
1564 #[test]
1565 fn replay_multiple_shards_in_order() {
1566 let (_tmp, mgr) = setup();
1567 mgr.ensure_dirs().expect("dirs");
1568
1569 mgr.create_shard(2026, 1).expect("create");
1570 mgr.create_shard(2026, 2).expect("create");
1571 mgr.create_shard(2026, 3).expect("create");
1572
1573 mgr.append_raw(2026, 1, "event-jan\n").expect("append");
1574 mgr.append_raw(2026, 2, "event-feb\n").expect("append");
1575 mgr.append_raw(2026, 3, "event-mar\n").expect("append");
1576
1577 let content = mgr.replay().expect("replay");
1578
1579 let jan_pos = content.find("event-jan").expect("jan");
1581 let feb_pos = content.find("event-feb").expect("feb");
1582 let mar_pos = content.find("event-mar").expect("mar");
1583 assert!(jan_pos < feb_pos);
1584 assert!(feb_pos < mar_pos);
1585 }
1586
1587 #[test]
1592 fn event_count_empty() {
1593 let (_tmp, mgr) = setup();
1594 mgr.ensure_dirs().expect("dirs");
1595 assert_eq!(mgr.event_count().expect("count"), 0);
1596 }
1597
1598 #[test]
1599 fn event_count_excludes_comments_and_blanks() {
1600 let (_tmp, mgr) = setup();
1601 mgr.ensure_dirs().expect("dirs");
1602 mgr.create_shard(2026, 1).expect("create");
1603 mgr.append_raw(2026, 1, "event1\n").expect("append");
1605 mgr.append_raw(2026, 1, "event2\n").expect("append");
1606 mgr.append_raw(2026, 1, "\n").expect("blank");
1607
1608 assert_eq!(mgr.event_count().expect("count"), 2);
1609 }
1610
1611 #[test]
1616 fn is_empty_no_shards() {
1617 let (_tmp, mgr) = setup();
1618 mgr.ensure_dirs().expect("dirs");
1619 assert!(mgr.is_empty().expect("empty"));
1620 }
1621
1622 #[test]
1623 fn is_empty_with_shards() {
1624 let (_tmp, mgr) = setup();
1625 mgr.init().expect("init");
1626 assert!(!mgr.is_empty().expect("empty"));
1627 }
1628
1629 #[test]
1634 fn write_and_read_manifest() {
1635 let (_tmp, mgr) = setup();
1636 mgr.ensure_dirs().expect("dirs");
1637 mgr.create_shard(2026, 1).expect("create");
1638 mgr.append_raw(2026, 1, "event-line-1\n").expect("append");
1639 mgr.append_raw(2026, 1, "event-line-2\n").expect("append");
1640
1641 let written = mgr.write_manifest(2026, 1).expect("write manifest");
1642 assert_eq!(written.shard_name, "2026-01.events");
1643 assert_eq!(written.event_count, 2);
1644 assert!(written.byte_len > 0);
1645 assert!(written.file_hash.starts_with("blake3:"));
1646
1647 let read = mgr
1648 .read_manifest(2026, 1)
1649 .expect("read")
1650 .expect("should exist");
1651 assert_eq!(read, written);
1652 }
1653
1654 #[test]
1655 fn manifest_roundtrip() {
1656 let manifest = ShardManifest {
1657 shard_name: "2026-01.events".into(),
1658 event_count: 42,
1659 byte_len: 12345,
1660 file_hash: "blake3:abcdef0123456789".into(),
1661 };
1662
1663 let repr = manifest.to_string_repr();
1664 let parsed = ShardManifest::from_string_repr(&repr).expect("parse");
1665 assert_eq!(parsed, manifest);
1666 }
1667
1668 #[test]
1669 fn read_manifest_missing() {
1670 let (_tmp, mgr) = setup();
1671 mgr.ensure_dirs().expect("dirs");
1672 let result = mgr.read_manifest(2026, 1).expect("read");
1673 assert!(result.is_none());
1674 }
1675
1676 #[test]
1677 fn manifest_event_count_excludes_comments() {
1678 let (_tmp, mgr) = setup();
1679 mgr.ensure_dirs().expect("dirs");
1680 mgr.create_shard(2026, 1).expect("create");
1681 mgr.append_raw(2026, 1, "event1\n").expect("append");
1683
1684 let manifest = mgr.write_manifest(2026, 1).expect("manifest");
1685 assert_eq!(manifest.event_count, 1);
1687 }
1688
1689 #[test]
1694 fn rotate_creates_shard_if_none_exist() {
1695 let (_tmp, mgr) = setup();
1696 mgr.ensure_dirs().expect("dirs");
1697
1698 let (y, m) = mgr.rotate_if_needed().expect("rotate");
1699 let (ey, em) = current_year_month();
1700 assert_eq!((y, m), (ey, em));
1701
1702 assert!(mgr.shard_path(y, m).exists());
1703 }
1704
1705 #[test]
1706 fn rotate_no_op_same_month() {
1707 let (_tmp, mgr) = setup();
1708 let (y, m) = mgr.init().expect("init");
1709
1710 let (y2, m2) = mgr.rotate_if_needed().expect("rotate");
1711 assert_eq!((y, m), (y2, m2));
1712 }
1713
1714 #[test]
1715 fn rotate_different_month_seals_and_creates() {
1716 let (_tmp, mgr) = setup();
1717 mgr.ensure_dirs().expect("dirs");
1718
1719 mgr.create_shard(2025, 11).expect("create");
1721 mgr.append_raw(2025, 11, "old event\n").expect("append");
1722
1723 let (y, m) = mgr.rotate_if_needed().expect("rotate");
1725 let (ey, em) = current_year_month();
1726 assert_eq!((y, m), (ey, em));
1727
1728 assert!(mgr.manifest_path(2025, 11).exists());
1730
1731 assert!(mgr.shard_path(ey, em).exists());
1733 }
1734
1735 #[test]
1740 fn frozen_shard_not_modified_by_append() {
1741 let (_tmp, mgr) = setup();
1742 mgr.ensure_dirs().expect("dirs");
1743
1744 mgr.create_shard(2025, 6).expect("create");
1746 mgr.append_raw(2025, 6, "old event\n").expect("append");
1747 let old_content = mgr.read_shard(2025, 6).expect("read");
1748
1749 mgr.init().expect("init");
1751
1752 mgr.append("new event\n", false, Duration::from_secs(1))
1754 .expect("append");
1755
1756 let after_content = mgr.read_shard(2025, 6).expect("read");
1758 assert_eq!(old_content, after_content);
1759 }
1760
1761 #[test]
1766 fn system_time_us_is_positive() {
1767 let ts = system_time_us();
1768 assert!(ts > 0, "system time should be positive: {ts}");
1769 }
1770
1771 #[test]
1772 fn system_time_us_is_reasonable() {
1773 let ts = system_time_us();
1774 let jan_2020_us: i64 = 1_577_836_800_000_000;
1776 assert!(ts > jan_2020_us, "system time too small: {ts}");
1777 }
1778
1779 #[test]
1784 fn total_content_len_empty_repo() {
1785 let (_tmp, mgr) = setup();
1786 mgr.ensure_dirs().expect("dirs");
1787 let len = mgr.total_content_len().expect("len");
1788 assert_eq!(len, 0);
1789 }
1790
1791 #[test]
1792 fn total_content_len_single_shard() {
1793 let (_tmp, mgr) = setup();
1794 mgr.ensure_dirs().expect("dirs");
1795 mgr.create_shard(2026, 1).expect("create");
1796 mgr.append_raw(2026, 1, "line1\n").expect("append");
1797 mgr.append_raw(2026, 1, "line2\n").expect("append");
1798
1799 let full = mgr.replay().expect("replay");
1800 let len = mgr.total_content_len().expect("len");
1801 assert_eq!(len, full.len());
1802 }
1803
1804 #[test]
1805 fn total_content_len_multiple_shards() {
1806 let (_tmp, mgr) = setup();
1807 mgr.ensure_dirs().expect("dirs");
1808 mgr.create_shard(2026, 1).expect("shard 1");
1809 mgr.create_shard(2026, 2).expect("shard 2");
1810 mgr.append_raw(2026, 1, "jan-event\n").expect("append jan");
1811 mgr.append_raw(2026, 2, "feb-event\n").expect("append feb");
1812
1813 let full = mgr.replay().expect("replay");
1814 let len = mgr.total_content_len().expect("len");
1815 assert_eq!(len, full.len(), "total_content_len must match replay len");
1816 }
1817
1818 #[test]
1823 fn read_content_range_empty_range() {
1824 let (_tmp, mgr) = setup();
1825 mgr.ensure_dirs().expect("dirs");
1826 mgr.create_shard(2026, 1).expect("create");
1827 mgr.append_raw(2026, 1, "event\n").expect("append");
1828
1829 let result = mgr.read_content_range(5, 5).expect("range");
1830 assert!(result.is_empty());
1831 }
1832
1833 #[test]
1834 fn read_content_range_within_single_shard() {
1835 let (_tmp, mgr) = setup();
1836 mgr.ensure_dirs().expect("dirs");
1837 mgr.create_shard(2026, 1).expect("create");
1838 mgr.append_raw(2026, 1, "ABCDEF\n").expect("append");
1840
1841 let full = mgr.replay().expect("replay");
1842 let pos = full.find("ABCDEF").expect("ABCDEF must be in shard");
1844 let range = mgr.read_content_range(pos, pos + 7).expect("range");
1845 assert_eq!(range, "ABCDEF\n");
1846 }
1847
1848 #[test]
1849 fn read_content_range_across_shard_boundary() {
1850 let (_tmp, mgr) = setup();
1851 mgr.ensure_dirs().expect("dirs");
1852 mgr.create_shard(2026, 1).expect("shard 1");
1853 mgr.create_shard(2026, 2).expect("shard 2");
1854 mgr.append_raw(2026, 1, "jan-last-line\n").expect("jan");
1855 mgr.append_raw(2026, 2, "feb-first-line\n").expect("feb");
1856
1857 let full = mgr.replay().expect("replay");
1858 let range = mgr.read_content_range(0, full.len()).expect("full range");
1860 assert_eq!(range, full);
1861 }
1862
1863 #[test]
1864 fn read_content_range_beyond_end() {
1865 let (_tmp, mgr) = setup();
1866 mgr.ensure_dirs().expect("dirs");
1867 mgr.create_shard(2026, 1).expect("create");
1868 mgr.append_raw(2026, 1, "event\n").expect("append");
1869
1870 let full = mgr.replay().expect("replay");
1871 let range = mgr
1873 .read_content_range(full.len(), full.len() + 100)
1874 .expect("beyond end");
1875 assert!(range.is_empty());
1876 }
1877
1878 #[test]
1883 fn replay_from_offset_zero_returns_full_content() {
1884 let (_tmp, mgr) = setup();
1885 mgr.ensure_dirs().expect("dirs");
1886 mgr.create_shard(2026, 1).expect("create");
1887 mgr.append_raw(2026, 1, "event1\n").expect("e1");
1888 mgr.append_raw(2026, 1, "event2\n").expect("e2");
1889
1890 let full = mgr.replay().expect("full replay");
1891 let (from_zero, total_len) = mgr.replay_from_offset(0).expect("from 0");
1892 assert_eq!(from_zero, full);
1893 assert_eq!(total_len, full.len());
1894 }
1895
1896 #[test]
1897 fn replay_from_offset_skips_content_before_cursor() {
1898 let (_tmp, mgr) = setup();
1899 mgr.ensure_dirs().expect("dirs");
1900 mgr.create_shard(2026, 1).expect("create");
1901 mgr.append_raw(2026, 1, "event1\n").expect("e1");
1902 mgr.append_raw(2026, 1, "event2\n").expect("e2");
1903 mgr.append_raw(2026, 1, "event3\n").expect("e3");
1904
1905 let full = mgr.replay().expect("full replay");
1906
1907 let cursor = full.find("event3").expect("event3 in content");
1909 let (tail, total_len) = mgr.replay_from_offset(cursor).expect("from cursor");
1910 assert_eq!(tail, "event3\n");
1911 assert_eq!(total_len, full.len());
1912 }
1913
1914 #[test]
1915 fn replay_from_offset_at_end_returns_empty() {
1916 let (_tmp, mgr) = setup();
1917 mgr.ensure_dirs().expect("dirs");
1918 mgr.create_shard(2026, 1).expect("create");
1919 mgr.append_raw(2026, 1, "event1\n").expect("e1");
1920
1921 let full = mgr.replay().expect("full replay");
1922 let (tail, total_len) = mgr.replay_from_offset(full.len()).expect("at end");
1923 assert!(tail.is_empty(), "tail should be empty at end of content");
1924 assert_eq!(total_len, full.len());
1925 }
1926
1927 #[test]
1928 fn replay_from_offset_skips_sealed_shards_before_cursor() {
1929 let (_tmp, mgr) = setup();
1930 mgr.ensure_dirs().expect("dirs");
1931
1932 mgr.create_shard(2026, 1).expect("jan");
1934 mgr.create_shard(2026, 2).expect("feb");
1935 mgr.append_raw(2026, 1, "jan-event1\n").expect("jan e1");
1936 mgr.append_raw(2026, 1, "jan-event2\n").expect("jan e2");
1937 mgr.append_raw(2026, 2, "feb-event1\n").expect("feb e1");
1938 mgr.append_raw(2026, 2, "feb-event2\n").expect("feb e2");
1939
1940 let full = mgr.replay().expect("full replay");
1941 let jan_shard_len = mgr.read_shard(2026, 1).expect("read jan").len();
1942
1943 let (tail, total_len) = mgr
1945 .replay_from_offset(jan_shard_len)
1946 .expect("from feb start");
1947 assert!(
1948 !tail.contains("jan-event"),
1949 "jan events should not appear in tail"
1950 );
1951 assert!(tail.contains("feb-event1"), "feb events must be in tail");
1952 assert!(tail.contains("feb-event2"), "feb events must be in tail");
1953 assert_eq!(total_len, full.len());
1954 }
1955
1956 #[test]
1957 fn replay_from_offset_total_len_equals_total_content_len() {
1958 let (_tmp, mgr) = setup();
1959 mgr.ensure_dirs().expect("dirs");
1960 mgr.create_shard(2026, 1).expect("shard 1");
1961 mgr.create_shard(2026, 2).expect("shard 2");
1962 mgr.append_raw(2026, 1, "event-a\n").expect("ea");
1963 mgr.append_raw(2026, 2, "event-b\n").expect("eb");
1964
1965 let total = mgr.total_content_len().expect("total_content_len");
1966 let (_, replay_total) = mgr.replay_from_offset(0).expect("replay_from_offset");
1967 assert_eq!(
1968 total, replay_total,
1969 "total_content_len and replay_from_offset total must agree"
1970 );
1971 }
1972
1973 #[test]
1977 fn replay_lines_skips_forwarding_pointer_shard() {
1978 let dir = tempfile::tempdir().expect("tmpdir");
1979 let mgr = ShardManager::new(dir.path());
1980 mgr.ensure_dirs().expect("dirs");
1981
1982 mgr.create_shard(2026, 1).expect("create jan");
1984 mgr.append_raw(2026, 1, "# bones event log v1\n")
1985 .expect("header");
1986 mgr.append_raw(2026, 1, "real-event-line\n").expect("event");
1987
1988 let feb_path = dir.path().join("events").join("2026-02.events");
1990 fs::write(&feb_path, "2026-03.events").expect("write forwarding pointer");
1991
1992 mgr.create_shard(2026, 3).expect("create mar");
1994 mgr.append_raw(2026, 3, "# bones event log v1\n")
1995 .expect("mar header");
1996 mgr.append_raw(2026, 3, "another-event-line\n")
1997 .expect("mar event");
1998
1999 let lines: Vec<String> = mgr
2000 .replay_lines()
2001 .expect("replay_lines")
2002 .map(|r| r.expect("line").1)
2003 .collect();
2004
2005 assert!(
2007 lines.iter().any(|l| l.contains("real-event-line")),
2008 "jan event missing"
2009 );
2010 assert!(
2011 lines.iter().any(|l| l.contains("another-event-line")),
2012 "mar event missing"
2013 );
2014 assert!(
2015 !lines.iter().any(|l| l.contains("2026-03.events")),
2016 "forwarding pointer content must not appear in replay"
2017 );
2018 }
2019}