use std::cmp::Ordering;
use std::io;
use std::iter::Peekable;
use std::path::Path;
use std::vec;
use crate::compress::snappy::{Compressor, Decompressor};
use crate::kind::Kind;
use crate::stats::{IndexReadStats, IndexWriterStats};
use crate::transport::local::LocalTransport;
use crate::transport::Transport;
use crate::unix_time::UnixTime;
use crate::*;
/// Maximum number of entries stored in a single index hunk file.
pub const MAX_ENTRIES_PER_HUNK: usize = 1000;
/// Number of hunk files grouped into one numbered index subdirectory.
pub const HUNKS_PER_SUBDIR: u32 = 10_000;
/// Description of one archived file, directory, or symlink, as stored in
/// (and serialized to JSON within) an index hunk.
#[derive(Debug, Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct IndexEntry {
    /// Path of this entry relative to the base of the backup tree.
    pub apath: Apath,
    /// Kind of file (file, directory, symlink, ...).
    pub kind: Kind,
    /// Modification time, whole seconds since the Unix epoch.
    #[serde(default)]
    pub mtime: i64,
    /// Sub-second part of the modification time, in nanoseconds;
    /// omitted from the serialized form when zero.
    #[serde(default)]
    #[serde(skip_serializing_if = "crate::misc::zero_u32")]
    pub mtime_nanos: u32,
    /// Blockdir addresses holding the file's content, in order;
    /// empty (and omitted when serialized) for non-file entries.
    #[serde(default)]
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub addrs: Vec<blockdir::Address>,
    /// Symlink target; `Some` exactly when `kind` is `Symlink`
    /// (enforced by `IndexEntry::metadata_from`).
    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub target: Option<String>,
}
impl Entry for IndexEntry {
    /// Borrow this entry's archive-relative path.
    fn apath(&self) -> &Apath {
        &self.apath
    }

    /// Kind of file this entry describes.
    #[inline]
    fn kind(&self) -> Kind {
        self.kind
    }

    /// Recombine the stored seconds and nanoseconds into a `UnixTime`.
    #[inline]
    fn mtime(&self) -> UnixTime {
        let secs = self.mtime;
        let nanosecs = self.mtime_nanos;
        UnixTime { secs, nanosecs }
    }

    /// Total content length, summed over all block addresses.
    fn size(&self) -> Option<u64> {
        let total: u64 = self.addrs.iter().map(|addr| addr.len).sum();
        Some(total)
    }

    /// Symlink target, if this entry is a symlink.
    #[inline]
    fn symlink_target(&self) -> &Option<String> {
        &self.target
    }
}
impl IndexEntry {
pub(crate) fn metadata_from<E: Entry>(source: &E) -> IndexEntry {
let mtime = source.mtime();
assert_eq!(
source.symlink_target().is_some(),
source.kind() == Kind::Symlink
);
IndexEntry {
apath: source.apath().clone(),
kind: source.kind(),
addrs: Vec::new(),
target: source.symlink_target().clone(),
mtime: mtime.secs,
mtime_nanos: mtime.nanosecs,
}
}
}
/// Accumulates index entries and writes them out as sorted, compressed hunks.
pub struct IndexWriter {
    /// Storage abstraction for the index directory.
    transport: Box<dyn Transport>,
    /// Entries buffered for the current hunk; flushed by `finish_hunk`.
    entries: Vec<IndexEntry>,
    /// Sequence number of the next hunk to write.
    sequence: u32,
    /// Verifies that apaths are written in strictly increasing order.
    check_order: apath::CheckOrder,
    /// Counters of hunks and bytes written.
    pub stats: IndexWriterStats,
    /// Reused Snappy compressor.
    compressor: Compressor,
}
impl IndexWriter {
    /// Create an index writer storing hunks through the given transport,
    /// starting at hunk sequence number 0 with an empty entry buffer.
    pub fn new(transport: Box<dyn Transport>) -> IndexWriter {
        IndexWriter {
            transport,
            entries: Vec::<IndexEntry>::with_capacity(MAX_ENTRIES_PER_HUNK),
            sequence: 0,
            check_order: apath::CheckOrder::new(),
            stats: IndexWriterStats::default(),
            compressor: Compressor::new(),
        }
    }

