use std::future::Future;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::io::{AsyncBufRead, AsyncBufReadExt};
use crate::io::{FileExt, Io};
use crate::segment::compacted::CompactedSegmentDataHeader;
use crate::storage::{Error, Result};
use libsql_sys::name::NamespaceName;
use super::{Backend, SegmentMeta};
pub struct FsBackend<I, S> {
prefix: PathBuf,
io: Arc<I>,
remote_storage: Arc<S>,
}
impl<I: Io, S> FsBackend<I, S> {
fn new(prefix: PathBuf, io: I, remote_storage: S) -> Result<Self> {
io.create_dir_all(&prefix.join("segments")).unwrap();
Ok(FsBackend {
prefix,
io: Arc::new(io),
remote_storage: Arc::new(remote_storage),
})
}
}
pub(crate) trait RemoteStorage: Send + Sync + 'static {
type FetchStream: AsyncBufRead + Unpin;
fn upload(
&self,
file_path: &Path,
meta: &SegmentMeta,
) -> impl Future<Output = Result<()>> + Send;
fn fetch(
&self,
namespace: &NamespaceName,
frame_no: u64,
) -> impl Future<Output = Result<(String, Self::FetchStream)>> + Send;
}
impl RemoteStorage for () {
type FetchStream = tokio::io::Empty;
async fn upload(&self, _file_path: &Path, _meta: &SegmentMeta) -> Result<()> {
Ok(())
}
async fn fetch(
&self,
_namespace: &NamespaceName,
frame_no: u64,
) -> Result<(String, Self::FetchStream)> {
Err(Error::FrameNotFound(frame_no))
}
}
impl<I: Io, S: RemoteStorage> Backend for FsBackend<I, S> {
type Config = ();
async fn store(
&self,
_config: &Self::Config,
meta: super::SegmentMeta,
segment_data: impl crate::io::file::FileExt,
_segment_index: Vec<u8>,
) -> Result<()> {
let key = format!(
"{:019}-{:019}-{:019}.segment",
meta.start_frame_no,
meta.end_frame_no,
meta.created_at.timestamp()
);
let path = self.prefix.join("segments").join(&key);
let buf = Vec::with_capacity(segment_data.len().unwrap() as usize);
let f = self.io.open(true, false, true, &path).unwrap();
let (buf, res) = segment_data.read_exact_at_async(buf, 0).await;
res?;
let (_, res) = f.write_all_at_async(buf, 0).await;
res?;
self.remote_storage.upload(&path, &meta).await?;
Ok(())
}
async fn fetch_segment(
&self,
_config: &Self::Config,
namespace: NamespaceName,
frame_no: u64,
dest_path: &Path,
) -> Result<()> {
let dir = self.prefix.join("segments");
let mut dirs = tokio::fs::read_dir(&dir).await?;
while let Some(entry) = dirs.next_entry().await? {
let file = entry.file_name();
let key = file.to_str().unwrap().split(".").next().unwrap();
let mut comp = key.split("-");
let start_frame = comp.next().unwrap();
let end_frame = comp.next().unwrap();
let start_frame: u64 = start_frame.parse().unwrap();
let end_frame: u64 = end_frame.parse().unwrap();
if start_frame <= frame_no && end_frame >= frame_no {
#[cfg(debug_assertions)]
{
use crate::io::buf::ZeroCopyBuf;
let header_buf = ZeroCopyBuf::<CompactedSegmentDataHeader>::new_uninit();
let file = self.io.open(false, true, false, &entry.path()).unwrap();
let (header_buf, res) = file.read_exact_at_async(header_buf, 0).await;
res.unwrap();
let header = header_buf.get_ref();
let start_frame_from_header = header.start_frame_no.get();
let end_frame_from_header = header.end_frame_no.get();
assert_eq!(start_frame, start_frame_from_header);
assert_eq!(end_frame, end_frame_from_header);
}
self.io.hard_link(&entry.path(), dest_path)?;
return Ok(());
}
}
let (file_name, mut reader) = self.remote_storage.fetch(&namespace, frame_no).await?;
let file_path = dir.join(file_name);
let file = self.io.open(true, true, true, &file_path).unwrap();
let mut offset = 0;
loop {
let buf = reader.fill_buf().await.unwrap();
let buf = Vec::from(buf);
if buf.is_empty() {
break;
}
let (buf, res) = file.write_all_at_async(buf, offset).await;
res?;
offset += buf.len() as u64;
reader.consume(buf.len());
}
self.io.hard_link(&file_path, dest_path)?;
Ok(())
}
async fn meta(
&self,
_config: &Self::Config,
_namespace: NamespaceName,
) -> Result<super::DbMeta> {
todo!()
}
fn default_config(&self) -> std::sync::Arc<Self::Config> {
todo!()
}
}
pub(super) fn parse_segment_file_name(name: &str) -> Result<(u64, u64)> {
tracing::debug!("parsing file name: {}", name);
let key = name.split(".").next().unwrap();
let mut comp = key.split("-");
let start_frame = comp.next().unwrap();
let end_frame = comp.next().unwrap();
let start_frame: u64 = start_frame.parse().unwrap();
let end_frame: u64 = end_frame.parse().unwrap();
Ok((start_frame, end_frame))
}
pub(super) fn generate_key(meta: &SegmentMeta) -> String {
format!(
"{:019}-{:019}-{:019}.segment",
meta.start_frame_no,
meta.end_frame_no,
meta.created_at.timestamp()
)
}
#[cfg(test)]
mod tests {
use std::io::Read;
use chrono::Utc;
use tempfile::tempdir;
use uuid::Uuid;
use zerocopy::{AsBytes, FromZeroes};
use super::*;
use crate::io::StdIO;
#[tokio::test]
async fn read_write() {
let dir = tempdir().unwrap();
let fs = FsBackend::new(dir.path().into(), StdIO::default(), ()).unwrap();
let namespace = NamespaceName::from_string("".into());
let segment = CompactedSegmentDataHeader {
start_frame_no: 0.into(),
frame_count: 10.into(),
segment_id: 0.into(),
end_frame_no: 64.into(),
};
fs.store(
&(),
crate::storage::backend::SegmentMeta {
namespace: namespace.clone(),
segment_id: Uuid::new_v4(),
start_frame_no: 0,
end_frame_no: 64,
created_at: Utc::now(),
},
segment.as_bytes().to_vec(),
Vec::new(),
)
.await
.unwrap();
let path = dir.path().join("fetched_segment");
fs.fetch_segment(&(), namespace.clone(), 5, &path)
.await
.unwrap();
let mut file = std::fs::File::open(path).unwrap();
let mut header: CompactedSegmentDataHeader = CompactedSegmentDataHeader::new_zeroed();
file.read_exact(header.as_bytes_mut()).unwrap();
assert_eq!(header.start_frame_no.get(), 0);
assert_eq!(header.end_frame_no.get(), 64);
}
}