use crate::repository::backend::common::files::*;
use crate::repository::backend::common::segment::*;
use crate::repository::backend::SegmentDescriptor;
use crate::repository::ChunkID;
use anyhow::{anyhow, Context, Result};
use futures::channel::mpsc;
use futures::channel::oneshot;
use futures::sink::SinkExt;
use futures::stream::StreamExt;
use futures::task::{Spawn, SpawnExt};
use lru::LruCache;
use std::fs::{create_dir, File};
use std::path::{Path, PathBuf};
use walkdir::WalkDir;
/// An opened segment paired with its numeric segment ID.
///
/// Field `0` is the segment ID (also used as the segment's file name); field `1`
/// is the opened segment, generic over its file type `R`.
struct SegmentPair<R>(u64, Segment<R>);
/// Synchronous owner of the on-disk segment files under `<repository>/data`.
///
/// Writes go to a single locked "current" segment; reads go through an LRU cache
/// of segments opened read-only.
struct InternalSegmentHandler {
    /// Segment currently open for writing. Set to `None` once a write reaches
    /// `size_limit`; the next write then opens a new segment.
    current_segment: Option<SegmentPair<LockedFile>>,
    /// Candidate ID for the next write segment. It starts at the largest numeric
    /// file name found on disk and is advanced past any segment that already exists.
    highest_segment: u64,
    /// Passed to `Segment::new`. A write ending at or past this offset closes the
    /// current write segment.
    size_limit: u64,
    /// Segments opened read-only, keyed by segment ID.
    ro_segment_cache: LruCache<u64, SegmentPair<File>>,
    /// The `data` directory inside the repository.
    path: PathBuf,
    /// Segments are grouped into subdirectories named
    /// `segment_id / segments_per_directory`.
    segments_per_directory: u64,
}
impl InternalSegmentHandler {
fn open(
repository_path: impl AsRef<Path>,
size_limit: u64,
segments_per_directory: u64,
) -> Result<InternalSegmentHandler> {
let data_path = repository_path.as_ref().join("data");
if !data_path.exists() {
create_dir(&data_path)?;
}
let max_segment = WalkDir::new(&data_path)
.into_iter()
.filter_map(Result::ok)
.filter(|e| e.file_type().is_file())
.filter_map(|e| {
e.path()
.file_name()
.map(|x| x.to_str().unwrap().to_string())
})
.filter_map(|e| Result::ok(e.parse::<u64>()))
.max()
.unwrap_or(0);
let mut segment_handler = InternalSegmentHandler {
current_segment: None,
highest_segment: max_segment,
size_limit,
ro_segment_cache: LruCache::new(100),
path: data_path,
segments_per_directory,
};
segment_handler.open_segment_write()?;
Ok(segment_handler)
}
fn open_segement_read(&mut self, segment_id: u64) -> Result<&mut SegmentPair<File>> {
let cache = &mut self.ro_segment_cache;
if !cache.contains(&segment_id) {
let folder_id = segment_id / self.segments_per_directory;
let folder_path = self.path.join(folder_id.to_string());
if !(folder_path.exists() && folder_path.is_dir()) {
return Err(anyhow!(
"Segment directory {} for segment {} does not exist or is not a folder",
folder_id,
segment_id
));
}
let segment_path = folder_path.join(segment_id.to_string());
if !(segment_path.exists() && segment_path.is_file()) {
return Err(anyhow!(
"File for segment {} opened in read only mode does not exists",
segment_id
));
}
let segment_file = File::open(segment_path)?;
let segment_pair =
SegmentPair(segment_id, Segment::new(segment_file, self.size_limit)?);
cache.put(segment_id, segment_pair);
}
let segment_pair = cache.get_mut(&segment_id).unwrap();
Ok(segment_pair)
}
fn segment_exists(&self, segment_id: u64) -> bool {
let folder_id = segment_id / self.segments_per_directory;
let folder_path = self.path.join(folder_id.to_string());
if !(folder_path.exists() && folder_path.is_dir()) {
return false;
}
let segment_path = folder_path.join(segment_id.to_string());
segment_path.exists() && segment_path.is_file()
}
fn open_segment_write(&mut self) -> Result<&mut SegmentPair<LockedFile>> {
if self.current_segment.is_none() {
while self.segment_exists(self.highest_segment) {
self.highest_segment += 1;
}
let segment_id = self.highest_segment;
let folder_id = segment_id / self.segments_per_directory;
let folder_path = self.path.join(folder_id.to_string());
if !folder_path.exists() {
create_dir(&folder_path)?;
}
let segment_path = folder_path.join(segment_id.to_string());
let segment_file = LockedFile::open_read_write(&segment_path)?
.with_context(|| "Unable to lock newly created segement file")?;
let segment = SegmentPair(segment_id, Segment::new(segment_file, self.size_limit)?);
self.current_segment = Some(segment);
}
Ok(self.current_segment.as_mut().unwrap())
}
fn read_chunk(&mut self, location: SegmentDescriptor) -> Result<Vec<u8>> {
let segment_id = location.segment_id;
let segment = self.open_segement_read(segment_id)?;
segment.1.read_chunk(location.start, 0)
}
fn write_chunk(&mut self, chunk: &[u8], id: ChunkID) -> Result<SegmentDescriptor> {
let segment = self.open_segment_write()?;
let (start, length) = segment.1.write_chunk(&chunk, id)?;
let descriptor = SegmentDescriptor {
segment_id: segment.0,
start,
};
if start + length >= self.size_limit {
self.current_segment = None
}
Ok(descriptor)
}
}
/// Requests sent from a `SegmentHandler` to its background task.
///
/// Each variant carries a oneshot sender on which the task returns the result.
enum SegmentHandlerCommand {
    /// Read the chunk stored at the given location.
    ReadChunk(SegmentDescriptor, oneshot::Sender<Result<Vec<u8>>>),
    /// Write the given bytes tagged with the given chunk ID; replies with the
    /// location (segment ID and start offset) they were written to.
    WriteChunk(Vec<u8>, ChunkID, oneshot::Sender<Result<SegmentDescriptor>>),
}
/// Cloneable async handle to segment storage owned by a background task.
///
/// All clones send to the same command channel. A single task processes the
/// commands one at a time, so reads and writes are serialized.
#[derive(Clone)]
pub struct SegmentHandler {
    /// Sending side of the command channel read by the background task.
    input: mpsc::Sender<SegmentHandlerCommand>,
    /// The segment data directory as a string; in this file it is only read by
    /// the `Debug` impl.
    path: String,
}
impl SegmentHandler {
pub fn open(
repository_path: impl AsRef<Path>,
size_limit: u64,
segments_per_directory: u64,
pool: impl Spawn,
) -> Result<SegmentHandler> {
let mut handler =
InternalSegmentHandler::open(repository_path, size_limit, segments_per_directory)?;
let path = handler.path.to_str().unwrap().to_string();
let (input, mut output) = mpsc::channel(100);
pool.spawn(async move {
while let Some(command) = output.next().await {
match command {
SegmentHandlerCommand::ReadChunk(location, ret) => {
ret.send(handler.read_chunk(location)).unwrap();
}
SegmentHandlerCommand::WriteChunk(chunk, id, ret) => {
ret.send(handler.write_chunk(&chunk, id)).unwrap();
}
}
}
})
.expect("Failed to spawn segment handler task");
Ok(SegmentHandler { input, path })
}
pub async fn read_chunk(&mut self, location: SegmentDescriptor) -> Result<Vec<u8>> {
let (input, output) = oneshot::channel();
self.input
.send(SegmentHandlerCommand::ReadChunk(location, input))
.await
.unwrap();
output.await.unwrap()
}
pub async fn write_chunk(&mut self, chunk: Vec<u8>, id: ChunkID) -> Result<SegmentDescriptor> {
let (input, output) = oneshot::channel();
self.input
.send(SegmentHandlerCommand::WriteChunk(chunk, id, input))
.await
.unwrap();
output.await.unwrap()
}
}
impl std::fmt::Debug for SegmentHandler {
    /// Renders as `SegmentHandler: ` followed by the debug-quoted data path.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let data_dir = &self.path;
        f.write_str("SegmentHandler: ")?;
        write!(f, "{:?}", data_dir)
    }
}