1use std::cmp::Ordering;
17use std::iter::Peekable;
18use std::path::Path;
19use std::sync::Arc;
20use std::vec;
21
22use itertools::Itertools;
23use time::OffsetDateTime;
24use tracing::{debug, debug_span, error};
25
26use crate::compress::snappy::{Compressor, Decompressor};
27use crate::counters::Counter;
28use crate::entry::KindMeta;
29use crate::monitor::Monitor;
30use crate::stats::IndexReadStats;
31use crate::transport::local::LocalTransport;
32use crate::unix_time::FromUnixAndNanos;
33use crate::*;
34
/// Number of consecutive hunks stored in each index subdirectory.
///
/// Hunk `n` lives in subdirectory `n / HUNKS_PER_SUBDIR`, and a new subdirectory
/// is created whenever a hunk number is a multiple of this value.
pub const HUNKS_PER_SUBDIR: u32 = 10_000;
36
/// One entry in an index hunk: the stored metadata for a single tree entry.
///
/// Entries are serialized to JSON in field-declaration order. Several fields are
/// omitted from the JSON when they hold their default/empty values (see below).
#[derive(Debug, Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct IndexEntry {
    /// Path of this entry within the tree.
    pub apath: Apath,

    /// Kind of entry (file, directory, symlink, or unknown).
    pub kind: Kind,

    /// Whole seconds of the modification time since the Unix epoch.
    ///
    /// Combined with `mtime_nanos` to form the full timestamp.
    #[serde(default)]
    pub mtime: i64,

    /// Unix mode of the entry; a default value serializes as `null`.
    #[serde(default)]
    pub unix_mode: UnixMode,

    /// Ownership information, flattened into the entry's JSON object and
    /// omitted entirely when `Owner::is_none` is true.
    #[serde(default, flatten, skip_serializing_if = "Owner::is_none")]
    pub owner: Owner,

    /// Sub-second part of the modification time, in nanoseconds.
    ///
    /// Omitted from the JSON when zero.
    #[serde(default)]
    #[serde(skip_serializing_if = "crate::misc::zero_u32")]
    pub mtime_nanos: u32,

    /// Addresses of the blocks holding a file's content; their lengths sum to
    /// the file size. Omitted from the JSON when empty.
    #[serde(default)]
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub addrs: Vec<blockdir::Address>,

    /// Target of a symlink. Expected to be `Some` exactly for `Kind::Symlink`
    /// entries; omitted from the JSON when `None`.
    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub target: Option<String>,
}
85impl From<IndexEntry> for EntryValue {
88 fn from(index_entry: IndexEntry) -> EntryValue {
89 let kind_meta = match index_entry.kind {
90 Kind::File => KindMeta::File {
91 size: index_entry.addrs.iter().map(|a| a.len).sum(),
92 },
93 Kind::Symlink => KindMeta::Symlink {
94 target: index_entry
96 .target
97 .expect("symlink entry should have a target"),
98 },
99 Kind::Dir => KindMeta::Dir,
100 Kind::Unknown => KindMeta::Unknown,
101 };
102 EntryValue {
103 apath: index_entry.apath,
104 kind_meta,
105 mtime: OffsetDateTime::from_unix_seconds_and_nanos(
106 index_entry.mtime,
107 index_entry.mtime_nanos,
108 ),
109 unix_mode: index_entry.unix_mode,
110 owner: index_entry.owner,
111 }
112 }
113}
114
115impl EntryTrait for IndexEntry {
116 fn apath(&self) -> &Apath {
118 &self.apath
119 }
120
121 #[inline]
122 fn kind(&self) -> Kind {
123 self.kind
124 }
125
126 #[inline]
127 fn mtime(&self) -> OffsetDateTime {
128 OffsetDateTime::from_unix_seconds_and_nanos(self.mtime, self.mtime_nanos)
129 }
130
131 fn size(&self) -> Option<u64> {
133 Some(self.addrs.iter().map(|a| a.len).sum())
134 }
135
136 #[inline]
138 fn symlink_target(&self) -> Option<&str> {
139 self.target.as_deref()
140 }
141
142 fn unix_mode(&self) -> UnixMode {
143 self.unix_mode
144 }
145
146 fn owner(&self) -> &Owner {
147 &self.owner
148 }
149}
150
151impl IndexEntry {
152 pub(crate) fn metadata_from(source: &EntryValue) -> IndexEntry {
156 let mtime = source.mtime();
157 assert_eq!(
158 source.symlink_target().is_some(),
159 source.kind() == Kind::Symlink
160 );
161 IndexEntry {
162 apath: source.apath().clone(),
163 kind: source.kind(),
164 addrs: Vec::new(),
165 target: source.symlink_target().map(|t| t.to_owned()),
166 mtime: mtime.unix_timestamp(),
167 mtime_nanos: mtime.nanosecond(),
168 unix_mode: source.unix_mode(),
169 owner: source.owner().to_owned(),
170 }
171 }
172}
173
/// Accumulates index entries and writes them out as compressed JSON hunks.
pub struct IndexWriter {
    /// Transport rooted at the index directory; hunk subdirectories and hunk
    /// files are created through it.
    transport: Arc<dyn Transport>,

