use std::cmp::Ordering;
use std::iter::Peekable;
use std::path::Path;
use std::sync::Arc;
use std::vec;
use itertools::Itertools;
use time::OffsetDateTime;
use tracing::{debug, debug_span, error};
use crate::compress::snappy::{Compressor, Decompressor};
use crate::counters::Counter;
use crate::entry::KindMeta;
use crate::monitor::Monitor;
use crate::stats::IndexReadStats;
use crate::transport::local::LocalTransport;
use crate::unix_time::FromUnixAndNanos;
use crate::*;
/// Maximum number of hunk files stored per index subdirectory; also the
/// divisor used to derive a hunk's subdirectory name from its number.
pub const HUNKS_PER_SUBDIR: u32 = 10_000;
/// One index entry: the stored metadata for a single file, directory, or
/// symlink, plus (for files) the addresses of the blocks holding its content.
///
/// NOTE: field declaration order matters — serde serializes fields in this
/// order and the on-disk JSON layout (pinned by the `serialize_index` test)
/// depends on it. Do not reorder fields.
#[derive(Debug, Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct IndexEntry {
    /// Path of this entry within the backup tree, in apath form.
    pub apath: Apath,
    /// Kind of entry: file, directory, symlink, or unknown.
    pub kind: Kind,
    /// Modification time: whole seconds since the Unix epoch.
    #[serde(default)]
    pub mtime: i64,
    /// Unix permission bits.
    #[serde(default)]
    pub unix_mode: UnixMode,
    /// Owning user/group; flattened into the entry and omitted when absent.
    #[serde(default, flatten, skip_serializing_if = "Owner::is_none")]
    pub owner: Owner,
    /// Sub-second part of the modification time; omitted from JSON when zero.
    #[serde(default)]
    #[serde(skip_serializing_if = "crate::misc::zero_u32")]
    pub mtime_nanos: u32,
    /// Addresses of the blocks holding file content, in order; empty for
    /// non-file entries and omitted from JSON when empty.
    #[serde(default)]
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub addrs: Vec<blockdir::Address>,
    /// Symlink target; present only for symlink entries.
    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub target: Option<String>,
}
impl From<IndexEntry> for EntryValue {
fn from(index_entry: IndexEntry) -> EntryValue {
let kind_meta = match index_entry.kind {
Kind::File => KindMeta::File {
size: index_entry.addrs.iter().map(|a| a.len).sum(),
},
Kind::Symlink => KindMeta::Symlink {
target: index_entry
.target
.expect("symlink entry should have a target"),
},
Kind::Dir => KindMeta::Dir,
Kind::Unknown => KindMeta::Unknown,
};
EntryValue {
apath: index_entry.apath,
kind_meta,
mtime: OffsetDateTime::from_unix_seconds_and_nanos(
index_entry.mtime,
index_entry.mtime_nanos,
),
unix_mode: index_entry.unix_mode,
owner: index_entry.owner,
}
}
}
impl EntryTrait for IndexEntry {
    /// Path of this entry within the backup tree.
    fn apath(&self) -> &Apath {
        &self.apath
    }

    #[inline]
    fn kind(&self) -> Kind {
        self.kind
    }

    /// Full modification time, combining the seconds and nanoseconds fields.
    #[inline]
    fn mtime(&self) -> OffsetDateTime {
        OffsetDateTime::from_unix_seconds_and_nanos(self.mtime, self.mtime_nanos)
    }

    /// Total content size: the sum of the entry's block extents
    /// (zero, therefore `Some(0)`, for non-file entries with no addrs).
    fn size(&self) -> Option<u64> {
        let total: u64 = self.addrs.iter().map(|a| a.len).sum();
        Some(total)
    }

    #[inline]
    fn symlink_target(&self) -> Option<&str> {
        self.target.as_ref().map(String::as_str)
    }

    fn unix_mode(&self) -> UnixMode {
        self.unix_mode
    }

