use std::ffi::OsStr;
use std::fs::Metadata;
use std::io;
use std::path::PathBuf;
use std::str::FromStr;
use std::time::Duration;
use async_stream::try_stream;
use async_trait::async_trait;
use bytes::{BufMut, Bytes, BytesMut};
use futures::stream::{StreamExt, TryStreamExt};
use tokio::{self, fs};
use tokio_util::codec::{Decoder, Encoder, Framed};
use uuid::Uuid;
use super::{LFSObject, Namespace, Storage, StorageKey, StorageStream};
use crate::lfs::Oid;
use crate::util::NamedTempFile;
/// Storage backend that keeps LFS objects as plain files beneath a root
/// directory on the local file system.
pub struct Backend {
    /// Directory under which everything managed by this backend lives.
    root: PathBuf,
}

impl Backend {
    /// Creates a backend rooted at `root`, creating the directory (and any
    /// missing parents) first.
    ///
    /// # Errors
    ///
    /// Returns the I/O error reported while creating `root`.
    pub async fn new(root: PathBuf) -> Result<Self, io::Error> {
        fs::create_dir_all(&root).await?;
        Ok(Self { root })
    }

    /// Maps a storage key to the file that holds (or would hold) its object:
    /// `<root>/objects/<namespace>/<oid path>`.
    fn key_to_path(&self, key: &StorageKey) -> PathBuf {
        // Build the relative part in one statement so any temporaries created
        // by the accessors live until the string has been formatted.
        let relative =
            format!("objects/{}/{}", key.namespace(), key.oid().path());
        self.root.join(relative)
    }
}
#[async_trait]
impl Storage for Backend {
type Error = io::Error;
/// Opens the object stored under `key` for reading.
///
/// Returns `Ok(None)` when no file exists for the key. Otherwise the result
/// carries the file's length, as reported by its metadata, together with a
/// stream of the file's contents.
///
/// # Errors
///
/// Any I/O error other than "not found" raised while opening the file, and
/// any error raised while reading its metadata, is returned unchanged.
async fn get(
    &self,
    key: &StorageKey,
) -> Result<Option<LFSObject>, Self::Error> {
    let path = self.key_to_path(key);
    let file = match fs::File::open(path).await {
        Ok(file) => file,
        // A missing file simply means the object is not stored here.
        Err(err) if err.kind() == io::ErrorKind::NotFound => return Ok(None),
        Err(err) => return Err(err),
    };

    let len = file.metadata().await?.len();

    // Decode the file into chunks and freeze each one into immutable `Bytes`.
    let chunks =
        Framed::new(file, BytesCodec::new()).map_ok(BytesMut::freeze);
    Ok(Some(LFSObject::new(len, Box::pin(chunks))))
}
/// Stores `value` under `key`.
///
/// The data is first streamed into a uniquely named file in
/// `<root>/incomplete`. It is moved to its final location only once the
/// number of bytes written equals the length declared by `value`.
///
/// # Errors
///
/// Returns any I/O error raised while creating directories, writing the
/// temporary file or persisting it, and an `ErrorKind::Other` error when the
/// number of bytes written differs from the declared length.
async fn put(
    &self,
    key: StorageKey,
    value: LFSObject,
) -> Result<(), Self::Error> {
    let final_path = self.key_to_path(&key);
    let final_dir = final_path.parent().unwrap().to_path_buf();
    let (expected_len, body) = value.into_parts();

    // Stage the upload under a random name so concurrent uploads of the same
    // object cannot clobber each other.
    let staging_dir = self.root.join("incomplete");
    let staging_path = staging_dir.join(Uuid::new_v4().to_string());
    fs::create_dir_all(staging_dir).await?;

    // NOTE(review): presumably `NamedTempFile` removes the staged file when it
    // is dropped without being persisted; confirm in `crate::util`.
    let staged = NamedTempFile::new(staging_path).await?;
    let mut writer = Framed::new(staged, BytesCodec::new());
    body.forward(&mut writer).await?;

    let written = writer.codec().written();
    let staged = writer.into_inner();

    // Refuse to publish an object whose size does not match what was promised.
    if written != expected_len {
        return Err(io::Error::new(
            io::ErrorKind::Other,
            "got incomplete object",
        ));
    }

    fs::create_dir_all(final_dir).await?;
    staged.persist(final_path).await?;
    Ok(())
}
/// Returns the size in bytes of the object stored under `key`, or `Ok(None)`
/// when nothing exists at the key's path.
///
/// # Errors
///
/// Any I/O error other than "not found" is returned unchanged.
async fn size(&self, key: &StorageKey) -> Result<Option<u64>, Self::Error> {
    match fs::metadata(self.key_to_path(key)).await {
        Ok(metadata) => Ok(Some(metadata.len())),
        Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(None),
        Err(err) => Err(err),
    }
}
/// Removes the object stored under `key`.
///
/// Deleting an object that does not exist is treated as success.
///
/// # Errors
///
/// Any I/O error other than "not found" is returned unchanged.
async fn delete(&self, key: &StorageKey) -> Result<(), Self::Error> {
    let path = self.key_to_path(key);
    match fs::remove_file(path).await {
        Err(err) if err.kind() != io::ErrorKind::NotFound => Err(err),
        // Either the file was removed or it was already gone.
        _ => Ok(()),
    }
}
/// Lists every stored object together with its size in bytes.
///
/// Walks `<root>/objects/<org>/<project>/<tier1>/<tier2>/<file>` and yields a
/// `(StorageKey, u64)` pair for each regular file whose name parses as an
/// [`Oid`]. Entries that do not fit that shape are skipped.
///
/// A "not found" error raised while walking the directories (for example
/// when the `objects` directory has not been created yet) ends the listing
/// without being reported. Every other I/O error is yielded to the consumer.
fn list(&self) -> StorageStream<(StorageKey, u64), Self::Error> {
    /// Converts the path and metadata of a candidate file into a listing
    /// item, or `None` when the entry is not a stored object.
    fn to_item(
        path: PathBuf,
        metadata: Metadata,
    ) -> Option<(StorageKey, u64)> {
        // Directories and other non-regular files are never objects.
        if !metadata.is_file() {
            return None;
        }

        // The project directory is three levels above the object file
        // (`<project>/<tier1>/<tier2>/<file>`); the org is its parent.
        let project_dir = path.parent()?.parent()?.parent()?;
        let project = project_dir.file_name()?.to_str()?;
        let org = project_dir.parent()?.file_name()?.to_str()?;

        let oid = path
            .file_name()
            .and_then(OsStr::to_str)
            .and_then(|name| Oid::from_str(name).ok())?;

        let namespace = Namespace::new(org.into(), project.into());
        Some((StorageKey::new(namespace, oid), metadata.len()))
    }

    let root = self.root.join("objects");

    let entries = try_stream! {
        let mut orgs = fs::read_dir(root).await?;
        while let Some(org) = orgs.next_entry().await? {
            let mut projects = fs::read_dir(org.path()).await?;
            while let Some(project) = projects.next_entry().await? {
                let mut tier1 = fs::read_dir(project.path()).await?;
                while let Some(dir) = tier1.next_entry().await? {
                    let mut tier2 = fs::read_dir(dir.path()).await?;
                    while let Some(dir) = tier2.next_entry().await? {
                        // Descend into the second fan-out directory as well.
                        // `to_item` expects the yielded path to be a file
                        // three levels below the project directory; yielding
                        // the tier-2 directories themselves (as before) made
                        // every entry fail the `is_file` check, so the
                        // listing was always empty.
                        let mut files = fs::read_dir(dir.path()).await?;
                        while let Some(file) = files.next_entry().await? {
                            yield file;
                        }
                    }
                }
            }
        }
    };

    Box::pin(entries.filter_map(move |entry: io::Result<_>| async move {
        match entry {
            Ok(entry) => {
                let path = entry.path();
                match entry.metadata().await {
                    Ok(metadata) => to_item(path, metadata).map(Ok),
                    Err(err) => Some(Err(err)),
                }
            }
            // A directory that does not exist just means nothing to list.
            Err(err) if err.kind() == io::ErrorKind::NotFound => None,
            Err(err) => Some(Err(err)),
        }
    }))
}
/// Returns the public URL for the object stored under `_key`.
///
/// This backend always returns `None`; the key is ignored.
fn public_url(&self, _key: &StorageKey) -> Option<String> {
None
}
/// Returns a URL to which the object for `_key` could be uploaded.
///
/// This backend always returns `None`; both the key and the requested
/// expiry are ignored.
async fn upload_url(
&self,
_key: &StorageKey,
_expires_in: Duration,
) -> Option<String> {
None
}
}
/// Codec that passes raw bytes through unchanged while keeping a count of
/// how many bytes have been encoded.
struct BytesCodec {
    /// Running total of bytes handed to the encoder.
    written: u64,
}

