//! Write, read, and combine the indexes that map pack files to their contents.
//!
//! An index file is the magic bytes `MKBAKIDX1` followed by a
//! zstd-compressed, CBOR-encoded [`Index`].

use std::collections::{BTreeMap, BTreeSet};
use std::fs::{self, File};
use std::io::prelude::*;
use std::sync::{
Mutex,
atomic::{AtomicU64, Ordering},
mpsc::{Receiver, SyncSender},
};
use anyhow::{Context, Result, anyhow, bail, ensure};
use rayon::prelude::*;
use rustc_hash::{FxHashMap, FxHashSet};
use serde_derive::{Deserialize, Serialize};
use tracing::*;
use crate::backend;
use crate::counters;
use crate::file_util::{check_magic, nice_size};
use crate::hashing::{HashingReader, HashingWriter, ObjectId};
use crate::pack::{PackManifest, PackMetadata};
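/// Magic bytes at the start of every index file.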
const MAGIC_BYTES: &[u8] = b"MKBAKIDX1";
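/// Name of the work-in-progress index file, persisted as packs arrive
/// so that an interrupted backup can be resumed.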
pub const WIP_NAME: &str = "backpak-wip.index";
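/// Maps each pack's ID to the manifest of blobs it contains.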
pub type PackMap = BTreeMap<ObjectId, PackManifest>;
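/// An index: the map of packs to their manifests,
/// plus the set of older indexes this one supersedes.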
#[derive(Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct Index {
pub supersedes: BTreeSet<ObjectId>,
pub packs: PackMap,
}
impl Index {
#[inline]
fn is_empty(&self) -> bool {
self.supersedes.is_empty() && self.packs.is_empty()
}
}
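/// Should [`index`] persist a WIP index after each pack
/// so that an interrupted backup can be resumed?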
#[derive(Debug, PartialEq, Eq)]
pub enum Resumable {
No,
Yes,
}
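/// Receive metadata for each finished pack, accumulate it into an index,
/// and pass the finished index file to the uploader.
///
/// If `resumable`, the work-in-progress index is rewritten to [`WIP_NAME`]
/// after every pack so that a killed backup can pick up where it left off.
/// Returns `true` if a new index file was written, `false` if nothing changed.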
pub fn index(
resumable: Resumable,
starting_index: Index,
rx: Receiver<PackMetadata>,
to_upload: SyncSender<(String, File)>,
indexed_packs: &AtomicU64,
) -> Result<bool> {
let mut index = starting_index;
let mut persisted = None;
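    // If we're resuming with a non-empty index, persist it right away
    // in case this run is also interrupted.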
if !index.is_empty() && resumable == Resumable::Yes {
persisted = Some(to_temp_file(&index)?);
}
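    // Receive each finished pack's metadata until the sender closes the channel.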
while let Ok(PackMetadata { id, manifest }) = rx.recv() {
ensure!(
index.packs.insert(id, manifest).is_none(),
"Duplicate pack received: {}",
id
);
indexed_packs.fetch_add(1, Ordering::Relaxed);
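        // When resumable, rewrite the WIP index after every pack.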
if resumable == Resumable::Yes {
persisted = Some(to_temp_file(&index)?);
}
}
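    // When not resumable, persist once at the end (if there's anything to persist).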
if !index.is_empty() && resumable == Resumable::No {
persisted = Some(to_temp_file(&index)?);
}
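    // If we persisted anything, copy the WIP index to its final name
    // (derived from its ID) and hand that file to the uploader.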
if let Some((index_id, mut fh)) = persisted {
fh.seek(std::io::SeekFrom::Start(0))?;
let index_name = format!("{}.index", index_id);
let mut renamed = fs::OpenOptions::new()
.create(true)
.truncate(true)
.read(true)
.write(true)
.open(&index_name)
.with_context(|| format!("Couldn't open {index_name} to write the final index"))?;
std::io::copy(&mut fh, &mut renamed)?;
renamed.sync_all()?;
drop(fh);
debug!(
"Index {} finished ({})",
index_id,
nice_size(renamed.metadata()?.len())
);
to_upload
.send((index_name, renamed))
.context("indexer -> uploader channel exited early")?;
Ok(true)
} else {
debug!("No new indexes created - nothing changed");
Ok(false)
}
}
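/// Write the index to a temporary file, then persist it to [`WIP_NAME`].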
fn to_temp_file(index: &Index) -> Result<(ObjectId, File)> {
let mut tf = tempfile::Builder::new()
.prefix("temp-backpak-")
.suffix(".index")
.tempfile_in(".")
.context("Couldn't open temporary index for writing")?;
let id = to_file(tf.as_file_mut(), index)?;
let f = tf
.persist(WIP_NAME)
.with_context(|| format!("Couldn't persist WIP index to {}", WIP_NAME))?;
Ok((id, f))
}
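/// Serialize the index to the given file: magic bytes,
/// then the zstd-compressed CBOR encoding of the [`Index`].
/// Returns the index's ID, i.e., the hash of the *uncompressed* CBOR.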
fn to_file(fh: &mut fs::File, index: &Index) -> Result<ObjectId> {
fh.write_all(MAGIC_BYTES)?;
let mut zstd = zstd::stream::write::Encoder::new(fh, 0)?;
zstd.multithread(num_cpus::get_physical() as u32)?;
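    // Hash the CBOR as it's written, before compression,
    // so the ID doesn't depend on compression settings.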
let mut hasher = HashingWriter::new(zstd);
ciborium::into_writer(index, &mut hasher)?;
let (id, zstd) = hasher.finalize();
let fh = zstd.finish()?;
fh.sync_all()?;
Ok(id)
}
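/// Build a master index from all indexes in the backend,
/// dropping any that are superseded by newer ones.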
pub fn build_master_index(cached_backend: &backend::CachedBackend) -> Result<Index> {
build_master_index_with_sizes(cached_backend).map(|(mi, _ts)| mi)
}
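/// Like [`build_master_index`], but also return the sizes of the index files
/// that were loaded.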
pub fn build_master_index_with_sizes(
cached_backend: &backend::CachedBackend,
) -> Result<(Index, Vec<u64>)> {
info!("Building a master index");
#[derive(Debug, Default)]
struct Results {
bad_indexes: BTreeSet<ObjectId>,
superseded_indexes: BTreeSet<ObjectId>,
loaded_indexes: BTreeMap<ObjectId, PackMap>,
sizes: Vec<u64>,
}
let shared = Mutex::new(Results::default());
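    // Load the indexes in parallel, collecting results under a mutex.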
cached_backend
.list_indexes()?
.par_iter()
.try_for_each_with(&shared, |shared, (index_file, index_len)| {
let index_id = backend::id_from_path(index_file)?;
let mut loaded_index = match load(&index_id, cached_backend) {
Ok(l) => l,
Err(e) => {
error!("{:?}", e);
shared.lock().unwrap().bad_indexes.insert(index_id);
return Ok(());
}
};
let mut guard = shared.lock().unwrap();
guard.sizes.push(*index_len);
guard
.superseded_indexes
.append(&mut loaded_index.supersedes);
ensure!(
guard
.loaded_indexes
.insert(index_id, loaded_index.packs)
.is_none(),
"Duplicate index {} read from backend!",
index_file
);
Ok(())
})?;
let mut shared = shared.into_inner().unwrap();
if !shared.bad_indexes.is_empty() {
bail!(
"Errors loading indexes {:?}. Consider running backpak rebuild-index.",
shared.bad_indexes
);
}
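    // Drop any index that a newer one supersedes.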
for superseded in &shared.superseded_indexes {
if shared.loaded_indexes.remove(superseded).is_some() {
debug!("Index {} is superseded and can be deleted.", superseded);
}
}
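    // Merge the surviving indexes' pack maps into one.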
let mut master_pack_map = BTreeMap::new();
for index in shared.loaded_indexes.values_mut() {
master_pack_map.append(index);
}
Ok((
Index {
supersedes: shared.superseded_indexes,
packs: master_pack_map,
},
shared.sizes,
))
}
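/// Maps each blob's ID to the ID of the pack that contains it.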
pub type BlobMap = FxHashMap<ObjectId, ObjectId>;
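/// Walk the index to map each blob to the pack that holds it,
/// failing if any blob appears in more than one pack.
///
/// A minimal sketch (marked `ignore` since it assumes this module is
/// exposed from a library crate named `backpak`):
/// ```ignore
/// use backpak::index::{blob_to_pack_map, Index};
///
/// let index = Index::default(); // Usually built by build_master_index().
/// let blob_map = blob_to_pack_map(&index).expect("no duplicate blobs");
/// assert!(blob_map.is_empty()); // An empty index yields an empty map.
/// ```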
pub fn blob_to_pack_map(index: &Index) -> Result<BlobMap> {
debug!("Building a blob -> pack map");
let mut mapping = FxHashMap::default();
for (pack_id, manifest) in &index.packs {
for blob in manifest {
if let Some(other_pack) = mapping.insert(blob.id, *pack_id) {
bail!(
"Duplicate blob {} in pack {}, previously seen in pack {}",
blob.id,
pack_id,
other_pack
);
}
}
}
Ok(mapping)
}
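/// Collect the IDs of every blob in the index,
/// failing if any blob appears more than once.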
pub fn blob_id_set(index: &Index) -> Result<FxHashSet<ObjectId>> {
debug!("Building a set of all blob IDs");
let mut blobs = FxHashSet::default();
for (pack_id, manifest) in &index.packs {
for blob in manifest {
if !blobs.insert(blob.id) {
bail!("Duplicate blob {} in pack {}", blob.id, pack_id);
}
}
}
Ok(blobs)
}
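/// Map each blob's ID to its length from the pack manifests,
/// failing if any blob appears more than once.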
pub fn blob_to_size_map(index: &Index) -> Result<FxHashMap<ObjectId, u32>> {
debug!("Mapping blobs IDs to their size");
let mut size_map = FxHashMap::default();
for (pack_id, manifest) in &index.packs {
for blob in manifest {
if size_map.insert(blob.id, blob.length).is_some() {
bail!("Duplicate blob {} in pack {}", blob.id, pack_id);
}
}
}
Ok(size_map)
}
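/// Read an index from the given reader: check the magic bytes,
/// then decompress and decode the CBOR payload.
/// Returns the index and its ID (the hash of the decompressed CBOR).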
fn from_reader<R: Read>(r: &mut R) -> Result<(Index, ObjectId)> {
check_magic(r, MAGIC_BYTES).context("Wrong magic bytes for index file")?;
let decoder =
zstd::stream::read::Decoder::new(r).context("Decompression of index file failed")?;
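    // Hash the decompressed bytes, mirroring the write order in to_file().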
let mut hasher = HashingReader::new(decoder);
let index = ciborium::from_reader(&mut hasher).context("CBOR decoding of index file failed")?;
let (id, _) = hasher.finalize();
Ok((index, id))
}
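/// Load the WIP index left behind by a previous, interrupted run, if any.
///
/// A sketch of the intended call pattern (the surrounding code is hypothetical):
/// ```ignore
/// // Resume from an interrupted run, or start fresh.
/// let starting_index = read_wip()?.unwrap_or_default();
/// ```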
pub fn read_wip() -> Result<Option<Index>> {
let mut fd = match File::open(WIP_NAME) {
Ok(w) => w,
Err(e) => {
if e.kind() == std::io::ErrorKind::NotFound {
return Ok(None);
}
let e = anyhow!(e).context(format!("Couldn't open {WIP_NAME}"));
return Err(e);
}
};
let (index, _) = from_reader(&mut fd)?;
Ok(Some(index))
}
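/// Load the index with the given ID from the backend,
/// verifying that its contents still hash to that ID.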
pub fn load(id: &ObjectId, cached_backend: &backend::CachedBackend) -> Result<Index> {
let (index, calculated_id) = from_reader(&mut cached_backend.read_index(id)?)
.with_context(|| format!("Couldn't load index {}", id))?;
ensure!(
*id == calculated_id,
"Index {}'s contents changed! Now hashes to {}",
id,
calculated_id
);
counters::bump(counters::Op::IndexLoad);
Ok(index)
}
#[cfg(test)]
mod test {
use super::*;
use tempfile::tempfile;
use crate::blob;
use crate::pack::PackManifestEntry;
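    // Build a small index with a couple of packs for the tests below.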
fn build_test_index() -> Index {
let mut supersedes = BTreeSet::new();
supersedes.insert(ObjectId::hash(b"Some previous index"));
supersedes.insert(ObjectId::hash(b"Another previous index"));
let mut packs = BTreeMap::new();
packs.insert(
ObjectId::hash(b"pack o' chunks"),
vec![
PackManifestEntry {
blob_type: blob::Type::Chunk,
length: 42,
id: ObjectId::hash(b"a chunk"),
},
PackManifestEntry {
blob_type: blob::Type::Chunk,
length: 9001,
id: ObjectId::hash(b"another chunk"),
},
],
);
packs.insert(
ObjectId::hash(b"pack o'trees"),
vec![
PackManifestEntry {
blob_type: blob::Type::Tree,
length: 182,
id: ObjectId::hash(b"first tree"),
},
PackManifestEntry {
blob_type: blob::Type::Tree,
length: 22,
id: ObjectId::hash(b"second tree"),
},
PackManifestEntry {
blob_type: blob::Type::Tree,
length: 11,
id: ObjectId::hash(b"third tree"),
},
],
);
Index { supersedes, packs }
}
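    // The serialized form of an index must not change between versions,
    // or IDs (and this reference file) would change with it.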
#[test]
fn stability() -> Result<()> {
let index = build_test_index();
let mut index_cbor = Vec::new();
ciborium::into_writer(&index, &mut index_cbor)?;
let id = ObjectId::hash(&index_cbor);
assert_eq!(
format!("{}", id),
"e3vr9p4gmumq8i1dafgum50iirupu6ahk9fn6c781b09a"
);
let from_example = fs::read("tests/references/index.stability")?;
assert_eq!(index_cbor, from_example);
Ok(())
}
#[test]
fn round_trip() -> Result<()> {
let index = build_test_index();
let mut fh = tempfile()?;
let written_id = to_file(&mut fh, &index)?;
fh.seek(std::io::SeekFrom::Start(0))?;
let (read_index, read_id) = from_reader(&mut fh)?;
assert_eq!(index, read_index);
assert_eq!(written_id, read_id);
Ok(())
}
}