    fn owner(&self) -> &Owner {
        &self.owner
    }
}
impl IndexEntry {
    /// Build an index entry copying the metadata — but not any block
    /// addresses — from a source tree entry.
    ///
    /// Panics (in all builds) if a non-symlink has a target or a symlink
    /// lacks one, since that would violate the index invariants.
    pub(crate) fn metadata_from(source: &EntryValue) -> IndexEntry {
        let mtime = source.mtime();
        assert_eq!(
            source.symlink_target().is_some(),
            source.kind() == Kind::Symlink
        );
        IndexEntry {
            apath: source.apath().clone(),
            kind: source.kind(),
            mtime: mtime.unix_timestamp(),
            mtime_nanos: mtime.nanosecond(),
            // Addresses are filled in later, once content is stored.
            addrs: Vec::new(),
            target: source.symlink_target().map(String::from),
            unix_mode: source.unix_mode(),
            owner: source.owner().to_owned(),
        }
    }
}
/// Accumulates index entries and writes them out as a numbered series of
/// sorted, compressed hunk files.
pub struct IndexWriter {
    /// Transport rooted at the index directory being written.
    transport: Arc<dyn Transport>,
    /// Entries buffered for the current (not yet written) hunk.
    entries: Vec<IndexEntry>,
    /// Number that the next written hunk will get.
    sequence: u32,
    /// Count of hunks written so far.
    hunks_written: usize,
    /// Debug-build check that apaths are strictly increasing across hunks.
    check_order: apath::DebugCheckOrder,
    /// Reusable compressor for hunk bodies.
    compressor: Compressor,
}
impl IndexWriter {
    /// Create a writer that stores index hunks via `transport`.
    pub fn new(transport: Arc<dyn Transport>) -> IndexWriter {
        IndexWriter {
            transport,
            entries: Vec::new(),
            sequence: 0,
            hunks_written: 0,
            check_order: apath::DebugCheckOrder::new(),
            compressor: Compressor::new(),
        }
    }

    /// Flush any buffered entries and return the total number of hunks
    /// written over the writer's lifetime.
    pub fn finish(mut self, monitor: Arc<dyn Monitor>) -> Result<usize> {
        self.finish_hunk(monitor)?;
        Ok(self.hunks_written)
    }

    /// Buffer one entry into the current hunk.
    pub(crate) fn push_entry(&mut self, entry: IndexEntry) {
        self.entries.push(entry);
    }

    /// Move all of `entries` into the current hunk's buffer, draining the
    /// caller's vector.
    pub(crate) fn append_entries(&mut self, entries: &mut Vec<IndexEntry>) {
        self.entries.append(entries);
    }

    /// Sort and write the buffered entries as one hunk file, then clear the
    /// buffer and advance the sequence number.
    ///
    /// Calling this with an empty buffer writes nothing, so no trailing
    /// empty hunk is ever created.
    pub fn finish_hunk(&mut self, monitor: Arc<dyn Monitor>) -> Result<()> {
        if self.entries.is_empty() {
            return Ok(());
        }
        self.entries.sort_unstable_by(|a, b| {
            // Duplicate apaths within a hunk are a caller bug; catch them in
            // debug builds while we're comparing anyway.
            debug_assert!(a.apath != b.apath);
            a.apath.cmp(&b.apath)
        });
        // Checking only the first and last entries suffices: the hunk is now
        // sorted, and the checker remembers the previous hunk's last apath,
        // so duplicates across hunks are caught too.
        self.check_order.check(&self.entries[0].apath);
        if self.entries.len() > 1 {
            self.check_order.check(&self.entries.last().unwrap().apath);
        }
        let relpath = hunk_relpath(self.sequence);
        let json = serde_json::to_vec(&self.entries)?;
        if (self.sequence % HUNKS_PER_SUBDIR) == 0 {
            // First hunk in a new subdirectory: create the directory first.
            self.transport.create_dir(&subdir_relpath(self.sequence))?;
        }
        let compressed_bytes = self.compressor.compress(&json)?;
        self.transport.write_file(&relpath, &compressed_bytes)?;
        self.hunks_written += 1;
        monitor.count(Counter::IndexWrites, 1);
        monitor.count(Counter::IndexWriteCompressedBytes, compressed_bytes.len());
        monitor.count(Counter::IndexWriteUncompressedBytes, json.len());
        self.entries.clear(); self.sequence += 1;
        Ok(())
    }
}
/// Transport-relative path of the subdirectory holding the given hunk.
fn subdir_relpath(hunk_number: u32) -> String {
    let subdir = hunk_number / HUNKS_PER_SUBDIR;
    format!("{subdir:05}")
}
/// Transport-relative path of the file for the given hunk, inside its
/// zero-padded subdirectory.
#[mutants::skip]
fn hunk_relpath(hunk_number: u32) -> String {
    format!(
        "{subdir:05}/{hunk_number:09}",
        subdir = hunk_number / HUNKS_PER_SUBDIR
    )
}
/// Reads an index stored on a transport, yielding hunks or filtered entries.
#[derive(Debug, Clone)]
pub struct IndexRead {
    /// Transport rooted at the index directory to read.
    transport: Arc<dyn Transport>,
}
impl IndexRead {
    /// Open an index stored in a local filesystem directory.
    #[allow(unused)]
    pub(crate) fn open_path(path: &Path) -> IndexRead {
        IndexRead::open(Arc::new(LocalTransport::new(path)))
    }