    /// Flush any buffered entries and return the accumulated write stats.
    pub fn finish(mut self) -> Result<IndexWriterStats> {
        self.finish_hunk()?;
        Ok(self.stats)
    }

    /// Buffer one entry; nothing is written until `finish_hunk`.
    pub(crate) fn push_entry(&mut self, entry: IndexEntry) {
        self.entries.push(entry);
    }

    /// Move all of `entries` into the internal buffer, leaving it empty.
    pub(crate) fn append_entries(&mut self, entries: &mut Vec<IndexEntry>) {
        self.entries.append(entries);
    }

    /// Sort the buffered entries and write them as one compressed hunk file.
    ///
    /// Does nothing when the buffer is empty, so trailing empty hunks are
    /// never written. May panic (via `CheckOrder`) if entries are out of
    /// apath order relative to previously written hunks.
    pub fn finish_hunk(&mut self) -> Result<()> {
        if self.entries.is_empty() {
            return Ok(());
        }
        self.entries.sort_by(|a, b| {
            // Duplicate apaths in one hunk are a caller bug. Only checked in
            // debug builds, and only for pairs the sort happens to compare.
            debug_assert!(a.apath != b.apath);
            a.apath.cmp(&b.apath)
        });
        // Checking the first (and, if present, last) entry of the now-sorted
        // hunk is enough to validate ordering across successive hunks.
        self.check_order.check(&self.entries[0].apath);
        if self.entries.len() > 1 {
            self.check_order.check(&self.entries.last().unwrap().apath);
        }
        let relpath = hunk_relpath(self.sequence);
        let write_error = |source| Error::WriteIndex {
            path: relpath.clone(),
            source,
        };
        let json =
            serde_json::to_vec(&self.entries).map_err(|source| Error::SerializeIndex { source })?;
        // Create the containing subdirectory lazily, when writing the first
        // hunk that falls into it.
        if (self.sequence % HUNKS_PER_SUBDIR) == 0 {
            self.transport
                .create_dir(&subdir_relpath(self.sequence))
                .map_err(write_error)?;
        }
        let compressed_bytes = self.compressor.compress(&json)?;
        self.transport
            .write_file(&relpath, compressed_bytes)
            .map_err(write_error)?;
        // Stats and sequence are only advanced after a successful write.
        self.stats.index_hunks += 1;
        self.stats.compressed_index_bytes += compressed_bytes.len() as u64;
        self.stats.uncompressed_index_bytes += json.len() as u64;
        self.entries.clear();
        self.sequence += 1;
        Ok(())
    }
}
/// Relative path of the subdirectory holding the given hunk, as a
/// zero-padded five-digit decimal number.
fn subdir_relpath(hunk_number: u32) -> String {
    let subdir_number = hunk_number / HUNKS_PER_SUBDIR;
    format!("{:05}", subdir_number)
}
fn hunk_relpath(hunk_number: u32) -> String {
format!("{:05}/{:09}", hunk_number / HUNKS_PER_SUBDIR, hunk_number)
}
/// Read access to a stored index.
#[derive(Debug, Clone)]
pub struct IndexRead {
    /// Storage abstraction for the index directory.
    transport: Box<dyn Transport>,
}
impl IndexRead {
#[allow(unused)]
pub(crate) fn open_path(path: &Path) -> IndexRead {
IndexRead::open(Box::new(LocalTransport::new(path)))
}
pub(crate) fn open(transport: Box<dyn Transport>) -> IndexRead {
IndexRead { transport }
}
pub fn count_hunks(&self) -> Result<u32> {
for i in 0.. {
let path = hunk_relpath(i);
if !self
.transport
.exists(&path)
.map_err(|source| Error::ReadIndex { source, path })?
{
return Ok(i);
}
}
unreachable!();
}
pub fn estimate_entry_count(&self) -> Result<u64> {
Ok(u64::from(self.count_hunks()?) * (MAX_ENTRIES_PER_HUNK as u64))
}
pub fn iter_entries(self) -> IndexEntryIter<IndexHunkIter> {
IndexEntryIter::new(self.iter_hunks())
}
pub fn iter_hunks(&self) -> IndexHunkIter {
IndexHunkIter {
next_hunk_number: 0,
transport: self.transport.box_clone(),
decompressor: Decompressor::new(),
compressed_buf: Vec::new(),
stats: IndexReadStats::default(),
after: None,
}
}
}
/// Iterator over whole hunks of entries, read lazily from storage.
pub struct IndexHunkIter {
    /// Number of the next hunk file to read.
    next_hunk_number: u32,
    /// Storage abstraction for the index directory.
    transport: Box<dyn Transport>,
    /// Reused Snappy decompressor.
    decompressor: Decompressor,
    /// Reused buffer holding the raw compressed bytes of one hunk.
    compressed_buf: Vec<u8>,
    /// Counters of hunks, bytes, and read errors.
    pub stats: IndexReadStats,
    /// When set, only entries with apaths strictly greater than this are
    /// yielded; cleared once the position has been passed.
    after: Option<Apath>,
}
impl Iterator for IndexHunkIter {
    type Item = Vec<IndexEntry>;

