use crate::files;
use crate::files::*;
use crate::handle::*;
use crate::types::Result;
use std::collections::HashMap;
use tokio::io::AsyncWrite;
pub struct Segments {
location: PathBuf,
map: HashMap<u64, Segment>,
}
impl Segments {
pub fn open(location: PathBuf) -> Result<Segments> {
let ids = files::list_files_as_u64(&location, "index")?;
let mut map = HashMap::new();
for id in ids {
map.insert(id, Segment::new(location.to_owned(), id));
}
Ok(Segments { location, map })
}
pub async fn create(&mut self, id: u64) -> Result<&mut Segment> {
let segment = Segment::new(self.location.to_owned(), id);
self.map.insert(id, segment);
Ok(self.map.get_mut(&id).unwrap())
}
pub fn get(&self, id: u64) -> Option<&Segment> {
self.map.get(&id)
}
pub fn create_if_absent(&mut self, id: u64) -> &mut Segment {
if !self.map.contains_key(&id) {
let segment = Segment::new(self.location.to_owned(), id);
self.map.insert(id, segment);
}
self.map.get_mut(&id).unwrap()
}
pub fn get_mut(&mut self, id: u64) -> Option<&mut Segment> {
self.map.get_mut(&id)
}
pub fn len(&self) -> usize {
self.map.len()
}
}
pub struct Segment {
pub id: u64,
location: PathBuf,
handle: Option<Handle>,
}
impl Segment {
pub fn new(location: PathBuf, id: u64) -> Segment {
Segment {
id,
location: location,
handle: None,
}
}
pub async fn add(&mut self, entries: Vec<Vec<u8>>) -> Result<()> {
if self.handle.is_none() {
self.handle = Some(Handle::new(&self.location, self.id).await?)
}
let handle = self.handle.as_mut().unwrap();
handle.add(entries).await?;
Ok(())
}
pub async fn stream<T>(&mut self, offset: u64, bytes: usize, target: &mut T) -> Result<()>
where
T: AsyncWrite + Unpin + ?Sized,
{
if let Some(handle) = self.handle.as_mut() {
handle.stream(offset, bytes, target).await?;
}
Ok(())
}
pub async fn size(&self) -> u64 {
match self.handle.as_ref() {
Some(h) => h.log_size().await,
None => 0,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_util as test;
#[tokio::test]
async fn create_new_segment() {
let location = test::create_a_test_directory();
let mut segments = Segments::open(location).unwrap();
let segment = segments.create(5).await.unwrap();
assert_eq!(segment.id, 5);
}
#[tokio::test]
async fn no_segments() {
let location = test::create_a_test_directory();
assert_eq!(Segments::open(location).unwrap().len(), 0);
}
#[tokio::test]
async fn add_entries_to_segment() {
let mut segments = Segments::open(test::create_a_test_directory()).unwrap();
let segment = segments.create(5).await.unwrap();
segment.add(vec![vec![1, 2], vec![5, 6]]).await.unwrap();
assert_eq!(segment.size().await, 28);
}
#[tokio::test]
async fn stream_entries_from_segment() {
let segment_id = 5;
let mut segments = Segments::open(test::create_a_test_directory()).unwrap();
let segment = segments.create(segment_id).await.unwrap();
segment.add(vec![vec![1, 2], vec![5, 6]]).await.unwrap();
let mut buf = Vec::new();
segment.stream(segment_id, 16000, &mut buf).await.unwrap();
assert_eq!(
&buf,
&vec![
0, 0, 0, 0, 0, 0, 0, 5, 0, 0, 0, 2, 1, 2, 0, 0, 0, 0, 0, 0, 0, 6, 0, 0, 0, 2, 5, 6 ]
);
}
}