    /// Open an index over an arbitrary transport.
    pub(crate) fn open(transport: Arc<dyn Transport>) -> IndexRead {
        IndexRead { transport }
    }

    /// Iterate every entry in apath order, with no subtree restriction and
    /// no exclusions.
    pub fn iter_entries(self) -> IndexEntryIter<IndexHunkIter> {
        IndexEntryIter::new(self.iter_hunks(), Apath::root(), Exclude::nothing())
    }

    /// Iterate the index hunk by hunk, in increasing hunk-number order.
    pub fn iter_hunks(&self) -> IndexHunkIter {
        let _span = debug_span!("iter_hunks", ?self.transport).entered();
        // Subdirectory names are zero-padded, so lexical order is hunk order.
        let mut subdirs = self
            .transport
            .list_dir("")
            .expect("list index dir")
            .dirs;
        subdirs.sort();
        debug!(?subdirs);
        // Collect the numeric hunk names from every subdirectory; anything
        // that doesn't parse as a number is ignored.
        let mut hunks: Vec<u32> = subdirs
            .into_iter()
            .filter_map(|dir| self.transport.list_dir(&dir).ok())
            .flat_map(|list| list.files)
            .filter_map(|f| f.parse::<u32>().ok())
            .collect();
        hunks.sort_unstable();
        debug!(?hunks);
        IndexHunkIter {
            hunks: hunks.into_iter(),
            transport: Arc::clone(&self.transport),
            decompressor: Decompressor::new(),
            stats: IndexReadStats::default(),
            after: None,
        }
    }
}
/// Iterator over hunks of index entries, in increasing hunk-number order.
pub struct IndexHunkIter {
    /// Remaining hunk numbers to read, already sorted ascending.
    hunks: std::vec::IntoIter<u32>,
    transport: Arc<dyn Transport>,
    /// Reusable decompressor for hunk bodies.
    decompressor: Decompressor,
    /// Counts of hunks, bytes, and errors observed while reading.
    pub stats: IndexReadStats,
    /// If set, entries with apaths <= this are skipped; cleared once
    /// iteration has advanced past it.
    after: Option<Apath>,
}
impl Iterator for IndexHunkIter {
    type Item = Vec<IndexEntry>;