    /// Yield the entries of the next hunk, skipping over hunks that fail to
    /// read and, while `after` is set, dropping entries at or before it.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            let hunk_number = self.next_hunk_number;
            let entries = match self.read_next_hunk() {
                Ok(None) => return None,
                Ok(Some(entries)) => entries,
                Err(err) => {
                    // Read errors are reported but not fatal: continue to the
                    // next hunk so one bad file doesn't hide the rest.
                    self.stats.errors += 1;
                    ui::problem(&format!(
                        "Error reading index hunk {:?}: {:?} ",
                        hunk_number, err
                    ));
                    continue;
                }
            };
            if let Some(ref after) = self.after {
                // Whole hunk is at or before `after`: skip it entirely.
                if let Some(last) = entries.last() {
                    if last.apath <= *after {
                        continue;
                    }
                }
                // Whole hunk is beyond `after`: filtering is finished.
                if let Some(first) = entries.first() {
                    if first.apath > *after {
                        self.after = None;
                        return Some(entries);
                    }
                }
                // `after` falls inside this hunk; entries within a hunk are
                // sorted (by IndexWriter::finish_hunk), so binary-search for
                // it and return only the strictly-greater tail.
                let idx = match entries.binary_search_by_key(&after, |entry| &entry.apath) {
                    Ok(idx) => idx + 1,
                    Err(idx) => idx,
                };
                return Some(Vec::from(&entries[idx..]));
            }
            // NOTE(review): empty hunks are silently skipped here rather than
            // yielded, so callers never see a zero-length item in this path.
            if !entries.is_empty() {
                return Some(entries);
            }
        }
    }
}
impl IndexHunkIter {
pub fn advance_to_after(self, apath: &Apath) -> Self {
IndexHunkIter {
after: Some(apath.clone()),
..self
}
}
fn read_next_hunk(&mut self) -> Result<Option<Vec<IndexEntry>>> {
let path = &hunk_relpath(self.next_hunk_number);
self.next_hunk_number += 1;
if let Err(err) = self.transport.read_file(&path, &mut self.compressed_buf) {
if err.kind() == io::ErrorKind::NotFound {
return Ok(None);
} else {
return Err(Error::ReadIndex {
path: path.clone(),
source: err,
});
}
}
self.stats.index_hunks += 1;
self.stats.compressed_index_bytes += self.compressed_buf.len() as u64;
let index_bytes = self.decompressor.decompress(&self.compressed_buf)?;
self.stats.uncompressed_index_bytes += index_bytes.len() as u64;
let entries: Vec<IndexEntry> =
serde_json::from_slice(&index_bytes).map_err(|source| Error::DeserializeIndex {
path: path.clone(),
source,
})?;
if entries.is_empty() {
}
Ok(Some(entries))
}
}
/// Iterator of individual index entries, adapting an iterator of hunks.
pub struct IndexEntryIter<HI: Iterator<Item = Vec<IndexEntry>>> {
    /// Entries of the current hunk not yet yielded.
    buffered_entries: Peekable<vec::IntoIter<IndexEntry>>,
    /// Source of further hunks, consumed as the buffer empties.
    hunk_iter: HI,
}
impl<HI: Iterator<Item = Vec<IndexEntry>>> IndexEntryIter<HI> {
    /// Wrap a hunk iterator. The entry buffer starts empty and is filled
    /// lazily as entries are requested.
    pub(crate) fn new(hunk_iter: HI) -> Self {
        let buffered_entries = Vec::new().into_iter().peekable();
        IndexEntryIter {
            buffered_entries,
            hunk_iter,
        }
    }
}
impl<HI: Iterator<Item = Vec<IndexEntry>>> Iterator for IndexEntryIter<HI> {
    type Item = IndexEntry;

