use std::{path::PathBuf, rc::Rc};
use anyhow::{Context, Result, anyhow};
use futures::AsyncWriteExt;
use glommio::{
ByteSliceMutExt,
io::{DmaFile, ImmutableFileBuilder},
};
pub struct Writer {
pub(crate) path: PathBuf,
pub(crate) keys: caos::Writer<u64>,
pub(crate) keys_file: Rc<DmaFile>,
pub(crate) table_offsets: Vec<caos::Writer<u64>>,
pub(crate) table_offsets_files: Vec<Rc<DmaFile>>,
pub(crate) table_names: Vec<String>,
pub(crate) table_files: Vec<Rc<DmaFile>>,
pub(crate) write_offsets: Vec<u64>,
pub(crate) length: u64,
}
impl Writer {
pub fn table_names(&self) -> &[String] {
&self.table_names
}
pub async fn append(&mut self, key: u64, values: Vec<Vec<u8>>) -> Result<()> {
if values.len() != self.table_names.len() {
return Err(anyhow!(
"number of values ({}) does not equal the number of tables ({})",
values.len(),
self.table_names.len()
));
}
let new_write_offsets = self
.write_offsets
.iter()
.zip(values.iter())
.map(|(&offset, val)| offset + u64::try_from(val.len()).unwrap())
.collect::<Vec<u64>>();
let mut futs = Vec::with_capacity(self.table_names.len());
for ((file, &offset), value) in self
.table_files
.iter()
.zip(self.write_offsets.iter())
.zip(values.into_iter())
{
let file = file.clone();
futs.push(async move { read_write_at(&file, &value, offset).await });
}
futures::future::try_join_all(futs)
.await
.context("write to table data files")?;
let offset_write_offset = self.length * 8;
let mut futs = Vec::with_capacity(self.table_names.len());
for (file, &offset) in self
.table_offsets_files
.iter()
.zip(new_write_offsets.iter())
{
let file = file.clone();
futs.push(async move {
read_write_at(&file, &offset.to_be_bytes(), offset_write_offset).await
});
}
futures::future::try_join_all(futs)
.await
.context("write to table offset files")?;
read_write_at(&self.keys_file, &key.to_be_bytes(), offset_write_offset)
.await
.context("write to the keys file")?;
let mut path = self.path.clone();
path.push("new_length");
glommio::io::remove(&path).await.ok();
let mut sink = ImmutableFileBuilder::new(&path)
.build_sink()
.await
.map_err(|e| anyhow!("{}", e))
.context("build new length file")?;
sink.write_all(&(self.length + 1).to_be_bytes())
.await
.context("write to new length file")?;
sink.sync()
.await
.map_err(|e| anyhow!("{}", e))
.context("sync new length file to disk")?;
sink.close()
.await
.map_err(|e| anyhow!("{}", e))
.context("close new length file")?;
let mut final_path = self.path.clone();
final_path.push("length");
glommio::io::rename(&path, &final_path)
.await
.map_err(|e| anyhow!("{}", e))
.context("rename length file")?;
self.write_offsets = new_write_offsets;
self.length += 1;
for (offsets, &offset) in self.table_offsets.iter_mut().zip(self.write_offsets.iter()) {
offsets.append(&[offset]);
}
self.keys.append(&[key]);
Ok(())
}
pub async fn close(self) -> Result<()> {
futures::future::try_join_all(
self.table_files
.into_iter()
.chain(std::iter::once(self.keys_file))
.chain(self.table_offsets_files.into_iter())
.map(|f| Rc::try_unwrap(f).expect("unwrap file Rc").close()),
)
.await
.map_err(|e| anyhow!("{}", e))
.context("close all files")?;
Ok(())
}
}
async fn read_write_at(file: &DmaFile, data: &[u8], pos: u64) -> Result<()> {
let write_pos = file.align_down(pos);
assert!(write_pos <= pos);
let extra_read_size = usize::try_from(pos - write_pos).unwrap();
let bufsize = extra_read_size + data.len() + 8;
let mut buf = file.alloc_dma_buffer(
file.align_up(bufsize.try_into().unwrap())
.try_into()
.unwrap(),
);
if extra_read_size > 0 {
let read_buf = file
.read_at(write_pos, extra_read_size)
.await
.map_err(|e| anyhow!("{}", e))
.context("read extra data for alignment")?;
if read_buf.len() != extra_read_size {
return Err(anyhow!("failed to read extra data, size mismatch"));
}
buf.write_at(0, &*read_buf);
}
buf.write_at(extra_read_size, data);
file.write_at(buf, write_pos)
.await
.map_err(|e| anyhow!("{}", e))
.context("failed to write data")?;
file.fdatasync()
.await
.map_err(|e| anyhow!("{}", e))
.context("fdatasync file")?;
Ok(())
}