    /// Read the next non-empty hunk, applying the `after` resume filter
    /// if one is set.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            let hunk_number = self.hunks.next()?;
            let entries = match self.read_next_hunk(hunk_number) {
                // A missing hunk file means the index ends here.
                Ok(None) => return None,
                Ok(Some(entries)) => entries,
                Err(err) => {
                    // Count and log an unreadable hunk, then keep going
                    // rather than aborting the whole iteration.
                    self.stats.errors += 1;
                    error!("Error reading index hunk {hunk_number:?}: {err}");
                    continue;
                }
            };
            if let Some(ref after) = self.after {
                // Hunk lies entirely at or before the resume point: skip it.
                if let Some(last) = entries.last() {
                    if last.apath <= *after {
                        continue;
                    }
                }
                // Hunk lies entirely past the resume point: stop filtering
                // from now on and return it whole.
                if let Some(first) = entries.first() {
                    if first.apath > *after {
                        self.after = None; return Some(entries);
                    }
                }
                // Hunk straddles the resume point: entries are sorted, so
                // binary-search for the first apath strictly after it.
                // On Ok (exact match) skip that entry too, hence idx + 1.
                let idx = match entries.binary_search_by_key(&after, |entry| &entry.apath) {
                    Ok(idx) => idx + 1, Err(idx) => idx, };
                return Some(Vec::from(&entries[idx..]));
            }
            // Empty hunks are legal but yield nothing; read the next one.
            if !entries.is_empty() {
                return Some(entries);
            }
        }
    }
}
impl IndexHunkIter {
#[must_use]
pub fn advance_to_after(self, apath: &Apath) -> Self {
IndexHunkIter {
after: Some(apath.clone()),
..self
}
}
fn read_next_hunk(&mut self, hunk_number: u32) -> Result<Option<Vec<IndexEntry>>> {
let path = hunk_relpath(hunk_number);
let compressed_bytes = match self.transport.read_file(&path) {
Ok(b) => b,
Err(err) if err.is_not_found() => {
return Ok(None);
}
Err(source) => return Err(Error::Transport { source }),
};
self.stats.index_hunks += 1;
self.stats.compressed_index_bytes += compressed_bytes.len() as u64;
let index_bytes = self.decompressor.decompress(&compressed_bytes)?;
self.stats.uncompressed_index_bytes += index_bytes.len() as u64;
let entries: Vec<IndexEntry> =
serde_json::from_slice(&index_bytes).map_err(|source| Error::DeserializeJson {
path: path.clone(),
source,
})?;
if entries.is_empty() {
}
Ok(Some(entries))
}
}
/// Iterator of individual index entries, drawn from an iterator of hunks
/// and filtered by subtree and exclusion rules.
pub struct IndexEntryIter<HI: Iterator<Item = Vec<IndexEntry>>> {
    /// Entries from the current hunk not yet yielded.
    buffered_entries: Peekable<vec::IntoIter<IndexEntry>>,
    /// Source of further hunks once the buffer runs out.
    hunk_iter: HI,
    /// Only entries inside this subtree are yielded.
    subtree: Apath,
    /// Entries matching these exclusions are skipped.
    exclude: Exclude,
}
impl<HI: Iterator<Item = Vec<IndexEntry>>> IndexEntryIter<HI> {
pub(crate) fn new(hunk_iter: HI, subtree: Apath, exclude: Exclude) -> Self {
IndexEntryIter {
buffered_entries: Vec::<IndexEntry>::new().into_iter().peekable(),
hunk_iter,
subtree,
exclude,
}
}
}
impl<HI: Iterator<Item = Vec<IndexEntry>>> Iterator for IndexEntryIter<HI> {
    type Item = IndexEntry;

    /// Yield the next buffered entry that passes the subtree and exclusion
    /// filters, refilling the buffer from the hunk iterator as needed.
    fn next(&mut self) -> Option<IndexEntry> {
        loop {
            while let Some(entry) = self.buffered_entries.next() {
                // The exclusion check runs only for entries already inside
                // the subtree, matching the short-circuit order.
                let wanted = self.subtree.is_prefix_of(&entry.apath)
                    && !self.exclude.matches(&entry.apath);
                if wanted {
                    return Some(entry);
                }
            }
            if !self.refill_entry_buffer_or_warn() {
                return None;
            }
        }
    }
}
impl<HI: Iterator<Item = Vec<IndexEntry>>> IndexEntryIter<HI> {
    /// Advance to and return the entry for exactly `apath`, if present.
    ///
    /// Entries before `apath` are consumed and discarded. Returns `None` —
    /// without consuming the following entry — when the stream has moved
    /// past `apath` or is exhausted.
    pub fn advance_to(&mut self, apath: &Apath) -> Option<IndexEntry> {
        loop {
            // Compare the next entry's apath first so the peek borrow ends
            // before the buffer is mutated.
            let position = self
                .buffered_entries
                .peek()
                .map(|cand| cand.apath.cmp(apath));
            match position {
                Some(Ordering::Less) => {
                    // Not there yet: discard and keep scanning forward.
                    self.buffered_entries.next().unwrap();
                }
                Some(Ordering::Equal) => return self.buffered_entries.next(),
                Some(Ordering::Greater) => return None,
                None => {
                    if !self.refill_entry_buffer_or_warn() {
                        return None;
                    }
                }
            }
        }
    }