    /// Drain the buffered hunk; once it runs out, pull the next hunk and
    /// keep going until the hunk iterator itself is exhausted.
    fn next(&mut self) -> Option<IndexEntry> {
        loop {
            match self.buffered_entries.next() {
                Some(entry) => return Some(entry),
                None => {
                    if !self.refill_entry_buffer_or_warn() {
                        return None;
                    }
                }
            }
        }
    }
}
impl<HI: Iterator<Item = Vec<IndexEntry>>> IndexEntryIter<HI> {
pub fn advance_to(&mut self, apath: &Apath) -> Option<IndexEntry> {
loop {
if let Some(cand) = self.buffered_entries.peek() {
match cand.apath.cmp(apath) {
Ordering::Less => {
self.buffered_entries.next().unwrap();
}
Ordering::Equal => {
return Some(self.buffered_entries.next().unwrap());
}
Ordering::Greater => {
return None;
}
}
} else if !self.refill_entry_buffer_or_warn() {
return None;
}
}
}
fn refill_entry_buffer_or_warn(&mut self) -> bool {
assert!(
self.buffered_entries.next().is_none(),
"refill_entry_buffer called with non-empty buffer"
);
if let Some(new_entries) = self.hunk_iter.next() {
self.buffered_entries = new_entries.into_iter().peekable();
true
} else {
false
}
}
}
#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use super::transport::local::LocalTransport;
    use super::*;

    /// Make a temporary directory and an `IndexWriter` pointed at it.
    fn setup() -> (TempDir, IndexWriter) {
        let testdir = TempDir::new().unwrap();
        let ib = IndexWriter::new(Box::new(LocalTransport::new(testdir.path())));
        (testdir, ib)
    }

    /// A minimal file entry with a fixed mtime and no content addresses.
    fn sample_entry(apath: &str) -> IndexEntry {
        IndexEntry {
            apath: apath.into(),
            mtime: 1_461_736_377,
            mtime_nanos: 0,
            kind: Kind::File,
            addrs: vec![],
            target: None,
        }
    }

    /// Empty/None fields are omitted per the serde attributes on IndexEntry.
    #[test]
    fn serialize_index() {
        let entries = [IndexEntry {
            apath: "/a/b".into(),
            mtime: 1_461_736_377,
            mtime_nanos: 0,
            kind: Kind::File,
            addrs: vec![],
            target: None,
        }];
        let index_json = serde_json::to_string(&entries).unwrap();
        println!("{}", index_json);
        assert_eq!(
            index_json,
            "[{\"apath\":\"/a/b\",\
             \"kind\":\"File\",\
             \"mtime\":1461736377}]"
        );
    }

    /// Entries pushed out of order are sorted by finish_hunk.
    #[test]
    fn index_builder_sorts_entries() {
        let (_testdir, mut ib) = setup();
        ib.push_entry(sample_entry("/zzz"));
        ib.push_entry(sample_entry("/aaa"));
        ib.finish_hunk().unwrap();
    }

    /// Invalid apaths are rejected (panic from CheckOrder/Apath handling).
    #[test]
    #[should_panic]
    fn index_builder_checks_names() {
        let (_testdir, mut ib) = setup();
        ib.push_entry(sample_entry("../escapecat"));
        ib.finish_hunk().unwrap();
    }

    /// Duplicate apaths within one hunk are caught (debug_assert in sort).
    #[test]
    #[should_panic]
    fn no_duplicate_paths() {
        let (_testdir, mut ib) = setup();
        ib.push_entry(sample_entry("/again"));
        ib.push_entry(sample_entry("/again"));
        ib.finish_hunk().unwrap();
    }

    /// Duplicate apaths across hunks are caught by CheckOrder.
    #[test]
    #[should_panic]
    fn no_duplicate_paths_across_hunks() {
        let (_testdir, mut ib) = setup();
        ib.push_entry(sample_entry("/again"));
        ib.finish_hunk().unwrap();
        ib.push_entry(sample_entry("/again"));
        ib.finish_hunk().unwrap();
    }

    /// Hunk 0 lives in subdirectory "00000" as file "000000000".
    #[test]
    fn path_for_hunk() {
        assert_eq!(super::hunk_relpath(0), "00000/000000000");
    }

    /// Round-trip: write two entries, check stats, the on-disk file,
    /// and that reading yields the same entries in order.
    #[test]
    fn basic() {
        let (testdir, mut ib) = setup();
        ib.append_entries(&mut vec![sample_entry("/apple"), sample_entry("/banana")]);
        let stats = ib.finish().unwrap();
        assert_eq!(stats.index_hunks, 1);
        assert!(stats.compressed_index_bytes > 30);
        assert!(
            stats.compressed_index_bytes < 70,
            "expected shorter compressed index: {}",
            stats.compressed_index_bytes
        );
        assert!(stats.uncompressed_index_bytes > 100);
        assert!(stats.uncompressed_index_bytes < 200);
        assert!(
            std::fs::metadata(testdir.path().join("00000").join("000000000"))
                .unwrap()
                .is_file(),
            "Index hunk file not found"
        );
        let mut it = IndexRead::open_path(&testdir.path()).iter_entries();
        let entry = it.next().expect("Get first entry");
        assert_eq!(&entry.apath, "/apple");
        let entry = it.next().expect("Get second entry");
        assert_eq!(&entry.apath, "/banana");
        assert!(it.next().is_none(), "Expected no more entries");
    }

    /// Entries written across two hunks read back flattened and in order,
    /// and iter_hunks preserves the hunk boundaries.
    #[test]
    fn multiple_hunks() {
        let (testdir, mut ib) = setup();
        ib.append_entries(&mut vec![sample_entry("/1.1"), sample_entry("/1.2")]);
        ib.finish_hunk().unwrap();
        ib.append_entries(&mut vec![sample_entry("/2.1"), sample_entry("/2.2")]);
        ib.finish_hunk().unwrap();
        let index_read = IndexRead::open_path(&testdir.path());
        let it = index_read.iter_entries();
        let names: Vec<String> = it.map(|x| x.apath.into()).collect();
        assert_eq!(names, &["/1.1", "/1.2", "/2.1", "/2.2"]);
        let hunks: Vec<Vec<IndexEntry>> =
            IndexRead::open_path(&testdir.path()).iter_hunks().collect();
        assert_eq!(hunks.len(), 2);
        assert_eq!(
            hunks[0]
                .iter()
                .map(|entry| entry.apath())
                .collect::<Vec<_>>(),
            vec!["/1.1", "/1.2"]
        );
        assert_eq!(
            hunks[1]
                .iter()
                .map(|entry| entry.apath())
                .collect::<Vec<_>>(),
            vec!["/2.1", "/2.2"]
        );
    }

    /// advance_to_after skips all entries <= the given apath, covering the
    /// cases before, inside, between, and after both hunks.
    #[test]
    fn iter_hunks_advance_to_after() {
        let (testdir, mut ib) = setup();
        ib.append_entries(&mut vec![sample_entry("/1.1"), sample_entry("/1.2")]);
        ib.finish_hunk().unwrap();
        ib.append_entries(&mut vec![sample_entry("/2.1"), sample_entry("/2.2")]);
        ib.finish_hunk().unwrap();
        let index_read = IndexRead::open_path(&testdir.path());
        let names: Vec<String> = index_read
            .iter_hunks()
            .advance_to_after(&"/".into())
            .flatten()
            .map(|entry| entry.apath.into())
            .collect();
        assert_eq!(names, ["/1.1", "/1.2", "/2.1", "/2.2"]);
        let names: Vec<String> = index_read
            .iter_hunks()
            .advance_to_after(&"/subdir".into())
            .flatten()
            .map(|entry| entry.apath.into())
            .collect();
        assert!(names.is_empty());
        let names: Vec<String> = index_read
            .iter_hunks()
            .advance_to_after(&"/1.1".into())
            .flatten()
            .map(|entry| entry.apath.into())
            .collect();
        assert_eq!(names, ["/1.2", "/2.1", "/2.2"]);
        let names: Vec<String> = index_read
            .iter_hunks()
            .advance_to_after(&"/1.1.1".into())
            .flatten()
            .map(|entry| entry.apath.into())
            .collect();
        assert_eq!(names, ["/1.2", "/2.1", "/2.2"]);
        let names: Vec<String> = index_read
            .iter_hunks()
            .advance_to_after(&"/1.2".into())
            .flatten()
            .map(|entry| entry.apath.into())
            .collect();
        assert_eq!(names, ["/2.1", "/2.2"]);
        let names: Vec<String> = index_read
            .iter_hunks()
            .advance_to_after(&"/1.3".into())
            .flatten()
            .map(|entry| entry.apath.into())
            .collect();
        assert_eq!(names, ["/2.1", "/2.2"]);
        let names: Vec<String> = index_read
            .iter_hunks()
            .advance_to_after(&"/2.0".into())
            .flatten()
            .map(|entry| entry.apath.into())
            .collect();
        assert_eq!(names, ["/2.1", "/2.2"]);
        let names: Vec<String> = index_read
            .iter_hunks()
            .advance_to_after(&"/2.1".into())
            .flatten()
            .map(|entry| entry.apath.into())
            .collect();
        assert_eq!(names, ["/2.2"]);
        let names: Vec<String> = index_read
            .iter_hunks()
            .advance_to_after(&"/2.2".into())
            .flatten()
            .map(|entry| entry.apath.into())
            .collect();
        assert_eq!(names, [] as [&str; 0]);
    }

    /// advance_to returns the matching entry (if any) and leaves the
    /// iterator positioned just after the requested apath.
    #[test]
    fn advance() {
        let (testdir, mut ib) = setup();
        ib.push_entry(sample_entry("/bar"));
        ib.push_entry(sample_entry("/foo"));
        ib.push_entry(sample_entry("/foobar"));
        ib.finish_hunk().unwrap();
        ib.push_entry(sample_entry("/g01"));
        ib.push_entry(sample_entry("/g02"));
        ib.push_entry(sample_entry("/g03"));
        ib.finish_hunk().unwrap();
        let mut it = IndexRead::open_path(&testdir.path()).iter_entries();
        assert_eq!(it.advance_to(&Apath::from("/foo")).unwrap().apath, "/foo");
        assert_eq!(it.next().unwrap().apath, "/foobar");
        assert_eq!(it.next().unwrap().apath, "/g01");
        let mut it = IndexRead::open_path(&testdir.path()).iter_entries();
        assert_eq!(it.advance_to(&Apath::from("/fxxx")), None);
        assert_eq!(it.next().unwrap().apath, "/g01");
        assert_eq!(it.next().unwrap().apath, "/g02");
        let mut it = IndexRead::open_path(&testdir.path()).iter_entries();
        assert_eq!(it.advance_to(&Apath::from("/aaaa")), None);
        assert_eq!(it.next().unwrap().apath, "/bar");
        assert_eq!(it.next().unwrap().apath, "/foo");
        let mut it = IndexRead::open_path(&testdir.path()).iter_entries();
        assert_eq!(it.advance_to(&Apath::from("/zz")), None);
        assert_eq!(it.next(), None);
    }

    /// A second finish_hunk with an empty buffer writes nothing, so the
    /// index still contains exactly one hunk.
    #[test]
    fn no_final_empty_hunk() -> Result<()> {
        let (testdir, mut ib) = setup();
        for i in 0..MAX_ENTRIES_PER_HUNK {
            ib.push_entry(sample_entry(&format!("/{:0>10}", i)));
        }
        ib.finish_hunk()?;
        ib.finish_hunk()?;
        let read_index = IndexRead::open_path(&testdir.path());
        assert_eq!(read_index.count_hunks()?, 1);
        Ok(())
    }
}