    /// Entries buffered for the next hunk; written and cleared by `finish_hunk`.
    entries: Vec<IndexEntry>,

    /// Number of the next hunk to be written.
    sequence: u32,

    /// Count of hunks written so far.
    hunks_written: usize,

    /// Order checker fed the first and last apath of each hunk.
    /// NOTE(review): presumably only active in debug builds, going by the name.
    check_order: apath::DebugCheckOrder,

    /// Compressor reused for every hunk.
    compressor: Compressor,
}
198
199impl IndexWriter {
201 pub fn new(transport: Arc<dyn Transport>) -> IndexWriter {
203 IndexWriter {
204 transport,
205 entries: Vec::new(),
206 sequence: 0,
207 hunks_written: 0,
208 check_order: apath::DebugCheckOrder::new(),
209 compressor: Compressor::new(),
210 }
211 }
212
213 pub fn finish(mut self, monitor: Arc<dyn Monitor>) -> Result<usize> {
215 self.finish_hunk(monitor)?;
216 Ok(self.hunks_written)
217 }
218
219 pub(crate) fn push_entry(&mut self, entry: IndexEntry) {
226 self.entries.push(entry);
227 }
228
229 pub(crate) fn append_entries(&mut self, entries: &mut Vec<IndexEntry>) {
230 self.entries.append(entries);
231 }
232
233 pub fn finish_hunk(&mut self, monitor: Arc<dyn Monitor>) -> Result<()> {
239 if self.entries.is_empty() {
240 return Ok(());
241 }
242 self.entries.sort_unstable_by(|a, b| {
243 debug_assert!(a.apath != b.apath);
244 a.apath.cmp(&b.apath)
245 });
246 self.check_order.check(&self.entries[0].apath);
247 if self.entries.len() > 1 {
248 self.check_order.check(&self.entries.last().unwrap().apath);
249 }
250 let relpath = hunk_relpath(self.sequence);
251 let json = serde_json::to_vec(&self.entries)?;
252 if (self.sequence % HUNKS_PER_SUBDIR) == 0 {
253 self.transport.create_dir(&subdir_relpath(self.sequence))?;
254 }
255 let compressed_bytes = self.compressor.compress(&json)?;
256 self.transport.write_file(&relpath, &compressed_bytes)?;
257 self.hunks_written += 1;
258 monitor.count(Counter::IndexWrites, 1);
259 monitor.count(Counter::IndexWriteCompressedBytes, compressed_bytes.len());
260 monitor.count(Counter::IndexWriteUncompressedBytes, json.len());
261 self.entries.clear(); self.sequence += 1;
263 Ok(())
264 }
265}
266
267fn subdir_relpath(hunk_number: u32) -> String {
269 format!("{:05}", hunk_number / HUNKS_PER_SUBDIR)
270}
271
272#[mutants::skip] fn hunk_relpath(hunk_number: u32) -> String {
275 format!("{:05}/{:09}", hunk_number / HUNKS_PER_SUBDIR, hunk_number)
276}
277
/// Read-only access to an index stored through a transport.
#[derive(Debug, Clone)]
pub struct IndexRead {
    /// Transport rooted at the index directory.
    transport: Arc<dyn Transport>,
}
284
285impl IndexRead {
286 #[allow(unused)]
287 pub(crate) fn open_path(path: &Path) -> IndexRead {
288 IndexRead::open(Arc::new(LocalTransport::new(path)))
289 }
290
291 pub(crate) fn open(transport: Arc<dyn Transport>) -> IndexRead {
292 IndexRead { transport }
293 }
294
295 pub fn iter_entries(self) -> IndexEntryIter<IndexHunkIter> {
297 IndexEntryIter::new(self.iter_hunks(), Apath::root(), Exclude::nothing())
299 }
300
301 pub fn iter_hunks(&self) -> IndexHunkIter {
303 let _span = debug_span!("iter_hunks", ?self.transport).entered();
304 let subdirs = self
306 .transport
307 .list_dir("")
308 .expect("list index dir") .dirs
310 .into_iter()
311 .sorted()
312 .collect_vec();
313 debug!(?subdirs);
314 let hunks = subdirs
315 .into_iter()
316 .filter_map(|dir| self.transport.list_dir(&dir).ok())
317 .flat_map(|list| list.files)
318 .filter_map(|f| f.parse::<u32>().ok())
319 .sorted()
320 .collect_vec();
321 debug!(?hunks);
322 IndexHunkIter {
323 hunks: hunks.into_iter(),
324 transport: Arc::clone(&self.transport),
325 decompressor: Decompressor::new(),
326 stats: IndexReadStats::default(),
327 after: None,
328 }
329 }
330}
331
/// Iterator over the hunks of an index, yielding each hunk's entries.
pub struct IndexHunkIter {
    /// Hunk numbers still to be read, in ascending order.
    hunks: std::vec::IntoIter<u32>,
    /// Transport rooted at the index directory.
    transport: Arc<dyn Transport>,
    /// Decompressor reused for every hunk.
    decompressor: Decompressor,
    /// Counts of hunks read, bytes read, and errors encountered.
    pub stats: IndexReadStats,
    /// When set, entries at or before this apath are not returned.
    after: Option<Apath>,
}
344
345impl Iterator for IndexHunkIter {
346 type Item = Vec<IndexEntry>;
347
348 fn next(&mut self) -> Option<Self::Item> {
349 loop {
350 let hunk_number = self.hunks.next()?;
351 let entries = match self.read_next_hunk(hunk_number) {
352 Ok(None) => return None,
353 Ok(Some(entries)) => entries,
354 Err(err) => {
355 self.stats.errors += 1;
356 error!("Error reading index hunk {hunk_number:?}: {err}");
357 continue;
358 }
359 };
360 if let Some(ref after) = self.after {
361 if let Some(last) = entries.last() {
362 if last.apath <= *after {
363 continue;
364 }
365 }
366 if let Some(first) = entries.first() {
367 if first.apath > *after {
368 self.after = None; return Some(entries);
370 }
371 }
372 let idx = match entries.binary_search_by_key(&after, |entry| &entry.apath) {
373 Ok(idx) => idx + 1, Err(idx) => idx, };
376 return Some(Vec::from(&entries[idx..]));
377 }
378 if !entries.is_empty() {
379 return Some(entries);
380 }
381 }
382 }
383}
384
385impl IndexHunkIter {
386 #[must_use]
388 pub fn advance_to_after(self, apath: &Apath) -> Self {
389 IndexHunkIter {
390 after: Some(apath.clone()),
391 ..self
392 }
393 }
394
395 fn read_next_hunk(&mut self, hunk_number: u32) -> Result<Option<Vec<IndexEntry>>> {
396 let path = hunk_relpath(hunk_number);
397 let compressed_bytes = match self.transport.read_file(&path) {
398 Ok(b) => b,
399 Err(err) if err.is_not_found() => {
400 return Ok(None);
404 }
405 Err(source) => return Err(Error::Transport { source }),
406 };
407 self.stats.index_hunks += 1;
408 self.stats.compressed_index_bytes += compressed_bytes.len() as u64;
409 let index_bytes = self.decompressor.decompress(&compressed_bytes)?;
410 self.stats.uncompressed_index_bytes += index_bytes.len() as u64;
411 let entries: Vec<IndexEntry> =
412 serde_json::from_slice(&index_bytes).map_err(|source| Error::DeserializeJson {
413 path: path.clone(),
414 source,
415 })?;
416 if entries.is_empty() {
417 }
419 Ok(Some(entries))
420 }
421}
422
/// Iterator over individual index entries, drawn from an iterator of hunks.
pub struct IndexEntryIter<HI: Iterator<Item = Vec<IndexEntry>>> {
    /// Entries from the current hunk not yet returned; peekable so
    /// `advance_to` can inspect the next entry without consuming it.
    buffered_entries: Peekable<vec::IntoIter<IndexEntry>>,
    /// Source of further hunks.
    hunk_iter: HI,
    /// `next` only returns entries inside this subtree.
    subtree: Apath,
    /// `next` skips entries matching this.
    exclude: Exclude,
}
433
434impl<HI: Iterator<Item = Vec<IndexEntry>>> IndexEntryIter<HI> {
435 pub(crate) fn new(hunk_iter: HI, subtree: Apath, exclude: Exclude) -> Self {
436 IndexEntryIter {
437 buffered_entries: Vec::<IndexEntry>::new().into_iter().peekable(),
438 hunk_iter,
439 subtree,
440 exclude,
441 }
442 }
443}
444
445impl<HI: Iterator<Item = Vec<IndexEntry>>> Iterator for IndexEntryIter<HI> {
446 type Item = IndexEntry;
447
448 fn next(&mut self) -> Option<IndexEntry> {
449 loop {
450 if let Some(entry) = self.buffered_entries.next() {
451 if !self.subtree.is_prefix_of(&entry.apath) {
455 continue;
456 }
457 if self.exclude.matches(&entry.apath) {
458 continue;
459 }
460 return Some(entry);
461 }
462 if !self.refill_entry_buffer_or_warn() {
463 return None;
464 }
465 }
466 }
467}
468
469impl<HI: Iterator<Item = Vec<IndexEntry>>> IndexEntryIter<HI> {
470 pub fn advance_to(&mut self, apath: &Apath) -> Option<IndexEntry> {
477 loop {
480 if let Some(cand) = self.buffered_entries.peek() {
481 match cand.apath.cmp(apath) {
482 Ordering::Less => {
483 self.buffered_entries.next().unwrap();
485 }
486 Ordering::Equal => {
487 return Some(self.buffered_entries.next().unwrap());
488 }
489 Ordering::Greater => {
490 return None;
492 }
493 }
494 } else if !self.refill_entry_buffer_or_warn() {
495 return None;
496 }
497 }
498 }
499
500 fn refill_entry_buffer_or_warn(&mut self) -> bool {
504 assert!(
505 self.buffered_entries.next().is_none(),
506 "refill_entry_buffer called with non-empty buffer"
507 );
508 if let Some(new_entries) = self.hunk_iter.next() {
509 self.buffered_entries = new_entries.into_iter().peekable();
510 true
511 } else {
512 false
513 }
514 }
515}
516
#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use crate::monitor::test::TestMonitor;

    use super::*;

    /// Create a temporary directory and an `IndexWriter` that writes into it.
    fn setup() -> (TempDir, IndexWriter) {
        let testdir = TempDir::new().unwrap();
        let ib = IndexWriter::new(Arc::new(LocalTransport::new(testdir.path())));
        (testdir, ib)
    }

    /// A file entry with the given apath, a fixed mtime, and no content blocks.
    fn sample_entry(apath: &str) -> IndexEntry {
        IndexEntry {
            apath: apath.into(),
            mtime: 1_461_736_377,
            mtime_nanos: 0,
            kind: Kind::File,
            addrs: vec![],
            target: None,
            unix_mode: Default::default(),
            owner: Default::default(),
        }
    }

    /// Zero nanos, empty addrs, no target, and no owner are all omitted from
    /// the JSON; a default unix_mode appears as `null`.
    #[test]
    fn serialize_index() {
        let entries = [IndexEntry {
            apath: "/a/b".into(),
            mtime: 1_461_736_377,
            mtime_nanos: 0,
            kind: Kind::File,
            addrs: vec![],
            target: None,
            unix_mode: Default::default(),
            owner: Default::default(),
        }];
        let index_json = serde_json::to_string(&entries).unwrap();
        println!("{index_json}");
        assert_eq!(
            index_json,
            "[{\"apath\":\"/a/b\",\
             \"kind\":\"File\",\
             \"mtime\":1461736377,\
             \"unix_mode\":null}]"
        );
    }

    /// Entries pushed out of order are sorted before the hunk is written.
    #[test]
    fn index_builder_sorts_entries() {
        let (_testdir, mut ib) = setup();
        ib.push_entry(sample_entry("/zzz"));
        ib.push_entry(sample_entry("/aaa"));
        ib.finish_hunk(TestMonitor::arc()).unwrap();
    }

    /// Writing an entry with an invalid apath panics.
    #[test]
    #[should_panic]
    fn index_builder_checks_names() {
        let (_testdir, mut ib) = setup();
        ib.push_entry(sample_entry("../escapecat"));
        ib.finish_hunk(TestMonitor::arc()).unwrap();
    }

    /// A repeated apath within one hunk panics (debug builds only).
    #[test]
    #[cfg(debug_assertions)]
    #[should_panic]
    fn no_duplicate_paths() {
        let (_testdir, mut ib) = setup();
        ib.push_entry(sample_entry("/again"));
        ib.push_entry(sample_entry("/again"));
        ib.finish_hunk(TestMonitor::arc()).unwrap();
    }

    /// A repeated apath in consecutive hunks panics (debug builds only).
    #[test]
    #[cfg(debug_assertions)]
    #[should_panic]
    fn no_duplicate_paths_across_hunks() {
        let (_testdir, mut ib) = setup();
        ib.push_entry(sample_entry("/again"));
        ib.finish_hunk(TestMonitor::arc()).unwrap();
        ib.push_entry(sample_entry("/again"));
        ib.finish_hunk(TestMonitor::arc()).unwrap();
    }

    #[test]
    fn path_for_hunk() {
        assert_eq!(super::hunk_relpath(0), "00000/000000000");
    }

    /// Round-trip a single hunk: check counters, the on-disk file, and read-back order.
    #[test]
    fn basic() {
        let (testdir, mut ib) = setup();
        let monitor = TestMonitor::arc();
        ib.append_entries(&mut vec![sample_entry("/apple"), sample_entry("/banana")]);
        let hunks = ib.finish(monitor.clone()).unwrap();
        assert_eq!(monitor.get_counter(Counter::IndexWrites), 1);

        assert_eq!(hunks, 1);
        let counters = monitor.counters();
        dbg!(&counters);
        assert!(counters.get(Counter::IndexWriteCompressedBytes) > 30);
        assert!(counters.get(Counter::IndexWriteCompressedBytes) < 125,);
        assert!(counters.get(Counter::IndexWriteUncompressedBytes) > 100);
        assert!(counters.get(Counter::IndexWriteUncompressedBytes) < 250);

        assert!(
            std::fs::metadata(testdir.path().join("00000").join("000000000"))
                .unwrap()
                .is_file(),
            "Index hunk file not found"
        );

        let mut it = IndexRead::open_path(testdir.path()).iter_entries();
        let entry = it.next().expect("Get first entry");
        assert_eq!(&entry.apath, "/apple");
        let entry = it.next().expect("Get second entry");
        assert_eq!(&entry.apath, "/banana");
        assert!(it.next().is_none(), "Expected no more entries");
    }

    /// Entries from several hunks read back in order, both flattened and per hunk.
    #[test]
    fn multiple_hunks() {
        let (testdir, mut ib) = setup();
        ib.append_entries(&mut vec![sample_entry("/1.1"), sample_entry("/1.2")]);
        ib.finish_hunk(TestMonitor::arc()).unwrap();
        ib.append_entries(&mut vec![sample_entry("/2.1"), sample_entry("/2.2")]);
        ib.finish_hunk(TestMonitor::arc()).unwrap();

        let index_read = IndexRead::open_path(testdir.path());
        let it = index_read.iter_entries();
        let names: Vec<String> = it.map(|x| x.apath.into()).collect();
        assert_eq!(names, &["/1.1", "/1.2", "/2.1", "/2.2"]);

        let hunks: Vec<Vec<IndexEntry>> =
            IndexRead::open_path(testdir.path()).iter_hunks().collect();
        assert_eq!(hunks.len(), 2);
        assert_eq!(
            hunks[0]
                .iter()
                .map(|entry| entry.apath())
                .collect::<Vec<_>>(),
            vec!["/1.1", "/1.2"]
        );
        assert_eq!(
            hunks[1]
                .iter()
                .map(|entry| entry.apath())
                .collect::<Vec<_>>(),
            vec!["/2.1", "/2.2"]
        );
    }

    /// `advance_to_after` yields exactly the entries strictly after the given
    /// apath, whether or not that apath is present and wherever it falls
    /// relative to hunk boundaries.
    #[test]
    fn iter_hunks_advance_to_after() {
        let (testdir, mut ib) = setup();
        ib.append_entries(&mut vec![sample_entry("/1.1"), sample_entry("/1.2")]);
        ib.finish_hunk(TestMonitor::arc()).unwrap();
        ib.append_entries(&mut vec![sample_entry("/2.1"), sample_entry("/2.2")]);
        ib.finish_hunk(TestMonitor::arc()).unwrap();

        let index_read = IndexRead::open_path(testdir.path());
        let names: Vec<String> = index_read
            .iter_hunks()
            .advance_to_after(&"/".into())
            .flatten()
            .map(|entry| entry.apath.into())
            .collect();
        assert_eq!(names, ["/1.1", "/1.2", "/2.1", "/2.2"]);

        let names: Vec<String> = index_read
            .iter_hunks()
            .advance_to_after(&"/nonexistent".into())
            .flatten()
            .map(|entry| entry.apath.into())
            .collect();
        assert_eq!(names, [""; 0]);

        let names: Vec<String> = index_read
            .iter_hunks()
            .advance_to_after(&"/1.1".into())
            .flatten()
            .map(|entry| entry.apath.into())
            .collect();
        assert_eq!(names, ["/1.2", "/2.1", "/2.2"]);

        let names: Vec<String> = index_read
            .iter_hunks()
            .advance_to_after(&"/1.1.1".into())
            .flatten()
            .map(|entry| entry.apath.into())
            .collect();
        assert_eq!(names, ["/1.2", "/2.1", "/2.2"]);

        let names: Vec<String> = index_read
            .iter_hunks()
            .advance_to_after(&"/1.2".into())
            .flatten()
            .map(|entry| entry.apath.into())
            .collect();
        assert_eq!(names, ["/2.1", "/2.2"]);

        let names: Vec<String> = index_read
            .iter_hunks()
            .advance_to_after(&"/1.3".into())
            .flatten()
            .map(|entry| entry.apath.into())
            .collect();
        assert_eq!(names, ["/2.1", "/2.2"]);

        let names: Vec<String> = index_read
            .iter_hunks()
            .advance_to_after(&"/2.0".into())
            .flatten()
            .map(|entry| entry.apath.into())
            .collect();
        assert_eq!(names, ["/2.1", "/2.2"]);

        let names: Vec<String> = index_read
            .iter_hunks()
            .advance_to_after(&"/2.1".into())
            .flatten()
            .map(|entry| entry.apath.into())
            .collect();
        assert_eq!(names, ["/2.2"]);

        let names: Vec<String> = index_read
            .iter_hunks()
            .advance_to_after(&"/2.2".into())
            .flatten()
            .map(|entry| entry.apath.into())
            .collect();
        assert_eq!(names, [] as [&str; 0]);
    }

    /// `advance_to` returns present apaths, and returns `None` for absent ones
    /// without consuming the following entry, across hunk boundaries.
    #[test]
    fn advance() {
        let (testdir, mut ib) = setup();
        ib.push_entry(sample_entry("/bar"));
        ib.push_entry(sample_entry("/foo"));
        ib.push_entry(sample_entry("/foobar"));
        ib.finish_hunk(TestMonitor::arc()).unwrap();

        // Second hunk.
        ib.push_entry(sample_entry("/g01"));
        ib.push_entry(sample_entry("/g02"));
        ib.push_entry(sample_entry("/g03"));
        ib.finish_hunk(TestMonitor::arc()).unwrap();

        // Advance to an apath present in the first hunk.
        let mut it = IndexRead::open_path(testdir.path()).iter_entries();
        assert_eq!(it.advance_to(&Apath::from("/foo")).unwrap().apath, "/foo");
        assert_eq!(it.next().unwrap().apath, "/foobar");
        assert_eq!(it.next().unwrap().apath, "/g01");

        // Advance to an absent apath that falls between the two hunks.
        let mut it = IndexRead::open_path(testdir.path()).iter_entries();
        assert_eq!(it.advance_to(&Apath::from("/fxxx")), None);
        assert_eq!(it.next().unwrap().apath, "/g01");
        assert_eq!(it.next().unwrap().apath, "/g02");

        // Advance to an absent apath before every entry.
        let mut it = IndexRead::open_path(testdir.path()).iter_entries();
        assert_eq!(it.advance_to(&Apath::from("/aaaa")), None);
        assert_eq!(it.next().unwrap().apath, "/bar");
        assert_eq!(it.next().unwrap().apath, "/foo");

        // Advance past every entry.
        let mut it = IndexRead::open_path(testdir.path()).iter_entries();
        assert_eq!(it.advance_to(&Apath::from("/zz")), None);
        assert_eq!(it.next(), None);
    }

    /// Calling `finish_hunk` with nothing buffered does not write an empty hunk.
    #[test]
    fn no_final_empty_hunk() -> Result<()> {
        let (testdir, mut ib) = setup();
        for i in 0..100_000 {
            ib.push_entry(sample_entry(&format!("/{i:0>10}")));
        }
        ib.finish_hunk(TestMonitor::arc())?;
        // The buffer is now empty, so this call writes nothing.
        ib.finish_hunk(TestMonitor::arc())?;
        let read_index = IndexRead::open_path(testdir.path());
        assert_eq!(read_index.iter_hunks().count(), 1);
        Ok(())
    }
}