use super::{
config,
key::{self, PublicKey},
Container,
};
use crate::{npk::npk::Npk as NpkNpk, runtime::ipc::RawFdExt};
use anyhow::{bail, Context, Result};
use bytes::Bytes;
use futures::{future::try_join_all, FutureExt};
use log::{debug, info, warn};
use mpsc::Receiver;
use nanoid::nanoid;
use std::{
collections::HashMap,
fmt,
future::ready,
io::{BufReader, SeekFrom},
os::unix::prelude::{AsRawFd, FromRawFd, IntoRawFd},
path::{Path, PathBuf},
};
use tokio::{
fs::{self},
io::{AsyncSeekExt, AsyncWriteExt},
sync::mpsc,
task,
time::Instant,
};
pub(super) type Npk = NpkNpk<BufReader<std::fs::File>>;
#[async_trait::async_trait]
pub(super) trait Repository: fmt::Debug {
async fn insert(&mut self, rx: &mut Receiver<Bytes>) -> Result<Container>;
async fn remove(&mut self, container: &Container) -> Result<()>;
fn get(&self, container: &Container) -> Option<&Npk>;
fn key(&self) -> Option<&PublicKey>;
fn containers(&self) -> Vec<&Npk>;
}
#[derive(Debug)]
pub(super) struct DirRepository {
dir: PathBuf,
key: Option<PublicKey>,
containers: HashMap<Container, (PathBuf, Npk, u64)>,
capacity_num: Option<u32>,
capacity_size: Option<u64>,
}
impl DirRepository {
pub async fn new(dir: &Path, configuration: &config::Repository) -> Result<DirRepository> {
let mut containers = HashMap::new();
let key = if let Some(ref key) = configuration.key {
info!(
"Loading repository {} with key {}",
dir.display(),
dir.display()
);
Some(key::load(key).await.context("failed to load key")?)
} else {
info!("Loading repository {} (unverified)", dir.display());
None
};
let mut readir = fs::read_dir(&dir)
.await
.with_context(|| format!("failed to read dir {}", dir.display()))?;
let start = Instant::now();
let mut tasks = Vec::new();
while let Ok(Some(entry)) = readir.next_entry().await {
let file = entry.path();
let load_task = task::spawn_blocking(move || {
debug!(
"Loading {}{}",
file.display(),
if key.is_some() { " [verified]" } else { "" }
);
let reader = std::fs::File::open(&file)
.with_context(|| format!("failed to open {}", file.display()))?;
let size = file.metadata()?.len();
let reader = std::io::BufReader::new(reader);
let npk = NpkNpk::from_reader(reader, key.as_ref())
.with_context(|| format!("failed to read npk {}", file.display()))?;
let name = npk.manifest().name.clone();
let version = npk.manifest().version.clone();
let container = Container::new(name, version);
Result::<_, anyhow::Error>::Ok((container, (file, npk, size)))
})
.then(|r| ready(r.expect("Task error")));
tasks.push(load_task);
}
for result in try_join_all(tasks).await? {
let (container, (file, npk, size)) = result;
containers.insert(container, (file, npk, size));
}
let duration = start.elapsed();
info!(
"Loaded {} containers from {} in {:.03}s",
containers.len(),
dir.display(),
duration.as_secs_f32(),
);
Ok(DirRepository {
dir: dir.to_owned(),
key,
containers,
capacity_num: configuration.capacity_num,
capacity_size: configuration.capacity_size,
})
}
fn size(&self) -> u64 {
self.containers.values().map(|(_, _, size)| size).sum()
}
}
#[async_trait::async_trait]
impl Repository for DirRepository {
async fn insert(&mut self, rx: &mut Receiver<Bytes>) -> Result<Container> {
let current_size_sum = self.size();
if let Some(num) = self.capacity_num {
if self.containers.len() >= num as usize {
bail!("max number of container reached");
}
}
if let Some(size) = self.capacity_size {
if current_size_sum >= size {
bail!("size limit reached");
}
}
let dest = self.dir.join(format!("{}.npk", nanoid!()));
let mut file = fs::File::create(&dest)
.await
.with_context(|| format!("failed create repository {}", dest.display()))?;
let mut written = 0;
while let Some(r) = rx.recv().await {
match file.write_all(&r).await {
Ok(_) => {
written += r.len() as u64;
if let Some(size) = self.capacity_size {
if written + current_size_sum > size as u64 {
drop(file);
fs::remove_file(&dest)
.await
.with_context(|| format!("failed to remove {}", dest.display()))?;
bail!("size limit reached");
}
}
}
Err(e) => {
drop(file);
fs::remove_file(&dest)
.await
.with_context(|| format!("failed to remove {}", dest.display()))?;
return Err(e.into());
}
}
}
file.flush().await.context("failed to flush npk")?;
drop(file);
debug!("Loading temporary npk {}", dest.display());
let npk = match Npk::from_path(dest.as_path(), self.key.as_ref())
.with_context(|| format!("failed to read npk {}", dest.display()))
{
Ok(n) => Ok(n),
Err(e) => {
fs::remove_file(&dest)
.await
.with_context(|| format!("failed to remove {}", dest.display()))?;
Err(e)
}
}?;
let container = npk.manifest().container();
info!("Loaded {} from {}", container, dest.display());
if self.containers.contains_key(&container) {
warn!("Container {} is already present in repository", container);
fs::remove_file(&dest)
.await
.with_context(|| format!("failed to remove {}", dest.display()))?;
bail!("{} already in {}", container, self.dir.display())
} else {
let old = dest;
let new = self.dir.join(format!("{}.npk", container));
debug!("Moving {} to {}", old.display(), new.display());
fs::rename(&old, &new)
.await
.context("Rename file in repository")?;
self.containers
.insert(container.clone(), (new, npk, written));
Ok(container)
}
}
async fn remove(&mut self, container: &Container) -> Result<()> {
let (path, npk, _) = self
.containers
.remove(container)
.expect("Container not found");
debug!("Removing {}", path.display());
drop(npk);
fs::remove_file(&path)
.await
.with_context(|| format!("failed to remove {}", path.display()))?;
Ok(())
}
fn get(&self, container: &Container) -> Option<&Npk> {
self.containers.get(container).map(|(_, npk, _)| npk)
}
fn key(&self) -> Option<&PublicKey> {
self.key.as_ref()
}
fn containers(&self) -> Vec<&Npk> {
self.containers.values().map(|(_, npk, _)| npk).collect()
}
}
#[derive(Debug)]
pub(super) struct MemRepository {
key: Option<PublicKey>,
containers: HashMap<Container, (Npk, u64)>,
capacity_num: Option<u32>,
capacity_size: Option<u64>,
}
impl MemRepository {
pub async fn new(configuration: &config::Repository) -> Result<MemRepository> {
let key = if let Some(ref key) = configuration.key {
info!("Loading memory repository with key {}", key.display());
Some(key::load(key).await.context("failed to load key")?)
} else {
info!("Loading repository (unverified)");
None
};
Ok(MemRepository {
key,
containers: HashMap::new(),
capacity_num: configuration.capacity_num,
capacity_size: configuration.capacity_size,
})
}
}
#[async_trait::async_trait]
impl Repository for MemRepository {
async fn insert(&mut self, rx: &mut Receiver<Bytes>) -> Result<Container> {
if let Some(num) = self.capacity_num {
if self.containers.len() >= num as usize {
bail!("max number of container reached");
}
}
let opts = memfd::MemfdOptions::default().allow_sealing(true);
let fd = opts.create(nanoid!()).context("failed to create memfd")?;
let mut file = unsafe { fs::File::from_raw_fd(fd.as_raw_fd()) };
file.set_nonblocking(true)
.context("failed to set nonblocking")?;
while let Some(r) = rx.recv().await {
file.write_all(&r).await.context("failed stream npk")?;
}
file.seek(SeekFrom::Start(0)).await.context("failed seek")?;
let npk_size = file.metadata().await?.len();
if let Some(size) = self.capacity_size {
if self.containers.values().map(|a| a.1).sum::<u64>() + npk_size >= size {
bail!("repository capacity limit reached");
}
}
let seals = memfd::SealsHashSet::from_iter([
memfd::FileSeal::SealGrow,
memfd::FileSeal::SealShrink,
memfd::FileSeal::SealWrite,
]);
fd.add_seals(&seals)
.and_then(|_| fd.add_seal(memfd::FileSeal::SealSeal))
.context("failed to add memfd seals")?;
fd.into_raw_fd();
file.set_nonblocking(false)
.context("failed to set blocking")?;
let file = BufReader::new(file.into_std().await);
debug!("Loading memfd as npk");
let npk = NpkNpk::from_reader(file, self.key.as_ref()).context("failed to read npk")?;
let container = npk.manifest().container();
info!("Loaded {} from memfd", container);
if self.containers.contains_key(&container) {
warn!(
"Container {} is already present in repository. Dropping...",
container
);
bail!("{} already in repository", container)
} else {
self.containers.insert(container.clone(), (npk, npk_size));
Ok(container)
}
}
async fn remove(&mut self, container: &Container) -> Result<()> {
debug_assert!(self.containers.contains_key(container));
self.containers.remove(container);
Ok(())
}
fn get(&self, container: &Container) -> Option<&Npk> {
self.containers.get(container).map(|a| &a.0)
}
fn containers(&self) -> Vec<&Npk> {
self.containers.values().map(|a| &a.0).collect()
}
fn key(&self) -> Option<&PublicKey> {
self.key.as_ref()
}
}