use std::collections::VecDeque;
use std::path::Path;
use mountpoint_s3_client::checksums::crc32c::{self, Crc32c};
use thiserror::Error;
use time::OffsetDateTime;
use tracing::error;
use super::db::{Db, DbEntry};
use crate::metablock::{
InodeInformation, InodeKind, InodeStat, Lookup, NEVER_EXPIRE_TTL, ROOT_INODE_NO, S3Location, ValidKey,
ValidKeyError, ValidName,
};
use crate::s3::S3Path;
use crate::sync::Arc;
/// Errors that can occur while reading entries from the manifest or validating them.
#[derive(Debug, Error)]
pub enum ManifestError {
/// An error reported by `rusqlite`; converted automatically through the generated `From` impl.
#[error("failed to read from the metadata store")]
DbError(#[from] rusqlite::Error),
/// A key failed validation; wraps the underlying `ValidKeyError`.
#[error("s3 key from the metadata store is invalid")]
InvalidKey(#[from] ValidKeyError),
/// A row was rejected as malformed. Carries the id of the offending row.
#[error("read invalid row with id {0}")]
InvalidRow(u64),
/// A channel row was rejected as malformed. Carries the id of the offending row.
#[error("read invalid channel row with id {0}")]
InvalidChannel(u64),
/// The checksum stored with an entry differs from the recomputed one.
/// Fields are, in order: the identifier reported in the message (the entry's `name` where this
/// file constructs the error), the computed checksum value, and the stored checksum value.
#[error("invalid checksum for the entry with key {0}, computed {1}, received {2}")]
InvalidChecksum(String, u32, u32),
}
/// The kind of item a manifest entry describes.
#[derive(Debug, Clone)]
enum ManifestEntryKind {
/// A file, together with the `etag` and `size` recorded for it.
File { etag: String, size: usize },
/// A directory; no etag or size is recorded for it.
Directory,
}
/// A single manifest row, converted from a `DbEntry` via the `TryFrom<DbEntry>` impl.
#[derive(Debug, Clone)]
pub struct ManifestEntry {
/// Id of this entry; reused as the id of the `Lookup` / `InodeInformation` built from it.
id: u64,
/// Id of the parent entry.
parent_id: u64,
/// Index into the `channels` slice passed to `into_lookup` / `into_inode_information`.
channel_id: usize,
/// Key of the parent; `None` is only accepted for entries whose parent is `ROOT_INODE_NO`.
parent_partial_key: Option<ValidKey>,
/// Name of the entry; parsed as a `ValidName` when the checksum is validated.
name: String,
/// Whether the entry is a file or a directory.
kind: ManifestEntryKind,
/// Checksum read from the database; compared against a recomputed one before the entry is used.
checksum: Crc32c,
}
impl ManifestEntry {
pub fn parent_id(&self) -> u64 {
self.parent_id
}
pub fn into_lookup(self, channels: &[Arc<S3Path>], mount_time: OffsetDateTime) -> Result<Lookup, ManifestError> {
let s3_path = self.channel(channels)?;
let (id, partial_key, entry_kind) = self.validate_checksum(s3_path.as_ref())?;
let stat = Self::stat(entry_kind, mount_time);
let inode_kind = partial_key.kind();
let s3_location = self
.parent_partial_key
.map(move |_| S3Location::new(s3_path, partial_key));
Ok(Lookup::new(id, stat, inode_kind, s3_location))
}
pub fn into_inode_information(
self,
channels: &[Arc<S3Path>],
mount_time: OffsetDateTime,
) -> Result<(InodeInformation, String), ManifestError> {
let s3_path = self.channel(channels)?;
let (id, partial_key, entry_kind) = self.validate_checksum(s3_path.as_ref())?;
let stat = Self::stat(entry_kind, mount_time);
let inode_kind = partial_key.kind();
let name = partial_key.name();
Ok((InodeInformation::new(id, stat, inode_kind), name.to_string()))
}
fn channel(&self, channels: &[Arc<S3Path>]) -> Result<Arc<S3Path>, ManifestError> {
let channel_id = self.channel_id;
if channel_id >= channels.len() {
error!("channel id {} specified in entry {} is invalid", channel_id, self.id);
return Err(ManifestError::InvalidRow(self.id));
}
Ok(channels[channel_id].clone())
}
fn validate_checksum(&self, path: &S3Path) -> Result<(u64, ValidKey, ManifestEntryKind), ManifestError> {
let name = ValidName::parse_str(&self.name).map_err(|_| ManifestError::InvalidRow(self.id))?;
let (partial_key, computed_checksum, _) = match &self.kind {
ManifestEntryKind::File { etag, size } => compute_checksum(
self.id,
self.parent_id,
self.parent_partial_key.as_ref(),
name,
Some(etag),
Some(*size),
path,
)?,
ManifestEntryKind::Directory => compute_checksum(
self.id,
self.parent_id,
self.parent_partial_key.as_ref(),
name,
None,
None,
path,
)?,
};
if computed_checksum != self.checksum {
return Err(ManifestError::InvalidChecksum(
self.name.clone(),
computed_checksum.value(),
self.checksum.value(),
));
}
Ok((self.id, partial_key, self.kind.clone()))
}
fn stat(kind: ManifestEntryKind, mount_time: OffsetDateTime) -> InodeStat {
match kind {
ManifestEntryKind::File { etag, size } => {
InodeStat::for_file(
size,
mount_time,
Some(etag.into()),
None,
None,
NEVER_EXPIRE_TTL,
)
}
ManifestEntryKind::Directory => InodeStat::for_directory(mount_time, NEVER_EXPIRE_TTL),
}
}
}
impl TryFrom<DbEntry> for ManifestEntry {
type Error = ManifestError;
fn try_from(db_entry: DbEntry) -> Result<Self, Self::Error> {
let parent_partial_key = db_entry.parent_partial_key.map(ValidKey::try_from).transpose()?;
if parent_partial_key.is_none() && db_entry.parent_id != ROOT_INODE_NO {
error!("only channel directories may have no parent_key, id: {}", db_entry.id);
return Err(ManifestError::InvalidRow(db_entry.id));
}
let kind = match (db_entry.etag, db_entry.size) {
(None, None) => ManifestEntryKind::Directory,
(Some(etag), Some(size)) => ManifestEntryKind::File { etag, size },
_ => return Err(ManifestError::InvalidRow(db_entry.id)),
};
Ok(ManifestEntry {
id: db_entry.id,
parent_id: db_entry.parent_id,
channel_id: db_entry.channel_id,
parent_partial_key,
name: db_entry.name,
kind,
checksum: Crc32c::new(db_entry.checksum),
})
}
}
/// Builds the key of an entry and computes its two checksums.
///
/// The key is created as a child named `name` of `parent_partial_key`, or of `ValidKey::root()`
/// when no parent key is given. The child is a file only when both `etag` and `size` are
/// provided, and a directory otherwise.
///
/// Returns `(partial_key, full_checksum, partial_checksum)`:
///
/// * `partial_checksum` covers the key bytes, then the etag bytes (if any), then the size as a
///   big-endian `u64` (if any).
/// * `full_checksum` extends that with `id` and `parent_id` (both big-endian), followed by the
///   bucket and prefix of `s3_path`.
///
/// # Errors
///
/// Returns the [`ValidKeyError`] raised while creating the child key.
pub fn compute_checksum(
    id: u64,
    parent_id: u64,
    parent_partial_key: Option<&ValidKey>,
    name: ValidName,
    etag: Option<&str>,
    size: Option<usize>,
    s3_path: &S3Path,
) -> Result<(ValidKey, Crc32c, Crc32c), ValidKeyError> {
    let kind = match (etag, size) {
        (Some(_), Some(_)) => InodeKind::File,
        _ => InodeKind::Directory,
    };
    let partial_key = match parent_partial_key {
        Some(parent_key) => parent_key.new_child(name, kind)?,
        None => ValidKey::root().new_child(name, kind)?,
    };

    // Part one: the fields describing the object itself.
    let mut hasher = crc32c::Hasher::new();
    hasher.update(partial_key.as_bytes());
    if let Some(etag) = etag {
        hasher.update(etag.as_bytes());
    }
    if let Some(size) = size {
        // Hash the size at a fixed width so the checksum does not depend on the width of `usize`.
        let size_bytes = (size as u64).to_be_bytes();
        hasher.update(size_bytes.as_slice());
    }
    let partial_checksum = hasher.clone().finalize();

    // Part two: the fields tying the object to its row and channel. The order must not change.
    let id_bytes = id.to_be_bytes();
    let parent_id_bytes = parent_id.to_be_bytes();
    hasher.update(id_bytes.as_slice());
    hasher.update(parent_id_bytes.as_slice());
    hasher.update(s3_path.bucket.as_bytes());
    hasher.update(s3_path.prefix.as_str().as_bytes());
    let full_checksum = hasher.finalize();

    Ok((partial_key, full_checksum, partial_checksum))
}
/// Handle to the manifest database, offering entry lookups and directory iteration.
#[derive(Debug, Clone)]
pub struct Manifest {
/// Underlying database handle; cloned into every `ManifestDirIter` created by `dir_iter`.
db: Db,
}
impl Manifest {
pub fn new(manifest_db_path: &Path) -> Result<Self, rusqlite::Error> {
let db = Db::new(manifest_db_path)?;
Ok(Self { db })
}
pub fn manifest_lookup(&self, parent_id: u64, name: &str) -> Result<Option<ManifestEntry>, ManifestError> {
let db_entry = self.db.select_entry(parent_id, name)?;
match db_entry {
Some(db_entry) => Ok(Some(db_entry.try_into()?)),
None => Ok(None),
}
}
pub fn manifest_lookup_by_id(&self, ino: u64) -> Result<Option<ManifestEntry>, ManifestError> {
let db_entry = self.db.select_entry_by_id(ino)?;
match db_entry {
Some(db_entry) => Ok(Some(db_entry.try_into()?)),
None => Ok(None),
}
}
pub fn dir_iter(&self, parent_id: u64) -> ManifestDirIter {
ManifestDirIter::new(self.db.clone(), parent_id)
}
pub fn load_channels(&self) -> Result<Vec<S3Path>, ManifestError> {
self.db.load_channels()
}
}
/// Batched, seekable iterator over the children of one directory in the manifest.
#[derive(Debug)]
pub struct ManifestDirIter {
/// Database handle used to fetch batches of children.
db: Db,
/// Entries already fetched from the database but not yet handed out.
entries: VecDeque<ManifestEntry>,
/// Id of the directory whose children are listed.
parent_id: u64,
/// Offset of the next entry to hand out; also the offset passed to the next database query.
next_offset: usize,
/// Maximum number of rows requested per database query.
batch_size: usize,
/// Set once a query returns fewer than `batch_size` rows, so no further query is issued.
finished: bool,
}
impl ManifestDirIter {
fn new(db: Db, parent_id: u64) -> Self {
let batch_size = 10000;
Self {
db,
entries: Default::default(),
parent_id,
next_offset: 0,
batch_size,
finished: false,
}
}
pub fn next_entry(&mut self) -> Result<Option<ManifestEntry>, ManifestError> {
if self.entries.is_empty() && !self.finished {
self.search_next_entries()?;
}
let entry = self.entries.pop_front();
if entry.is_some() {
self.next_offset += 1;
}
Ok(entry)
}
pub fn readd(&mut self, entry: ManifestEntry) {
self.next_offset -= 1;
self.entries.push_front(entry);
}
pub fn seek(&mut self, offset: usize) -> Result<(), ManifestError> {
if offset != self.next_offset {
metrics::counter!("manifest.readdir.out_of_order").increment(1);
self.entries.clear();
self.next_offset = offset;
self.finished = false;
};
Ok(())
}
fn search_next_entries(&mut self) -> Result<(), ManifestError> {
let db_entries = self
.db
.select_children(self.parent_id, self.next_offset, self.batch_size)?;
if db_entries.len() < self.batch_size {
self.finished = true;
}
let manifest_entries: Result<Vec<ManifestEntry>, ManifestError> =
db_entries.into_iter().map(|db_entry| db_entry.try_into()).collect();
self.entries.extend(manifest_entries?);
Ok(())
}
}