1use std::fs::{self, OpenOptions};
32use std::io::{self, Write as IoWrite};
33use std::path::{Path, PathBuf};
34use std::time::{Duration, SystemTime, UNIX_EPOCH};
35
36use chrono::Datelike;
37
38use crate::event::writer::shard_header;
39use crate::lock::ShardLock;
40
41#[derive(Debug, thiserror::Error)]
47pub enum ShardError {
48 #[error("shard I/O error: {0}")]
50 Io(#[from] io::Error),
51
52 #[error("lock error: {0}")]
54 Lock(#[from] crate::lock::LockError),
55
56 #[error("failed to initialize .bones directory: {0}")]
58 InitFailed(io::Error),
59
60 #[error("invalid shard filename: {0}")]
62 InvalidShardName(String),
63}
64
65#[derive(Debug, Clone, PartialEq, Eq)]
71pub struct ShardManifest {
72 pub shard_name: String,
74 pub event_count: u64,
76 pub byte_len: u64,
78 pub file_hash: String,
80}
81
82impl ShardManifest {
83 #[must_use]
85 pub fn to_string_repr(&self) -> String {
86 format!(
87 "shard: {}\nevent_count: {}\nbyte_len: {}\nfile_hash: {}\n",
88 self.shard_name, self.event_count, self.byte_len, self.file_hash
89 )
90 }
91
92 #[must_use]
96 pub fn from_string_repr(s: &str) -> Option<Self> {
97 let mut shard_name = None;
98 let mut event_count = None;
99 let mut byte_len = None;
100 let mut file_hash = None;
101
102 for line in s.lines() {
103 if let Some(val) = line.strip_prefix("shard: ") {
104 shard_name = Some(val.to_string());
105 } else if let Some(val) = line.strip_prefix("event_count: ") {
106 event_count = val.parse().ok();
107 } else if let Some(val) = line.strip_prefix("byte_len: ") {
108 byte_len = val.parse().ok();
109 } else if let Some(val) = line.strip_prefix("file_hash: ") {
110 file_hash = Some(val.to_string());
111 }
112 }
113
114 Some(Self {
115 shard_name: shard_name?,
116 event_count: event_count?,
117 byte_len: byte_len?,
118 file_hash: file_hash?,
119 })
120 }
121}
122
123pub struct ShardManager {
138 bones_dir: PathBuf,
140}
141
142impl ShardManager {
143 #[must_use]
148 pub fn new(bones_dir: impl Into<PathBuf>) -> Self {
149 Self {
150 bones_dir: bones_dir.into(),
151 }
152 }
153
154 #[must_use]
156 pub fn events_dir(&self) -> PathBuf {
157 self.bones_dir.join("events")
158 }
159
160 #[must_use]
162 pub fn lock_path(&self) -> PathBuf {
163 self.bones_dir.join("lock")
164 }
165
166 #[must_use]
168 pub fn clock_path(&self) -> PathBuf {
169 self.bones_dir.join("cache").join("clock")
170 }
171
172 #[must_use]
174 pub fn current_symlink(&self) -> PathBuf {
175 self.events_dir().join("current.events")
176 }
177
178 #[must_use]
180 pub fn shard_filename(year: i32, month: u32) -> String {
181 format!("{year:04}-{month:02}.events")
182 }
183
184 #[must_use]
186 pub fn shard_path(&self, year: i32, month: u32) -> PathBuf {
187 self.events_dir().join(Self::shard_filename(year, month))
188 }
189
190 #[must_use]
192 pub fn manifest_path(&self, year: i32, month: u32) -> PathBuf {
193 self.events_dir()
194 .join(format!("{year:04}-{month:02}.manifest"))
195 }
196
197 pub fn ensure_dirs(&self) -> Result<(), ShardError> {
208 fs::create_dir_all(self.events_dir()).map_err(ShardError::InitFailed)?;
209 fs::create_dir_all(self.bones_dir.join("cache")).map_err(ShardError::InitFailed)?;
210 Ok(())
211 }
212
213 pub fn init(&self) -> Result<(i32, u32), ShardError> {
223 self.ensure_dirs()?;
224
225 let shards = self.list_shards()?;
226 if shards.is_empty() {
227 let (year, month) = current_year_month();
228 self.create_shard(year, month)?;
229 self.update_symlink(year, month)?;
230 Ok((year, month))
231 } else if let Some(&(year, month)) = shards.last() {
232 self.update_symlink(year, month)?;
233 Ok((year, month))
234 } else {
235 unreachable!("shards is non-empty")
236 }
237 }
238
239 pub fn list_shards(&self) -> Result<Vec<(i32, u32)>, ShardError> {
252 let events_dir = self.events_dir();
253 if !events_dir.exists() {
254 return Ok(Vec::new());
255 }
256
257 let mut shards = Vec::new();
258 for entry in fs::read_dir(&events_dir)? {
259 let entry = entry?;
260 let name = entry.file_name();
261 let name_str = name.to_string_lossy();
262 if let Some(ym) = parse_shard_filename(&name_str) {
263 shards.push(ym);
264 }
265 }
266 shards.sort_unstable();
267 Ok(shards)
268 }
269
270 pub fn active_shard(&self) -> Result<Option<(i32, u32)>, ShardError> {
276 let shards = self.list_shards()?;
277 Ok(shards.last().copied())
278 }
279
280 pub fn create_shard(&self, year: i32, month: u32) -> Result<PathBuf, ShardError> {
293 let path = self.shard_path(year, month);
294 if path.exists() {
295 return Ok(path);
296 }
297
298 let header = shard_header();
299 fs::write(&path, header)?;
300 Ok(path)
301 }
302
303 pub fn update_symlink(&self, year: i32, month: u32) -> Result<(), ShardError> {
309 let symlink = self.current_symlink();
310 let target = Self::shard_filename(year, month);
311
312 if symlink.exists() || symlink.symlink_metadata().is_ok() {
314 fs::remove_file(&symlink)?;
315 }
316
317 #[cfg(unix)]
318 std::os::unix::fs::symlink(&target, &symlink)?;
319
320 #[cfg(not(unix))]
321 fs::write(&symlink, &target)?;
322
323 Ok(())
324 }
325
326 pub fn rotate_if_needed(&self) -> Result<(i32, u32), ShardError> {
335 let (current_year, current_month) = current_year_month();
336 let active = self.active_shard()?;
337
338 match active {
339 Some((y, m)) if y == current_year && m == current_month => Ok((y, m)),
340 Some((y, m)) => {
341 self.write_manifest(y, m)?;
343 self.create_shard(current_year, current_month)?;
345 self.update_symlink(current_year, current_month)?;
346 Ok((current_year, current_month))
347 }
348 None => {
349 self.create_shard(current_year, current_month)?;
351 self.update_symlink(current_year, current_month)?;
352 Ok((current_year, current_month))
353 }
354 }
355 }
356
357 pub fn write_manifest(&self, year: i32, month: u32) -> Result<ShardManifest, ShardError> {
368 let shard_path = self.shard_path(year, month);
369 let content = fs::read(&shard_path)?;
370 let content_str = String::from_utf8_lossy(&content);
371
372 let event_count = content_str
374 .lines()
375 .filter(|line| !line.is_empty() && !line.starts_with('#') && !line.trim().is_empty())
376 .count() as u64;
377
378 let byte_len = content.len() as u64;
379 let file_hash = format!("blake3:{}", blake3::hash(&content).to_hex());
380
381 let manifest = ShardManifest {
382 shard_name: Self::shard_filename(year, month),
383 event_count,
384 byte_len,
385 file_hash,
386 };
387
388 let manifest_path = self.manifest_path(year, month);
389 fs::write(&manifest_path, manifest.to_string_repr())?;
390
391 Ok(manifest)
392 }
393
394 pub fn read_manifest(
401 &self,
402 year: i32,
403 month: u32,
404 ) -> Result<Option<ShardManifest>, ShardError> {
405 let manifest_path = self.manifest_path(year, month);
406 if !manifest_path.exists() {
407 return Ok(None);
408 }
409 let content = fs::read_to_string(&manifest_path)?;
410 Ok(ShardManifest::from_string_repr(&content))
411 }
412
413 pub fn append(
436 &self,
437 line: &str,
438 durable: bool,
439 lock_timeout: Duration,
440 ) -> Result<i64, ShardError> {
441 self.ensure_dirs()?;
442
443 let _lock = ShardLock::acquire(&self.lock_path(), lock_timeout)?;
444
445 let (year, month) = self.rotate_if_needed()?;
447 let shard_path = self.shard_path(year, month);
448
449 let ts = self.next_timestamp()?;
451
452 let mut file = OpenOptions::new()
454 .create(true)
455 .append(true)
456 .open(&shard_path)?;
457
458 file.write_all(line.as_bytes())?;
459 file.flush()?;
460
461 if durable {
462 file.sync_data()?;
463 }
464
465 Ok(ts)
466 }
467
468 pub fn append_raw(&self, year: i32, month: u32, line: &str) -> Result<(), ShardError> {
477 let shard_path = self.shard_path(year, month);
478
479 let mut file = OpenOptions::new()
480 .create(true)
481 .append(true)
482 .open(&shard_path)?;
483
484 file.write_all(line.as_bytes())?;
485 file.flush()?;
486 Ok(())
487 }
488
489 pub fn read_clock(&self) -> Result<i64, ShardError> {
502 let path = self.clock_path();
503 if !path.exists() {
504 return Ok(0);
505 }
506 let content = fs::read_to_string(&path)?;
507 Ok(content.trim().parse::<i64>().unwrap_or(0))
508 }
509
510 pub fn next_timestamp(&self) -> Result<i64, ShardError> {
521 let last = self.read_clock()?;
522 let now = system_time_us();
523 let next = std::cmp::max(now, last + 1);
524 self.write_clock(next)?;
525 Ok(next)
526 }
527
528 fn write_clock(&self, value: i64) -> Result<(), ShardError> {
530 let path = self.clock_path();
531 if let Some(parent) = path.parent() {
532 fs::create_dir_all(parent)?;
533 }
534 fs::write(&path, value.to_string())?;
535 Ok(())
536 }
537
538 pub fn recover_torn_writes(&self) -> Result<Option<u64>, ShardError> {
557 let Some(active) = self.active_shard()? else {
558 return Ok(None);
559 };
560
561 let shard_path = self.shard_path(active.0, active.1);
562 recover_shard_torn_write(&shard_path)
563 }
564
565 pub fn replay(&self) -> Result<String, ShardError> {
578 let shards = self.list_shards()?;
579 let mut content = String::new();
580
581 for (year, month) in shards {
582 let path = self.shard_path(year, month);
583 let shard_content = fs::read_to_string(&path)?;
584 content.push_str(&shard_content);
585 }
586
587 Ok(content)
588 }
589
590 pub fn read_shard(&self, year: i32, month: u32) -> Result<String, ShardError> {
596 let path = self.shard_path(year, month);
597 Ok(fs::read_to_string(&path)?)
598 }
599
600 pub fn total_content_len(&self) -> Result<usize, ShardError> {
610 let shards = self.list_shards()?;
611 let mut total = 0usize;
612 for (year, month) in shards {
613 let path = self.shard_path(year, month);
614 let meta = fs::metadata(&path)?;
615 total = total.saturating_add(usize::try_from(meta.len()).unwrap_or(usize::MAX));
616 }
617 Ok(total)
618 }
619
620 pub fn replay_from_offset(&self, offset: usize) -> Result<(String, usize), ShardError> {
637 let shards = self.list_shards()?;
638 let mut cumulative: usize = 0;
639 let mut result = String::new();
640 let mut found_start = false;
641
642 for (year, month) in shards {
643 let path = self.shard_path(year, month);
644 let shard_len = usize::try_from(fs::metadata(&path)?.len()).unwrap_or(usize::MAX);
645
646 let shard_end = cumulative.saturating_add(shard_len);
647
648 if shard_end <= offset {
649 cumulative = shard_end;
651 continue;
652 }
653
654 let shard_content = fs::read_to_string(&path)?;
656
657 if found_start {
658 result.push_str(&shard_content);
659 } else {
660 let within = offset.saturating_sub(cumulative);
662 let within = within.min(shard_content.len());
664 result.push_str(&shard_content[within..]);
665 found_start = true;
666 }
667
668 cumulative = shard_end;
669 }
670
671 Ok((result, cumulative))
672 }
673
674 pub fn read_content_range(
686 &self,
687 start_offset: usize,
688 end_offset: usize,
689 ) -> Result<String, ShardError> {
690 if start_offset >= end_offset {
691 return Ok(String::new());
692 }
693
694 let shards = self.list_shards()?;
695 let mut cumulative: usize = 0;
696 let mut result = String::new();
697
698 for (year, month) in shards {
699 let path = self.shard_path(year, month);
700 let shard_len = usize::try_from(fs::metadata(&path)?.len()).unwrap_or(usize::MAX);
701 let shard_end = cumulative.saturating_add(shard_len);
702
703 if shard_end <= start_offset {
704 cumulative = shard_end;
706 continue;
707 }
708
709 if cumulative >= end_offset {
710 break;
712 }
713
714 let shard_content = fs::read_to_string(&path)?;
715
716 let within_start = if cumulative < start_offset {
718 (start_offset - cumulative).min(shard_content.len())
719 } else {
720 0
721 };
722 let within_end = if shard_end > end_offset {
723 (end_offset - cumulative).min(shard_content.len())
724 } else {
725 shard_content.len()
726 };
727
728 result.push_str(&shard_content[within_start..within_end]);
729 cumulative = shard_end;
730 }
731
732 Ok(result)
733 }
734
735 pub fn event_count(&self) -> Result<u64, ShardError> {
741 let content = self.replay()?;
742 let count = content
743 .lines()
744 .filter(|line| !line.is_empty() && !line.starts_with('#') && !line.trim().is_empty())
745 .count();
746 Ok(count as u64)
747 }
748
749 pub fn replay_lines(
757 &self,
758 ) -> Result<impl Iterator<Item = io::Result<(usize, String)>>, ShardError> {
759 self.replay_lines_from_offset(0)
760 }
761
762 pub fn replay_lines_from_offset(
770 &self,
771 offset: usize,
772 ) -> Result<impl Iterator<Item = io::Result<(usize, String)>>, ShardError> {
773 let shards = self.list_shards()?;
774 let bones_dir = self.bones_dir.clone();
775
776 Ok(ShardLineIterator {
777 shards,
778 current_shard_idx: 0,
779 current_reader: None,
780 cumulative_offset: 0,
781 bones_dir,
782 }
783 .skip_to_offset(offset))
784 }
785
786 pub fn is_empty(&self) -> Result<bool, ShardError> {
792 let shards = self.list_shards()?;
793 Ok(shards.is_empty())
794 }
795}
796
797struct ShardLineIterator {
802 shards: Vec<(i32, u32)>,
803 current_shard_idx: usize,
804 current_reader: Option<io::BufReader<fs::File>>,
805 cumulative_offset: usize,
806 bones_dir: PathBuf,
807}
808
809impl ShardLineIterator {
810 fn skip_to_offset(mut self, offset: usize) -> Self {
811 while self.current_shard_idx < self.shards.len() {
813 let (year, month) = self.shards[self.current_shard_idx];
814 let shard_path = self
815 .bones_dir
816 .join("events")
817 .join(ShardManager::shard_filename(year, month));
818 if let Ok(meta) = fs::metadata(shard_path) {
819 let shard_len = usize::try_from(meta.len()).unwrap_or(usize::MAX);
820 if self.cumulative_offset + shard_len <= offset {
821 self.cumulative_offset += shard_len;
822 self.current_shard_idx += 1;
823 continue;
824 }
825 }
826 break;
827 }
828
829 self.cumulative_offset = offset;
834 self
835 }
836}
837
838impl Iterator for ShardLineIterator {
839 type Item = io::Result<(usize, String)>;
840
841 fn next(&mut self) -> Option<Self::Item> {
842 use std::io::{BufRead, Seek, SeekFrom};
843
844 loop {
845 if self.current_reader.is_none() {
846 if self.current_shard_idx >= self.shards.len() {
847 return None;
848 }
849
850 let (year, month) = self.shards[self.current_shard_idx];
851 let shard_path = self
852 .bones_dir
853 .join("events")
854 .join(ShardManager::shard_filename(year, month));
855 let mut file = match fs::File::open(shard_path) {
856 Ok(f) => f,
857 Err(e) => return Some(Err(e)),
858 };
859
860 let mut cumulative_before = 0;
862 for i in 0..self.current_shard_idx {
863 let (y, m) = self.shards[i];
864 let p = self
865 .bones_dir
866 .join("events")
867 .join(ShardManager::shard_filename(y, m));
868 if let Ok(meta) = fs::metadata(p) {
869 cumulative_before += usize::try_from(meta.len()).unwrap_or(usize::MAX);
870 }
871 }
872
873 if self.cumulative_offset > cumulative_before {
874 let within = self.cumulative_offset - cumulative_before;
875 if let Err(e) = file.seek(SeekFrom::Start(within as u64)) {
876 return Some(Err(e));
877 }
878 }
879
880 self.current_reader = Some(io::BufReader::new(file));
881 }
882
883 let reader = self
884 .current_reader
885 .as_mut()
886 .expect("reader was just set above");
887 let mut line = String::new();
888 let offset = self.cumulative_offset;
889
890 match reader.read_line(&mut line) {
891 Ok(0) => {
892 self.current_reader = None;
894 self.current_shard_idx += 1;
895 }
896 Ok(n) => {
897 self.cumulative_offset += n;
898 return Some(Ok((offset, line)));
899 }
900 Err(e) => return Some(Err(e)),
901 }
902 }
903 }
904}
905
906#[must_use]
912fn current_year_month() -> (i32, u32) {
913 let now = chrono::Utc::now();
914 (now.year(), now.month())
915}
916
917#[allow(clippy::cast_possible_truncation)]
919#[must_use]
920fn system_time_us() -> i64 {
921 SystemTime::now()
922 .duration_since(UNIX_EPOCH)
923 .map(|d| d.as_micros() as i64)
924 .unwrap_or(0)
925}
926
927fn parse_shard_filename(name: &str) -> Option<(i32, u32)> {
929 let stem = name.strip_suffix(".events")?;
930 if stem == "current" {
932 return None;
933 }
934 let (year_str, month_str) = stem.split_once('-')?;
935 let year: i32 = year_str.parse().ok()?;
936 let month: u32 = month_str.parse().ok()?;
937 if !(1..=12).contains(&month) {
938 return None;
939 }
940 Some((year, month))
941}
942
943fn recover_shard_torn_write(path: &Path) -> Result<Option<u64>, ShardError> {
948 let metadata = fs::metadata(path)?;
949 let file_len = metadata.len();
950 if file_len == 0 {
951 return Ok(None);
952 }
953
954 let content = fs::read(path)?;
955
956 let last_newline = content.iter().rposition(|&b| b == b'\n');
958
959 if let Some(pos) = last_newline {
960 let expected_len = (pos + 1) as u64;
961 if expected_len < file_len {
962 let truncated = file_len - expected_len;
964 let file = OpenOptions::new().write(true).open(path)?;
965 file.set_len(expected_len)?;
966 Ok(Some(truncated))
967 } else {
968 Ok(None)
970 }
971 } else {
972 let file = OpenOptions::new().write(true).open(path)?;
975 file.set_len(0)?;
976 Ok(Some(file_len))
977 }
978}
979
980#[cfg(test)]
985mod tests {
986 use super::*;
987 use tempfile::TempDir;
988
989 fn setup() -> (TempDir, ShardManager) {
990 let tmp = TempDir::new().expect("tempdir");
991 let bones_dir = tmp.path().join(".bones");
992 let mgr = ShardManager::new(&bones_dir);
993 (tmp, mgr)
994 }
995
996 #[test]
1001 fn parse_valid_shard_filenames() {
1002 assert_eq!(parse_shard_filename("2026-01.events"), Some((2026, 1)));
1003 assert_eq!(parse_shard_filename("2026-12.events"), Some((2026, 12)));
1004 assert_eq!(parse_shard_filename("1999-06.events"), Some((1999, 6)));
1005 }
1006
1007 #[test]
1008 fn parse_invalid_shard_filenames() {
1009 assert_eq!(parse_shard_filename("current.events"), None);
1010 assert_eq!(parse_shard_filename("2026-13.events"), None); assert_eq!(parse_shard_filename("2026-00.events"), None); assert_eq!(parse_shard_filename("not-a-shard.txt"), None);
1013 assert_eq!(parse_shard_filename("2026-01.manifest"), None);
1014 assert_eq!(parse_shard_filename(""), None);
1015 }
1016
1017 #[test]
1022 fn shard_manager_paths() {
1023 let mgr = ShardManager::new("/repo/.bones");
1024 assert_eq!(mgr.events_dir(), PathBuf::from("/repo/.bones/events"));
1025 assert_eq!(mgr.lock_path(), PathBuf::from("/repo/.bones/lock"));
1026 assert_eq!(mgr.clock_path(), PathBuf::from("/repo/.bones/cache/clock"));
1027 assert_eq!(
1028 mgr.current_symlink(),
1029 PathBuf::from("/repo/.bones/events/current.events")
1030 );
1031 assert_eq!(
1032 mgr.shard_path(2026, 2),
1033 PathBuf::from("/repo/.bones/events/2026-02.events")
1034 );
1035 assert_eq!(
1036 mgr.manifest_path(2026, 1),
1037 PathBuf::from("/repo/.bones/events/2026-01.manifest")
1038 );
1039 }
1040
1041 #[test]
1042 fn shard_filename_format() {
1043 assert_eq!(ShardManager::shard_filename(2026, 1), "2026-01.events");
1044 assert_eq!(ShardManager::shard_filename(2026, 12), "2026-12.events");
1045 assert_eq!(ShardManager::shard_filename(1999, 6), "1999-06.events");
1046 }
1047
1048 #[test]
1053 fn ensure_dirs_creates_directories() {
1054 let (_tmp, mgr) = setup();
1055 mgr.ensure_dirs().expect("should create dirs");
1056 assert!(mgr.events_dir().exists());
1057 assert!(mgr.bones_dir.join("cache").exists());
1058 }
1059
1060 #[test]
1061 fn ensure_dirs_is_idempotent() {
1062 let (_tmp, mgr) = setup();
1063 mgr.ensure_dirs().expect("first");
1064 mgr.ensure_dirs().expect("second");
1065 assert!(mgr.events_dir().exists());
1066 }
1067
1068 #[test]
1069 fn init_creates_first_shard() {
1070 let (_tmp, mgr) = setup();
1071 let (year, month) = mgr.init().expect("init");
1072
1073 let (expected_year, expected_month) = current_year_month();
1074 assert_eq!(year, expected_year);
1075 assert_eq!(month, expected_month);
1076
1077 let shard_path = mgr.shard_path(year, month);
1079 assert!(shard_path.exists());
1080 let content = fs::read_to_string(&shard_path).expect("read");
1081 assert!(content.starts_with("# bones event log v1"));
1082
1083 let symlink = mgr.current_symlink();
1085 assert!(symlink.exists() || symlink.symlink_metadata().is_ok());
1086 }
1087
1088 #[test]
1089 fn init_is_idempotent() {
1090 let (_tmp, mgr) = setup();
1091 let first = mgr.init().expect("first");
1092 let second = mgr.init().expect("second");
1093 assert_eq!(first, second);
1094 }
1095
1096 #[test]
1101 fn list_shards_empty() {
1102 let (_tmp, mgr) = setup();
1103 mgr.ensure_dirs().expect("dirs");
1104 let shards = mgr.list_shards().expect("list");
1105 assert!(shards.is_empty());
1106 }
1107
1108 #[test]
1109 fn list_shards_returns_sorted() {
1110 let (_tmp, mgr) = setup();
1111 mgr.ensure_dirs().expect("dirs");
1112
1113 mgr.create_shard(2026, 3).expect("create");
1115 mgr.create_shard(2026, 1).expect("create");
1116 mgr.create_shard(2026, 2).expect("create");
1117
1118 let shards = mgr.list_shards().expect("list");
1119 assert_eq!(shards, vec![(2026, 1), (2026, 2), (2026, 3)]);
1120 }
1121
1122 #[test]
1123 fn list_shards_skips_non_shard_files() {
1124 let (_tmp, mgr) = setup();
1125 mgr.ensure_dirs().expect("dirs");
1126 mgr.create_shard(2026, 1).expect("create");
1127
1128 fs::write(mgr.events_dir().join("readme.txt"), "hi").expect("write");
1130 fs::write(mgr.events_dir().join("2026-01.manifest"), "manifest").expect("write");
1131
1132 let shards = mgr.list_shards().expect("list");
1133 assert_eq!(shards, vec![(2026, 1)]);
1134 }
1135
1136 #[test]
1137 fn list_shards_no_events_dir() {
1138 let (_tmp, mgr) = setup();
1139 let shards = mgr.list_shards().expect("list");
1141 assert!(shards.is_empty());
1142 }
1143
1144 #[test]
1149 fn create_shard_writes_header() {
1150 let (_tmp, mgr) = setup();
1151 mgr.ensure_dirs().expect("dirs");
1152 let path = mgr.create_shard(2026, 2).expect("create");
1153
1154 let content = fs::read_to_string(&path).expect("read");
1155 assert!(content.starts_with("# bones event log v1"));
1156 assert!(content.contains("# fields:"));
1157 assert_eq!(content.lines().count(), 2);
1158 }
1159
1160 #[test]
1161 fn create_shard_idempotent() {
1162 let (_tmp, mgr) = setup();
1163 mgr.ensure_dirs().expect("dirs");
1164 let p1 = mgr.create_shard(2026, 2).expect("first");
1165 fs::write(&p1, "modified").expect("write");
1167 let p2 = mgr.create_shard(2026, 2).expect("second");
1169 assert_eq!(p1, p2);
1170 let content = fs::read_to_string(&p2).expect("read");
1171 assert_eq!(content, "modified");
1172 }
1173
1174 #[test]
1179 fn update_symlink_creates_link() {
1180 let (_tmp, mgr) = setup();
1181 mgr.ensure_dirs().expect("dirs");
1182 mgr.create_shard(2026, 2).expect("create");
1183 mgr.update_symlink(2026, 2).expect("symlink");
1184
1185 let symlink = mgr.current_symlink();
1186 assert!(symlink.symlink_metadata().is_ok());
1187
1188 #[cfg(unix)]
1189 {
1190 let target = fs::read_link(&symlink).expect("readlink");
1191 assert_eq!(target, PathBuf::from("2026-02.events"));
1192 }
1193 }
1194
1195 #[test]
1196 fn update_symlink_replaces_existing() {
1197 let (_tmp, mgr) = setup();
1198 mgr.ensure_dirs().expect("dirs");
1199 mgr.create_shard(2026, 1).expect("create");
1200 mgr.create_shard(2026, 2).expect("create");
1201
1202 mgr.update_symlink(2026, 1).expect("first");
1203 mgr.update_symlink(2026, 2).expect("second");
1204
1205 #[cfg(unix)]
1206 {
1207 let target = fs::read_link(&mgr.current_symlink()).expect("readlink");
1208 assert_eq!(target, PathBuf::from("2026-02.events"));
1209 }
1210 }
1211
1212 #[test]
1217 fn clock_starts_at_zero() {
1218 let (_tmp, mgr) = setup();
1219 mgr.ensure_dirs().expect("dirs");
1220 let ts = mgr.read_clock().expect("read");
1221 assert_eq!(ts, 0);
1222 }
1223
1224 #[test]
1225 fn clock_is_monotonic() {
1226 let (_tmp, mgr) = setup();
1227 mgr.ensure_dirs().expect("dirs");
1228 let t1 = mgr.next_timestamp().expect("t1");
1229 let t2 = mgr.next_timestamp().expect("t2");
1230 let t3 = mgr.next_timestamp().expect("t3");
1231 assert!(t2 > t1);
1232 assert!(t3 > t2);
1233 }
1234
1235 #[test]
1236 fn clock_reads_back_written_value() {
1237 let (_tmp, mgr) = setup();
1238 mgr.ensure_dirs().expect("dirs");
1239 mgr.write_clock(42_000_000).expect("write");
1240 let ts = mgr.read_clock().expect("read");
1241 assert_eq!(ts, 42_000_000);
1242 }
1243
1244 #[test]
1245 fn clock_never_goes_backward() {
1246 let (_tmp, mgr) = setup();
1247 mgr.ensure_dirs().expect("dirs");
1248
1249 let future = system_time_us() + 10_000_000;
1251 mgr.write_clock(future).expect("write");
1252
1253 let next = mgr.next_timestamp().expect("next");
1254 assert!(next > future, "clock should advance past future value");
1255 }
1256
1257 #[test]
1262 fn append_raw_adds_line() {
1263 let (_tmp, mgr) = setup();
1264 mgr.ensure_dirs().expect("dirs");
1265 mgr.create_shard(2026, 2).expect("create");
1266
1267 mgr.append_raw(2026, 2, "event line 1\n").expect("append");
1268 mgr.append_raw(2026, 2, "event line 2\n").expect("append");
1269
1270 let content = mgr.read_shard(2026, 2).expect("read");
1271 assert!(content.contains("event line 1"));
1272 assert!(content.contains("event line 2"));
1273 }
1274
1275 #[test]
1276 fn append_with_lock() {
1277 let (_tmp, mgr) = setup();
1278 mgr.init().expect("init");
1279
1280 let _ts = mgr
1281 .append("test event line\n", false, Duration::from_secs(1))
1282 .expect("append");
1283
1284 let content = mgr.replay().expect("replay");
1285 assert!(content.contains("test event line"));
1286 }
1287
1288 #[test]
1289 fn append_returns_monotonic_timestamps() {
1290 let (_tmp, mgr) = setup();
1291 mgr.init().expect("init");
1292
1293 let t1 = mgr
1294 .append("line1\n", false, Duration::from_secs(1))
1295 .expect("t1");
1296 let t2 = mgr
1297 .append("line2\n", false, Duration::from_secs(1))
1298 .expect("t2");
1299
1300 assert!(t2 > t1);
1301 }
1302
1303 #[test]
1308 fn recover_clean_file() {
1309 let (_tmp, mgr) = setup();
1310 mgr.init().expect("init");
1311
1312 let (y, m) = current_year_month();
1313 mgr.append_raw(y, m, "complete line\n").expect("append");
1314
1315 let recovered = mgr.recover_torn_writes().expect("recover");
1316 assert_eq!(recovered, None);
1317 }
1318
1319 #[test]
1320 fn recover_torn_write_truncates() {
1321 let (_tmp, mgr) = setup();
1322 let (y, m) = mgr.init().expect("init");
1323 let shard_path = mgr.shard_path(y, m);
1324
1325 {
1327 let mut f = OpenOptions::new()
1328 .append(true)
1329 .open(&shard_path)
1330 .expect("open");
1331 f.write_all(b"complete line\npartial line without newline")
1332 .expect("write");
1333 f.flush().expect("flush");
1334 }
1335
1336 let recovered = mgr.recover_torn_writes().expect("recover");
1337 assert!(recovered.is_some());
1338
1339 let truncated = recovered.expect("checked is_some");
1340 assert_eq!(truncated, "partial line without newline".len() as u64);
1341
1342 let content = fs::read_to_string(&shard_path).expect("read");
1344 assert!(content.ends_with('\n'));
1345 assert!(content.contains("complete line"));
1346 assert!(!content.contains("partial line without newline"));
1347 }
1348
1349 #[test]
1350 fn recover_no_newline_at_all() {
1351 let (_tmp, mgr) = setup();
1352 let (y, m) = mgr.init().expect("init");
1353 let shard_path = mgr.shard_path(y, m);
1354
1355 fs::write(&shard_path, "no newlines here").expect("write");
1357
1358 let recovered = mgr.recover_torn_writes().expect("recover");
1359 assert_eq!(recovered, Some("no newlines here".len() as u64));
1360
1361 let content = fs::read_to_string(&shard_path).expect("read");
1363 assert!(content.is_empty());
1364 }
1365
1366 #[test]
1367 fn recover_empty_file() {
1368 let (_tmp, mgr) = setup();
1369 let (y, m) = mgr.init().expect("init");
1370 let shard_path = mgr.shard_path(y, m);
1371
1372 fs::write(&shard_path, "").expect("write");
1374
1375 let recovered = mgr.recover_torn_writes().expect("recover");
1376 assert_eq!(recovered, None);
1377 }
1378
1379 #[test]
1380 fn recover_no_active_shard() {
1381 let (_tmp, mgr) = setup();
1382 mgr.ensure_dirs().expect("dirs");
1383
1384 let recovered = mgr.recover_torn_writes().expect("recover");
1385 assert_eq!(recovered, None);
1386 }
1387
1388 #[test]
1393 fn replay_empty_repo() {
1394 let (_tmp, mgr) = setup();
1395 mgr.ensure_dirs().expect("dirs");
1396 let content = mgr.replay().expect("replay");
1397 assert!(content.is_empty());
1398 }
1399
1400 #[test]
1401 fn replay_single_shard() {
1402 let (_tmp, mgr) = setup();
1403 mgr.ensure_dirs().expect("dirs");
1404 mgr.create_shard(2026, 1).expect("create");
1405 mgr.append_raw(2026, 1, "event-a\n").expect("append");
1406
1407 let content = mgr.replay().expect("replay");
1408 assert!(content.contains("event-a"));
1409 }
1410
1411 #[test]
1412 fn replay_multiple_shards_in_order() {
1413 let (_tmp, mgr) = setup();
1414 mgr.ensure_dirs().expect("dirs");
1415
1416 mgr.create_shard(2026, 1).expect("create");
1417 mgr.create_shard(2026, 2).expect("create");
1418 mgr.create_shard(2026, 3).expect("create");
1419
1420 mgr.append_raw(2026, 1, "event-jan\n").expect("append");
1421 mgr.append_raw(2026, 2, "event-feb\n").expect("append");
1422 mgr.append_raw(2026, 3, "event-mar\n").expect("append");
1423
1424 let content = mgr.replay().expect("replay");
1425
1426 let jan_pos = content.find("event-jan").expect("jan");
1428 let feb_pos = content.find("event-feb").expect("feb");
1429 let mar_pos = content.find("event-mar").expect("mar");
1430 assert!(jan_pos < feb_pos);
1431 assert!(feb_pos < mar_pos);
1432 }
1433
1434 #[test]
1439 fn event_count_empty() {
1440 let (_tmp, mgr) = setup();
1441 mgr.ensure_dirs().expect("dirs");
1442 assert_eq!(mgr.event_count().expect("count"), 0);
1443 }
1444
1445 #[test]
1446 fn event_count_excludes_comments_and_blanks() {
1447 let (_tmp, mgr) = setup();
1448 mgr.ensure_dirs().expect("dirs");
1449 mgr.create_shard(2026, 1).expect("create");
1450 mgr.append_raw(2026, 1, "event1\n").expect("append");
1452 mgr.append_raw(2026, 1, "event2\n").expect("append");
1453 mgr.append_raw(2026, 1, "\n").expect("blank");
1454
1455 assert_eq!(mgr.event_count().expect("count"), 2);
1456 }
1457
1458 #[test]
1463 fn is_empty_no_shards() {
1464 let (_tmp, mgr) = setup();
1465 mgr.ensure_dirs().expect("dirs");
1466 assert!(mgr.is_empty().expect("empty"));
1467 }
1468
1469 #[test]
1470 fn is_empty_with_shards() {
1471 let (_tmp, mgr) = setup();
1472 mgr.init().expect("init");
1473 assert!(!mgr.is_empty().expect("empty"));
1474 }
1475
1476 #[test]
1481 fn write_and_read_manifest() {
1482 let (_tmp, mgr) = setup();
1483 mgr.ensure_dirs().expect("dirs");
1484 mgr.create_shard(2026, 1).expect("create");
1485 mgr.append_raw(2026, 1, "event-line-1\n").expect("append");
1486 mgr.append_raw(2026, 1, "event-line-2\n").expect("append");
1487
1488 let written = mgr.write_manifest(2026, 1).expect("write manifest");
1489 assert_eq!(written.shard_name, "2026-01.events");
1490 assert_eq!(written.event_count, 2);
1491 assert!(written.byte_len > 0);
1492 assert!(written.file_hash.starts_with("blake3:"));
1493
1494 let read = mgr
1495 .read_manifest(2026, 1)
1496 .expect("read")
1497 .expect("should exist");
1498 assert_eq!(read, written);
1499 }
1500
1501 #[test]
1502 fn manifest_roundtrip() {
1503 let manifest = ShardManifest {
1504 shard_name: "2026-01.events".into(),
1505 event_count: 42,
1506 byte_len: 12345,
1507 file_hash: "blake3:abcdef0123456789".into(),
1508 };
1509
1510 let repr = manifest.to_string_repr();
1511 let parsed = ShardManifest::from_string_repr(&repr).expect("parse");
1512 assert_eq!(parsed, manifest);
1513 }
1514
1515 #[test]
1516 fn read_manifest_missing() {
1517 let (_tmp, mgr) = setup();
1518 mgr.ensure_dirs().expect("dirs");
1519 let result = mgr.read_manifest(2026, 1).expect("read");
1520 assert!(result.is_none());
1521 }
1522
1523 #[test]
1524 fn manifest_event_count_excludes_comments() {
1525 let (_tmp, mgr) = setup();
1526 mgr.ensure_dirs().expect("dirs");
1527 mgr.create_shard(2026, 1).expect("create");
1528 mgr.append_raw(2026, 1, "event1\n").expect("append");
1530
1531 let manifest = mgr.write_manifest(2026, 1).expect("manifest");
1532 assert_eq!(manifest.event_count, 1);
1534 }
1535
1536 #[test]
1541 fn rotate_creates_shard_if_none_exist() {
1542 let (_tmp, mgr) = setup();
1543 mgr.ensure_dirs().expect("dirs");
1544
1545 let (y, m) = mgr.rotate_if_needed().expect("rotate");
1546 let (ey, em) = current_year_month();
1547 assert_eq!((y, m), (ey, em));
1548
1549 assert!(mgr.shard_path(y, m).exists());
1550 }
1551
1552 #[test]
1553 fn rotate_no_op_same_month() {
1554 let (_tmp, mgr) = setup();
1555 let (y, m) = mgr.init().expect("init");
1556
1557 let (y2, m2) = mgr.rotate_if_needed().expect("rotate");
1558 assert_eq!((y, m), (y2, m2));
1559 }
1560
1561 #[test]
1562 fn rotate_different_month_seals_and_creates() {
1563 let (_tmp, mgr) = setup();
1564 mgr.ensure_dirs().expect("dirs");
1565
1566 mgr.create_shard(2025, 11).expect("create");
1568 mgr.append_raw(2025, 11, "old event\n").expect("append");
1569 mgr.update_symlink(2025, 11).expect("symlink");
1570
1571 let (y, m) = mgr.rotate_if_needed().expect("rotate");
1573 let (ey, em) = current_year_month();
1574 assert_eq!((y, m), (ey, em));
1575
1576 assert!(mgr.manifest_path(2025, 11).exists());
1578
1579 assert!(mgr.shard_path(ey, em).exists());
1581
1582 #[cfg(unix)]
1584 {
1585 let target = fs::read_link(mgr.current_symlink()).expect("readlink");
1586 assert_eq!(target, PathBuf::from(ShardManager::shard_filename(ey, em)));
1587 }
1588 }
1589
1590 #[test]
1595 fn frozen_shard_not_modified_by_append() {
1596 let (_tmp, mgr) = setup();
1597 mgr.ensure_dirs().expect("dirs");
1598
1599 mgr.create_shard(2025, 6).expect("create");
1601 mgr.append_raw(2025, 6, "old event\n").expect("append");
1602 let old_content = mgr.read_shard(2025, 6).expect("read");
1603
1604 mgr.init().expect("init");
1606
1607 mgr.append("new event\n", false, Duration::from_secs(1))
1609 .expect("append");
1610
1611 let after_content = mgr.read_shard(2025, 6).expect("read");
1613 assert_eq!(old_content, after_content);
1614 }
1615
1616 #[test]
1621 fn system_time_us_is_positive() {
1622 let ts = system_time_us();
1623 assert!(ts > 0, "system time should be positive: {ts}");
1624 }
1625
1626 #[test]
1627 fn system_time_us_is_reasonable() {
1628 let ts = system_time_us();
1629 let jan_2020_us: i64 = 1_577_836_800_000_000;
1631 assert!(ts > jan_2020_us, "system time too small: {ts}");
1632 }
1633
1634 #[test]
1639 fn total_content_len_empty_repo() {
1640 let (_tmp, mgr) = setup();
1641 mgr.ensure_dirs().expect("dirs");
1642 let len = mgr.total_content_len().expect("len");
1643 assert_eq!(len, 0);
1644 }
1645
1646 #[test]
1647 fn total_content_len_single_shard() {
1648 let (_tmp, mgr) = setup();
1649 mgr.ensure_dirs().expect("dirs");
1650 mgr.create_shard(2026, 1).expect("create");
1651 mgr.append_raw(2026, 1, "line1\n").expect("append");
1652 mgr.append_raw(2026, 1, "line2\n").expect("append");
1653
1654 let full = mgr.replay().expect("replay");
1655 let len = mgr.total_content_len().expect("len");
1656 assert_eq!(len, full.len());
1657 }
1658
1659 #[test]
1660 fn total_content_len_multiple_shards() {
1661 let (_tmp, mgr) = setup();
1662 mgr.ensure_dirs().expect("dirs");
1663 mgr.create_shard(2026, 1).expect("shard 1");
1664 mgr.create_shard(2026, 2).expect("shard 2");
1665 mgr.append_raw(2026, 1, "jan-event\n").expect("append jan");
1666 mgr.append_raw(2026, 2, "feb-event\n").expect("append feb");
1667
1668 let full = mgr.replay().expect("replay");
1669 let len = mgr.total_content_len().expect("len");
1670 assert_eq!(len, full.len(), "total_content_len must match replay len");
1671 }
1672
1673 #[test]
1678 fn read_content_range_empty_range() {
1679 let (_tmp, mgr) = setup();
1680 mgr.ensure_dirs().expect("dirs");
1681 mgr.create_shard(2026, 1).expect("create");
1682 mgr.append_raw(2026, 1, "event\n").expect("append");
1683
1684 let result = mgr.read_content_range(5, 5).expect("range");
1685 assert!(result.is_empty());
1686 }
1687
1688 #[test]
1689 fn read_content_range_within_single_shard() {
1690 let (_tmp, mgr) = setup();
1691 mgr.ensure_dirs().expect("dirs");
1692 mgr.create_shard(2026, 1).expect("create");
1693 mgr.append_raw(2026, 1, "ABCDEF\n").expect("append");
1695
1696 let full = mgr.replay().expect("replay");
1697 let pos = full.find("ABCDEF").expect("ABCDEF must be in shard");
1699 let range = mgr.read_content_range(pos, pos + 7).expect("range");
1700 assert_eq!(range, "ABCDEF\n");
1701 }
1702
1703 #[test]
1704 fn read_content_range_across_shard_boundary() {
1705 let (_tmp, mgr) = setup();
1706 mgr.ensure_dirs().expect("dirs");
1707 mgr.create_shard(2026, 1).expect("shard 1");
1708 mgr.create_shard(2026, 2).expect("shard 2");
1709 mgr.append_raw(2026, 1, "jan-last-line\n").expect("jan");
1710 mgr.append_raw(2026, 2, "feb-first-line\n").expect("feb");
1711
1712 let full = mgr.replay().expect("replay");
1713 let range = mgr.read_content_range(0, full.len()).expect("full range");
1715 assert_eq!(range, full);
1716 }
1717
1718 #[test]
1719 fn read_content_range_beyond_end() {
1720 let (_tmp, mgr) = setup();
1721 mgr.ensure_dirs().expect("dirs");
1722 mgr.create_shard(2026, 1).expect("create");
1723 mgr.append_raw(2026, 1, "event\n").expect("append");
1724
1725 let full = mgr.replay().expect("replay");
1726 let range = mgr
1728 .read_content_range(full.len(), full.len() + 100)
1729 .expect("beyond end");
1730 assert!(range.is_empty());
1731 }
1732
1733 #[test]
1738 fn replay_from_offset_zero_returns_full_content() {
1739 let (_tmp, mgr) = setup();
1740 mgr.ensure_dirs().expect("dirs");
1741 mgr.create_shard(2026, 1).expect("create");
1742 mgr.append_raw(2026, 1, "event1\n").expect("e1");
1743 mgr.append_raw(2026, 1, "event2\n").expect("e2");
1744
1745 let full = mgr.replay().expect("full replay");
1746 let (from_zero, total_len) = mgr.replay_from_offset(0).expect("from 0");
1747 assert_eq!(from_zero, full);
1748 assert_eq!(total_len, full.len());
1749 }
1750
1751 #[test]
1752 fn replay_from_offset_skips_content_before_cursor() {
1753 let (_tmp, mgr) = setup();
1754 mgr.ensure_dirs().expect("dirs");
1755 mgr.create_shard(2026, 1).expect("create");
1756 mgr.append_raw(2026, 1, "event1\n").expect("e1");
1757 mgr.append_raw(2026, 1, "event2\n").expect("e2");
1758 mgr.append_raw(2026, 1, "event3\n").expect("e3");
1759
1760 let full = mgr.replay().expect("full replay");
1761
1762 let cursor = full.find("event3").expect("event3 in content");
1764 let (tail, total_len) = mgr.replay_from_offset(cursor).expect("from cursor");
1765 assert_eq!(tail, "event3\n");
1766 assert_eq!(total_len, full.len());
1767 }
1768
1769 #[test]
1770 fn replay_from_offset_at_end_returns_empty() {
1771 let (_tmp, mgr) = setup();
1772 mgr.ensure_dirs().expect("dirs");
1773 mgr.create_shard(2026, 1).expect("create");
1774 mgr.append_raw(2026, 1, "event1\n").expect("e1");
1775
1776 let full = mgr.replay().expect("full replay");
1777 let (tail, total_len) = mgr.replay_from_offset(full.len()).expect("at end");
1778 assert!(tail.is_empty(), "tail should be empty at end of content");
1779 assert_eq!(total_len, full.len());
1780 }
1781
1782 #[test]
1783 fn replay_from_offset_skips_sealed_shards_before_cursor() {
1784 let (_tmp, mgr) = setup();
1785 mgr.ensure_dirs().expect("dirs");
1786
1787 mgr.create_shard(2026, 1).expect("jan");
1789 mgr.create_shard(2026, 2).expect("feb");
1790 mgr.append_raw(2026, 1, "jan-event1\n").expect("jan e1");
1791 mgr.append_raw(2026, 1, "jan-event2\n").expect("jan e2");
1792 mgr.append_raw(2026, 2, "feb-event1\n").expect("feb e1");
1793 mgr.append_raw(2026, 2, "feb-event2\n").expect("feb e2");
1794
1795 let full = mgr.replay().expect("full replay");
1796 let jan_shard_len = mgr.read_shard(2026, 1).expect("read jan").len();
1797
1798 let (tail, total_len) = mgr
1800 .replay_from_offset(jan_shard_len)
1801 .expect("from feb start");
1802 assert!(
1803 !tail.contains("jan-event"),
1804 "jan events should not appear in tail"
1805 );
1806 assert!(tail.contains("feb-event1"), "feb events must be in tail");
1807 assert!(tail.contains("feb-event2"), "feb events must be in tail");
1808 assert_eq!(total_len, full.len());
1809 }
1810
1811 #[test]
1812 fn replay_from_offset_total_len_equals_total_content_len() {
1813 let (_tmp, mgr) = setup();
1814 mgr.ensure_dirs().expect("dirs");
1815 mgr.create_shard(2026, 1).expect("shard 1");
1816 mgr.create_shard(2026, 2).expect("shard 2");
1817 mgr.append_raw(2026, 1, "event-a\n").expect("ea");
1818 mgr.append_raw(2026, 2, "event-b\n").expect("eb");
1819
1820 let total = mgr.total_content_len().expect("total_content_len");
1821 let (_, replay_total) = mgr.replay_from_offset(0).expect("replay_from_offset");
1822 assert_eq!(
1823 total, replay_total,
1824 "total_content_len and replay_from_offset total must agree"
1825 );
1826 }
1827}