#![deny(clippy::all)]
mod app;
mod error;
mod hyperext;
mod lfs;
mod logger;
mod lru;
mod sha256;
mod storage;
mod util;
use std::convert::Infallible;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use futures::future::{self, Future, TryFutureExt};
use hyper::{
self,
server::conn::{AddrIncoming, AddrStream},
service::make_service_fn,
};
use crate::app::App;
use crate::error::Error;
use crate::logger::Logger;
use crate::storage::{Cached, Disk, Encrypted, Retrying, Storage, Verify, S3};
#[cfg(feature = "faulty")]
use crate::storage::Faulty;
pub trait Server: Future<Output = hyper::Result<()>> {
fn addr(&self) -> SocketAddr;
}
impl<S, E> Server for hyper::Server<AddrIncoming, S, E>
where
hyper::Server<AddrIncoming, S, E>: Future<Output = hyper::Result<()>>,
{
fn addr(&self) -> SocketAddr {
self.local_addr()
}
}
#[derive(Debug)]
pub struct Cache {
dir: PathBuf,
max_size: u64,
}
impl Cache {
pub fn new(dir: PathBuf, max_size: u64) -> Self {
Self { dir, max_size }
}
}
#[derive(Debug)]
pub struct S3ServerBuilder {
bucket: String,
key: Option<[u8; 32]>,
prefix: Option<String>,
cdn: Option<String>,
cache: Option<Cache>,
}
impl S3ServerBuilder {
pub fn new(bucket: String) -> Self {
Self {
bucket,
prefix: None,
cdn: None,
key: None,
cache: None,
}
}
pub fn bucket(&mut self, bucket: String) -> &mut Self {
self.bucket = bucket;
self
}
pub fn key(&mut self, key: [u8; 32]) -> &mut Self {
self.key = Some(key);
self
}
pub fn prefix(&mut self, prefix: String) -> &mut Self {
self.prefix = Some(prefix);
self
}
pub fn cdn(&mut self, url: String) -> &mut Self {
self.cdn = Some(url);
self
}
pub fn cache(&mut self, cache: Cache) -> &mut Self {
self.cache = Some(cache);
self
}
pub async fn spawn(
mut self,
addr: SocketAddr,
) -> Result<Box<dyn Server + Unpin + Send>, Box<dyn std::error::Error>>
{
let prefix = self.prefix.unwrap_or_else(|| String::from("lfs"));
if self.cdn.is_some() {
tracing::warn!(
"A CDN was specified. Since uploads and downloads do not flow \
through Rudolfs in this case, they will *not* be encrypted."
);
if self.cache.take().is_some() {
tracing::warn!(
"A local disk cache does not work with a CDN and will be \
disabled."
);
}
}
let s3 = S3::new(self.bucket, prefix, self.cdn)
.map_err(Error::from)
.await?;
let s3 = Retrying::new(s3);
#[cfg(feature = "faulty")]
let s3 = Faulty::new(s3);
match self.cache {
Some(cache) => {
let disk = Disk::new(cache.dir).map_err(Error::from).await?;
#[cfg(feature = "faulty")]
let disk = Faulty::new(disk);
let cache = Cached::new(cache.max_size, disk, s3).await?;
match self.key {
Some(key) => {
let storage = Verify::new(Encrypted::new(key, cache));
Ok(Box::new(spawn_server(storage, &addr)))
}
None => {
let storage = Verify::new(cache);
Ok(Box::new(spawn_server(storage, &addr)))
}
}
}
None => match self.key {
Some(key) => {
let storage = Verify::new(Encrypted::new(key, s3));
Ok(Box::new(spawn_server(storage, &addr)))
}
None => {
let storage = Verify::new(s3);
Ok(Box::new(spawn_server(storage, &addr)))
}
},
}
}
pub async fn run(
self,
addr: SocketAddr,
) -> Result<(), Box<dyn std::error::Error>> {
let server = self.spawn(addr).await?;
tracing::info!("Listening on {}", server.addr());
server.await?;
Ok(())
}
}
#[derive(Debug)]
pub struct LocalServerBuilder {
path: PathBuf,
key: Option<[u8; 32]>,
cache: Option<Cache>,
}
impl LocalServerBuilder {
pub fn new(path: PathBuf) -> Self {
Self {
path,
key: None,
cache: None,
}
}
pub fn key(&mut self, key: [u8; 32]) -> &mut Self {
self.key = Some(key);
self
}
pub fn cache(&mut self, cache: Cache) -> &mut Self {
self.cache = Some(cache);
self
}
pub async fn spawn(
self,
addr: SocketAddr,
) -> Result<Box<dyn Server + Unpin + Send>, Box<dyn std::error::Error>>
{
let storage = Disk::new(self.path).map_err(Error::from).await?;
match self.key {
Some(key) => {
let storage = Verify::new(Encrypted::new(key, storage));
tracing::info!(
"Local disk storage initialized (with encryption)."
);
Ok(Box::new(spawn_server(storage, &addr)))
}
None => {
let storage = Verify::new(storage);
tracing::info!(
"Local disk storage initialized (without encryption)."
);
Ok(Box::new(spawn_server(storage, &addr)))
}
}
}
pub async fn run(
self,
addr: SocketAddr,
) -> Result<(), Box<dyn std::error::Error>> {
let server = self.spawn(addr).await?;
tracing::info!("Listening on {}", server.addr());
server.await?;
Ok(())
}
}
fn spawn_server<S>(storage: S, addr: &SocketAddr) -> impl Server
where
S: Storage + Send + Sync + 'static,
S::Error: Into<Error>,
Error: From<S::Error>,
{
let storage = Arc::new(storage);
let new_service = make_service_fn(move |socket: &AddrStream| {
let service = App::new(storage.clone());
future::ok::<_, Infallible>(Logger::new(socket.remote_addr(), service))
});
hyper::Server::bind(addr).serve(new_service)
}