impl BytesCodec {
    /// Creates a codec whose byte counter starts at zero.
    pub fn new() -> Self {
        let written = 0;
        Self { written }
    }

    /// Returns the total number of bytes encoded so far.
    pub fn written(&self) -> u64 {
        self.written
    }
}
/// Decoding side: hands back whatever is currently buffered as one chunk.
impl Decoder for BytesCodec {
    type Item = BytesMut;
    type Error = io::Error;

    /// Takes everything currently in `buf` and returns it as a single frame.
    ///
    /// Returns `Ok(None)` when `buf` is empty. This method never returns an
    /// error.
    fn decode(
        &mut self,
        buf: &mut BytesMut,
    ) -> Result<Option<Self::Item>, Self::Error> {
        if buf.is_empty() {
            return Ok(None);
        }

        // `split` drains the whole buffer, leaving `buf` empty.
        Ok(Some(buf.split()))
    }
}
/// Encoding side: appends each chunk to the output buffer verbatim and adds
/// its length to the codec's byte counter.
impl Encoder<Bytes> for BytesCodec {
    type Error = io::Error;

    /// Copies `data` onto the end of `buf` and increases the written-bytes
    /// counter by `data.len()`. This method never returns an error.
    fn encode(
        &mut self,
        data: Bytes,
        buf: &mut BytesMut,
    ) -> Result<(), io::Error> {
        let chunk_len = data.len();

        buf.reserve(chunk_len);
        buf.put_slice(&data);

        self.written += chunk_len as u64;
        Ok(())
    }
}