    /// Pull the next hunk into the (empty) entry buffer; returns false when
    /// no hunks remain.
    fn refill_entry_buffer_or_warn(&mut self) -> bool {
        assert!(
            self.buffered_entries.next().is_none(),
            "refill_entry_buffer called with non-empty buffer"
        );
        match self.hunk_iter.next() {
            Some(new_entries) => {
                self.buffered_entries = new_entries.into_iter().peekable();
                true
            }
            None => false,
        }
    }
}
#[cfg(test)]
mod tests {
    use tempfile::TempDir;
    use crate::monitor::test::TestMonitor;
    use super::*;

    /// Make an IndexWriter targeting a fresh temporary directory.
    fn setup() -> (TempDir, IndexWriter) {
        let testdir = TempDir::new().unwrap();
        let ib = IndexWriter::new(Arc::new(LocalTransport::new(testdir.path())));
        (testdir, ib)
    }

    /// A minimal file entry with a fixed mtime and no content blocks.
    fn sample_entry(apath: &str) -> IndexEntry {
        IndexEntry {
            apath: apath.into(),
            mtime: 1_461_736_377,
            mtime_nanos: 0,
            kind: Kind::File,
            addrs: vec![],
            target: None,
            unix_mode: Default::default(),
            owner: Default::default(),
        }
    }

    /// Pins the exact on-disk JSON layout, including field order and the
    /// skip-if-default/empty behavior of the serde attributes.
    #[test]
    fn serialize_index() {
        let entries = [IndexEntry {
            apath: "/a/b".into(),
            mtime: 1_461_736_377,
            mtime_nanos: 0,
            kind: Kind::File,
            addrs: vec![],
            target: None,
            unix_mode: Default::default(),
            owner: Default::default(),
        }];
        let index_json = serde_json::to_string(&entries).unwrap();
        println!("{index_json}");
        assert_eq!(
            index_json,
            "[{\"apath\":\"/a/b\",\
             \"kind\":\"File\",\
             \"mtime\":1461736377,\
             \"unix_mode\":null}]"
        );
    }

    /// Out-of-order pushes are fine: finish_hunk sorts before writing.
    #[test]
    fn index_builder_sorts_entries() {
        let (_testdir, mut ib) = setup();
        ib.push_entry(sample_entry("/zzz"));
        ib.push_entry(sample_entry("/aaa"));
        ib.finish_hunk(TestMonitor::arc()).unwrap();
    }

    /// An apath escaping the tree root must be rejected (panic).
    #[test]
    #[should_panic]
    fn index_builder_checks_names() {
        let (_testdir, mut ib) = setup();
        ib.push_entry(sample_entry("../escapecat"));
        ib.finish_hunk(TestMonitor::arc()).unwrap();
    }

    /// Duplicate apaths in one hunk trip the debug_assert in the sort
    /// comparator (debug builds only).
    #[test]
    #[cfg(debug_assertions)]
    #[should_panic]
    fn no_duplicate_paths() {
        let (_testdir, mut ib) = setup();
        ib.push_entry(sample_entry("/again"));
        ib.push_entry(sample_entry("/again"));
        ib.finish_hunk(TestMonitor::arc()).unwrap();
    }

    /// Duplicates spanning two hunks are caught by the order checker.
    #[test]
    #[cfg(debug_assertions)]
    #[should_panic]
    fn no_duplicate_paths_across_hunks() {
        let (_testdir, mut ib) = setup();
        ib.push_entry(sample_entry("/again"));
        ib.finish_hunk(TestMonitor::arc()).unwrap();
        ib.push_entry(sample_entry("/again"));
        ib.finish_hunk(TestMonitor::arc()).unwrap();
    }

    /// Hunk 0 lives at "00000/000000000".
    #[test]
    fn path_for_hunk() {
        assert_eq!(super::hunk_relpath(0), "00000/000000000");
    }

    /// End-to-end: write one hunk, check counters and on-disk layout, then
    /// read the entries back in order.
    #[test]
    fn basic() {
        let (testdir, mut ib) = setup();
        let monitor = TestMonitor::arc();
        ib.append_entries(&mut vec![sample_entry("/apple"), sample_entry("/banana")]);
        let hunks = ib.finish(monitor.clone()).unwrap();
        assert_eq!(monitor.get_counter(Counter::IndexWrites), 1);
        assert_eq!(hunks, 1);
        let counters = monitor.counters();
        dbg!(&counters);
        // Loose bounds: exact sizes depend on compression details.
        assert!(counters.get(Counter::IndexWriteCompressedBytes) > 30);
        assert!(counters.get(Counter::IndexWriteCompressedBytes) < 125,);
        assert!(counters.get(Counter::IndexWriteUncompressedBytes) > 100);
        assert!(counters.get(Counter::IndexWriteUncompressedBytes) < 250);
        assert!(
            std::fs::metadata(testdir.path().join("00000").join("000000000"))
                .unwrap()
                .is_file(),
            "Index hunk file not found"
        );
        let mut it = IndexRead::open_path(testdir.path()).iter_entries();
        let entry = it.next().expect("Get first entry");
        assert_eq!(&entry.apath, "/apple");
        let entry = it.next().expect("Get second entry");
        assert_eq!(&entry.apath, "/banana");
        assert!(it.next().is_none(), "Expected no more entries");
    }

    /// Entries read back flat across hunks, and hunk boundaries are
    /// preserved when iterating hunk-by-hunk.
    #[test]
    fn multiple_hunks() {
        let (testdir, mut ib) = setup();
        ib.append_entries(&mut vec![sample_entry("/1.1"), sample_entry("/1.2")]);
        ib.finish_hunk(TestMonitor::arc()).unwrap();
        ib.append_entries(&mut vec![sample_entry("/2.1"), sample_entry("/2.2")]);
        ib.finish_hunk(TestMonitor::arc()).unwrap();
        let index_read = IndexRead::open_path(testdir.path());
        let it = index_read.iter_entries();
        let names: Vec<String> = it.map(|x| x.apath.into()).collect();
        assert_eq!(names, &["/1.1", "/1.2", "/2.1", "/2.2"]);
        // Repeat the read as hunks to check boundaries are kept.
        let hunks: Vec<Vec<IndexEntry>> =
            IndexRead::open_path(testdir.path()).iter_hunks().collect();
        assert_eq!(hunks.len(), 2);
        assert_eq!(
            hunks[0]
                .iter()
                .map(|entry| entry.apath())
                .collect::<Vec<_>>(),
            vec!["/1.1", "/1.2"]
        );
        assert_eq!(
            hunks[1]
                .iter()
                .map(|entry| entry.apath())
                .collect::<Vec<_>>(),
            vec!["/2.1", "/2.2"]
        );
    }

    /// advance_to_after: resume points before, between, exactly on, and
    /// after the stored entries, including at hunk boundaries.
    #[test]
    fn iter_hunks_advance_to_after() {
        let (testdir, mut ib) = setup();
        ib.append_entries(&mut vec![sample_entry("/1.1"), sample_entry("/1.2")]);
        ib.finish_hunk(TestMonitor::arc()).unwrap();
        ib.append_entries(&mut vec![sample_entry("/2.1"), sample_entry("/2.2")]);
        ib.finish_hunk(TestMonitor::arc()).unwrap();
        let index_read = IndexRead::open_path(testdir.path());
        // Before everything: all entries returned.
        let names: Vec<String> = index_read
            .iter_hunks()
            .advance_to_after(&"/".into())
            .flatten()
            .map(|entry| entry.apath.into())
            .collect();
        assert_eq!(names, ["/1.1", "/1.2", "/2.1", "/2.2"]);
        // Past everything: nothing returned.
        let names: Vec<String> = index_read
            .iter_hunks()
            .advance_to_after(&"/nonexistent".into())
            .flatten()
            .map(|entry| entry.apath.into())
            .collect();
        assert_eq!(names, [""; 0]);
        // Exactly on the first entry: it is skipped.
        let names: Vec<String> = index_read
            .iter_hunks()
            .advance_to_after(&"/1.1".into())
            .flatten()
            .map(|entry| entry.apath.into())
            .collect();
        assert_eq!(names, ["/1.2", "/2.1", "/2.2"]);
        // Between entries within the first hunk.
        let names: Vec<String> = index_read
            .iter_hunks()
            .advance_to_after(&"/1.1.1".into())
            .flatten()
            .map(|entry| entry.apath.into())
            .collect();
        assert_eq!(names, ["/1.2", "/2.1", "/2.2"]);
        // Exactly on the first hunk's last entry.
        let names: Vec<String> = index_read
            .iter_hunks()
            .advance_to_after(&"/1.2".into())
            .flatten()
            .map(|entry| entry.apath.into())
            .collect();
        assert_eq!(names, ["/2.1", "/2.2"]);
        // Between the two hunks.
        let names: Vec<String> = index_read
            .iter_hunks()
            .advance_to_after(&"/1.3".into())
            .flatten()
            .map(|entry| entry.apath.into())
            .collect();
        assert_eq!(names, ["/2.1", "/2.2"]);
        // Just before the second hunk's first entry.
        let names: Vec<String> = index_read
            .iter_hunks()
            .advance_to_after(&"/2.0".into())
            .flatten()
            .map(|entry| entry.apath.into())
            .collect();
        assert_eq!(names, ["/2.1", "/2.2"]);
        // Exactly on the second hunk's first entry.
        let names: Vec<String> = index_read
            .iter_hunks()
            .advance_to_after(&"/2.1".into())
            .flatten()
            .map(|entry| entry.apath.into())
            .collect();
        assert_eq!(names, ["/2.2"]);
        // Exactly on the final entry: nothing remains.
        let names: Vec<String> = index_read
            .iter_hunks()
            .advance_to_after(&"/2.2".into())
            .flatten()
            .map(|entry| entry.apath.into())
            .collect();
        assert_eq!(names, [] as [&str; 0]);
    }

    /// advance_to hits, misses between entries, before the first entry,
    /// and after the last, across a hunk boundary.
    #[test]
    fn advance() {
        let (testdir, mut ib) = setup();
        ib.push_entry(sample_entry("/bar"));
        ib.push_entry(sample_entry("/foo"));
        ib.push_entry(sample_entry("/foobar"));
        ib.finish_hunk(TestMonitor::arc()).unwrap();
        ib.push_entry(sample_entry("/g01"));
        ib.push_entry(sample_entry("/g02"));
        ib.push_entry(sample_entry("/g03"));
        ib.finish_hunk(TestMonitor::arc()).unwrap();
        // Exact hit: entry returned, iteration continues after it.
        let mut it = IndexRead::open_path(testdir.path()).iter_entries();
        assert_eq!(it.advance_to(&Apath::from("/foo")).unwrap().apath, "/foo");
        assert_eq!(it.next().unwrap().apath, "/foobar");
        assert_eq!(it.next().unwrap().apath, "/g01");
        // Miss between entries: None, next entry not consumed.
        let mut it = IndexRead::open_path(testdir.path()).iter_entries();
        assert_eq!(it.advance_to(&Apath::from("/fxxx")), None);
        assert_eq!(it.next().unwrap().apath, "/g01");
        assert_eq!(it.next().unwrap().apath, "/g02");
        // Miss before everything: None, nothing consumed.
        let mut it = IndexRead::open_path(testdir.path()).iter_entries();
        assert_eq!(it.advance_to(&Apath::from("/aaaa")), None);
        assert_eq!(it.next().unwrap().apath, "/bar");
        assert_eq!(it.next().unwrap().apath, "/foo");
        // Miss after everything: None, iterator exhausted.
        let mut it = IndexRead::open_path(testdir.path()).iter_entries();
        assert_eq!(it.advance_to(&Apath::from("/zz")), None);
        assert_eq!(it.next(), None);
    }

    /// A second finish_hunk with nothing buffered must not write an
    /// empty trailing hunk.
    #[test]
    fn no_final_empty_hunk() -> Result<()> {
        let (testdir, mut ib) = setup();
        for i in 0..100_000 {
            ib.push_entry(sample_entry(&format!("/{i:0>10}")));
        }
        ib.finish_hunk(TestMonitor::arc())?;
        ib.finish_hunk(TestMonitor::arc())?;
        let read_index = IndexRead::open_path(testdir.path());
        assert_eq!(read_index.iter_hunks().count(), 1);
        Ok(())
    }
}