use super::{
reader::ArchiveIndex, scan_file, ArchiveReader, BatchItem, CompressionHeader,
FileHeader, Id, MonotonicTimestamper, PathMapping, RecordHeader, RecordIndex,
RecordTooLarge, RecordTyp, Timestamp, COMMITTED_OFFSET, FILE_VERSION, MAX_RECORD_LEN,
PM_POOL,
};
use ahash::AHashMap;
use anyhow::Result;
use bytes::BufMut;
use chrono::prelude::*;
use fs3::{allocation_granularity, FileExt};
use indexmap::IndexMap;
use log::warn;
use memmap2::{Mmap, MmapMut};
use netidx::{pack::Pack, path::Path};
use nohash::IntSet;
use parking_lot::RwLock;
use poolshark::global::GPooled;
use std::{
self,
cmp::max,
fs::{File, OpenOptions},
iter::IntoIterator,
mem,
ops::Drop,
path::Path as FilePath,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
/// Append-only writer for an archive file, backed by a writable memory map.
///
/// Records are encoded directly into `mmap` at offset `end`. `flush`
/// publishes the current `end` into the file header, and `reader` hands out
/// `ArchiveReader`s that share the same file handle and `end` counter.
pub struct ArchiveWriter {
// converts the wall clock times passed to the `add_batch*` methods into `Timestamp`s
time: MonotonicTimestamper,
// id -> path for every path mapping known to this writer
path_by_id: IndexMap<Id, Path, nohash::BuildNoHashHasher<Id>>,
// path -> id, the inverse of `path_by_id`
id_by_path: AHashMap<Path, Id>,
// the archive file; the same handle is cloned into readers created by `reader`
file: Arc<File>,
// never read; keeps the external lock file (if any) open for the writer's lifetime
_external_lock: Option<Arc<File>>,
// offset one past the last byte written; shared with readers created by `reader`
end: Arc<AtomicUsize>,
// the value of `end` most recently written to the file header by `flush`
committed: usize,
// the id `add_paths` will assign to the next previously unseen path
next_id: u32,
// allocation granularity reported by fs3; `reserve` grows the file in multiples of it
block_size: usize,
// writable mapping of `file`; replaced by `reserve` whenever the file grows
mmap: MmapMut,
// when true `add_batch` prefixes every batch with a `RecordIndex`
indexed: bool,
// scratch set used by `add_batch` to collect the ids present in a batch
index: IntSet<Id>,
// scratch vector reused by `add_batch` to build the `RecordIndex`
index_vec: Vec<Id>,
}
impl Drop for ArchiveWriter {
fn drop(&mut self) {
let _ = self.flush();
let _ = self.mmap.flush(); let _ = FileExt::unlock(&*self.file); }
}
impl ArchiveWriter {
pub(super) fn open_full(
path: impl AsRef<FilePath>,
indexed: bool,
compress: Option<Vec<u8>>,
external_lock: Option<impl AsRef<FilePath>>,
) -> Result<Self> {
if mem::size_of::<usize>() < mem::size_of::<u64>() {
warn!("archive file size is limited to 4 GiB on this platform")
}
let time = MonotonicTimestamper::new();
let external_lock = if let Some(path) = external_lock {
let lock = if FilePath::is_file(path.as_ref()) {
OpenOptions::new().read(true).write(true).open(path.as_ref())?
} else {
OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(path.as_ref())?
};
lock.try_lock_exclusive()?;
Some(Arc::new(lock))
} else {
None
};
if FilePath::is_file(path.as_ref()) {
if compress.is_some() {
bail!("can't write to an already compressed file")
}
let mut time_basis = DateTime::<Utc>::MIN_UTC;
let file = OpenOptions::new().read(true).write(true).open(path.as_ref())?;
if external_lock.is_none() {
file.try_lock_exclusive()?;
}
let block_size = allocation_granularity(path)? as usize;
let mmap = unsafe { MmapMut::map_mut(&file)? };
let mut t = ArchiveWriter {
time,
path_by_id: IndexMap::default(),
id_by_path: AHashMap::default(),
file: Arc::new(file),
_external_lock: external_lock,
end: Arc::new(AtomicUsize::new(0)),
committed: 0,
next_id: 0,
block_size,
mmap,
indexed: false,
index: IntSet::default(),
index_vec: Vec::new(),
};
let mut compress = None;
let end = scan_file(
&mut t.indexed,
&mut compress,
&mut t.path_by_id,
&mut t.id_by_path,
None,
None,
&mut time_basis,
&mut t.next_id,
&mut &*t.mmap,
)?;
if compress.is_some() {
bail!("can't write to an already compressed file")
}
t.next_id += 1;
t.end.store(end, Ordering::Relaxed);
t.committed = end;
Ok(t)
} else {
let file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(path.as_ref())?;
if external_lock.is_none() {
file.try_lock_exclusive()?;
}
let block_size = allocation_granularity(path.as_ref())? as usize;
let fh_len = <FileHeader as Pack>::const_encoded_len().unwrap();
let rh_len = <RecordHeader as Pack>::const_encoded_len().unwrap();
let comp_hdr = compress.map(|dictionary| CompressionHeader { dictionary });
let ch_len = comp_hdr.as_ref().map(|ch| ch.encoded_len()).unwrap_or(0);
file.set_len(max(block_size, fh_len + rh_len + ch_len) as u64)?;
let mut mmap = unsafe { MmapMut::map_mut(&file)? };
let mut buf = &mut *mmap;
let committed = (fh_len + ch_len) as u64;
let fh = FileHeader {
compressed: comp_hdr.is_some(),
indexed,
version: FILE_VERSION,
committed,
};
<FileHeader as Pack>::encode(&fh, &mut buf)?;
if let Some(hdr) = comp_hdr {
hdr.encode(&mut buf)?;
}
mmap.flush()?;
Ok(ArchiveWriter {
time,
path_by_id: IndexMap::default(),
id_by_path: AHashMap::default(),
file: Arc::new(file),
_external_lock: external_lock,
end: Arc::new(AtomicUsize::new(committed as usize)),
committed: committed as usize,
next_id: 0,
block_size,
mmap,
indexed: true,
index: IntSet::default(),
index_vec: Vec::new(),
})
}
}
/// Open (or create) an indexed, uncompressed archive at `path`, taking the
/// exclusive lock on the archive file itself.
pub fn open(path: impl AsRef<FilePath>) -> Result<Self> {
    // the type annotation only pins the generic parameter of `open_full`
    let no_external_lock: Option<&str> = None;
    Self::open_full(path, true, None, no_external_lock)
}
/// Open (or create) an indexed, uncompressed archive at `path`, taking the
/// exclusive lock on the separate file `external_lock` instead of on the
/// archive file.
pub fn open_external(
    path: impl AsRef<FilePath>,
    external_lock: impl AsRef<FilePath>,
) -> Result<Self> {
    let lock_file = Some(external_lock);
    Self::open_full(path, true, None, lock_file)
}
/// Grow the file and remap it.
///
/// The file grows by the larger of `additional_capacity` and 1/64th of
/// the current mapping, and the result is then extended to a whole number
/// of blocks.
fn reserve(&mut self, additional_capacity: usize) -> Result<()> {
    let current = self.mmap.len();
    let growth = max(current >> 6, additional_capacity);
    let wanted = current + growth;
    // whole blocks needed, always including one extra block
    let blocks = wanted / self.block_size + 1;
    let grown = blocks * self.block_size;
    self.file.set_len(grown as u64)?;
    // the new mapping is created first; the assignment then drops the old one
    // SAFETY: NOTE(review) relies on nothing else resizing the file while
    // it is mapped - confirm against the locking done when opening.
    self.mmap = unsafe { MmapMut::map_mut(&*self.file)? };
    Ok(())
}
/// Make sure a record with a payload of `record_length` bytes fits in the
/// mapping, growing the file if it does not.
///
/// Returns the total number of bytes the record occupies (record header
/// plus payload). Fails with `RecordTooLarge` if `record_length` exceeds
/// `MAX_RECORD_LEN`.
fn check_reserve(&mut self, record_length: usize) -> Result<usize> {
    if record_length > MAX_RECORD_LEN as usize {
        bail!(RecordTooLarge);
    }
    let header_len = <RecordHeader as Pack>::const_encoded_len().unwrap();
    let needed = header_len + record_length;
    let available = self.mmap.len() - self.end.load(Ordering::Relaxed);
    if available < needed {
        self.reserve(needed)?;
    }
    Ok(needed)
}
/// Commit everything written so far.
///
/// If records were added since the last commit, the mapping is flushed,
/// the current end offset is written at `COMMITTED_OFFSET` in the file,
/// and the mapping is flushed again. Does nothing when there is nothing
/// new to commit.
pub fn flush(&mut self) -> Result<()> {
    let end = self.end.load(Ordering::Relaxed);
    if self.committed >= end {
        return Ok(());
    }
    // first flush the records themselves; presumably so the committed
    // offset never points past data that has not been flushed
    self.mmap.flush()?;
    let mut header = &mut self.mmap[COMMITTED_OFFSET..];
    header.put_u64(end as u64);
    // then flush the updated committed offset
    self.mmap.flush()?;
    self.committed = end;
    Ok(())
}
/// Assign ids to every path in `paths` that this writer does not know yet
/// and record the new mappings in the archive.
///
/// Paths that already have an id are skipped. All new mappings are handed
/// to `add_raw_pathmappings` together.
pub fn add_paths<'a>(
    &'a mut self,
    paths: impl IntoIterator<Item = &'a Path>,
) -> Result<()> {
    let mut new_mappings = PM_POOL.take();
    for path in paths {
        if self.id_by_path.contains_key(path) {
            continue;
        }
        let id = Id(self.next_id);
        self.next_id += 1;
        self.id_by_path.insert(path.clone(), id);
        self.path_by_id.insert(id, path.clone());
        new_mappings.push(PathMapping(path.clone(), id));
    }
    self.add_raw_pathmappings(new_mappings)
}
/// Append `pms` to the archive as one `PathMappings` record.
///
/// The writer's in-memory path maps are not touched here. An empty `pms`
/// writes nothing.
pub(super) fn add_raw_pathmappings(
    &mut self,
    pms: GPooled<Vec<PathMapping>>,
) -> Result<()> {
    if pms.len() == 0 {
        return Ok(());
    }
    let payload_len = <GPooled<Vec<PathMapping>> as Pack>::encoded_len(&pms);
    let total_len = self.check_reserve(payload_len)?;
    let start = self.end.load(Ordering::Relaxed);
    let mut out = &mut self.mmap[start..];
    let header = RecordHeader {
        record_type: RecordTyp::PathMappings,
        record_length: payload_len as u32,
        timestamp: 0,
    };
    <RecordHeader as Pack>::encode(&header, &mut out)?;
    <GPooled<Vec<PathMapping>> as Pack>::encode(&pms, &mut out)?;
    // only advance `end` once the whole record is in place
    self.end.fetch_add(total_len, Ordering::AcqRel);
    Ok(())
}
/// Append one batch record whose payload is produced by `f`.
///
/// When `timestamp` is a `NewBasis`, a `Timestamp` record holding the
/// basis is written first. The batch record is typed `ImageBatch` if
/// `image` is true and `DeltaBatch` otherwise. `f` receives the buffer
/// positioned just after the record header; `end` is advanced by the
/// header length plus `record_length` no matter how much `f` wrote, so
/// `f` is expected to write exactly `record_length` bytes.
///
/// Fails with `RecordTooLarge` if `record_length` exceeds `MAX_RECORD_LEN`.
fn add_batch_f<F: FnOnce(&mut &mut [u8]) -> Result<()>>(
    &mut self,
    image: bool,
    timestamp: Timestamp,
    record_length: usize,
    f: F,
) -> Result<()> {
    if record_length > MAX_RECORD_LEN as usize {
        bail!(RecordTooLarge)
    }
    if let Timestamp::NewBasis(basis) = timestamp {
        let basis_len = <DateTime<Utc> as Pack>::encoded_len(&basis);
        let basis_header = RecordHeader {
            record_type: RecordTyp::Timestamp,
            record_length: basis_len as u32,
            timestamp: 0,
        };
        let total = self.check_reserve(basis_len)?;
        let start = self.end.load(Ordering::Relaxed);
        let mut out = &mut self.mmap[start..];
        <RecordHeader as Pack>::encode(&basis_header, &mut out)?;
        <DateTime<Utc> as Pack>::encode(&basis, &mut out)?;
        self.end.fetch_add(total, Ordering::AcqRel);
    }
    let total = self.check_reserve(record_length)?;
    let start = self.end.load(Ordering::Relaxed);
    let mut out = &mut self.mmap[start..];
    let record_type =
        if image { RecordTyp::ImageBatch } else { RecordTyp::DeltaBatch };
    let header = RecordHeader {
        record_type,
        record_length: record_length as u32,
        timestamp: timestamp.offset(),
    };
    <RecordHeader as Pack>::encode(&header, &mut out)?;
    f(&mut out)?;
    self.end.fetch_add(total, Ordering::AcqRel);
    Ok(())
}
/// Append `batch` to the archive.
///
/// The record is written as an image batch when `image` is true and as a
/// delta batch otherwise. `timestamp` is passed through the writer's
/// `MonotonicTimestamper` before being stored. When the archive is
/// indexed, the batch is prefixed with a `RecordIndex` listing the
/// distinct ids present in the batch; this is done for image and delta
/// batches alike. An empty `batch` writes nothing.
///
/// # Errors
///
/// Fails with `RecordTooLarge` if the encoded record exceeds
/// `MAX_RECORD_LEN`, or if growing the file or encoding fails.
pub fn add_batch(
    &mut self,
    image: bool,
    timestamp: DateTime<Utc>,
    batch: &GPooled<Vec<BatchItem>>,
) -> Result<()> {
    if batch.len() == 0 {
        return Ok(());
    }
    let timestamp = self.time.timestamp(timestamp);
    let index = if self.indexed {
        // every batch is indexed regardless of its type, otherwise image
        // batches would carry an empty index. The set removes duplicate ids.
        for BatchItem(id, _) in batch.iter() {
            self.index.insert(*id);
        }
        self.index_vec.extend(self.index.drain());
        // borrow the scratch vector for the duration of the write
        Some(RecordIndex { index: mem::take(&mut self.index_vec) })
    } else {
        None
    };
    let index_length =
        index.as_ref().map_or(0, |i| <RecordIndex as Pack>::encoded_len(i));
    let record_length =
        index_length + <GPooled<Vec<BatchItem>> as Pack>::encoded_len(batch);
    let written = self.add_batch_f(image, timestamp, record_length, |buf| {
        if let Some(index) = &index {
            <RecordIndex as Pack>::encode(index, buf)?
        }
        Ok(<GPooled<Vec<BatchItem>> as Pack>::encode(batch, buf)?)
    });
    // hand the scratch vector back before propagating any error so its
    // allocation is kept for the next batch even when the write failed
    if let Some(index) = index {
        self.index_vec = index.index;
        self.index_vec.clear()
    }
    written
}
/// Append an already encoded batch payload to the archive.
///
/// The bytes in `batch` are copied verbatim after the record header; the
/// record type is chosen by `image` exactly as in `add_batch_f`. An empty
/// `batch` writes nothing.
pub(super) fn add_batch_raw(
    &mut self,
    image: bool,
    timestamp: DateTime<Utc>,
    batch: &[u8],
) -> Result<()> {
    use std::io::Write;
    if batch.is_empty() {
        return Ok(());
    }
    let timestamp = self.time.timestamp(timestamp);
    self.add_batch_f(image, timestamp, batch.len(), |buf| {
        let mut sink = BufMut::writer(buf);
        sink.write_all(batch)?;
        Ok(())
    })
}
/// Return the id assigned to `path`, or `None` if the writer has no
/// mapping for it.
pub fn id_for_path(&self, path: &Path) -> Option<Id> {
    let id = self.id_by_path.get(path)?;
    Some(*id)
}
/// Return the path that `id` is mapped to, or `None` if the writer has no
/// mapping for it.
pub fn path_for_id(&self, id: &Id) -> Option<&Path> {
    let known_paths = &self.path_by_id;
    known_paths.get(id)
}
/// Size in bytes of the current memory mapping of the archive file.
pub fn capacity(&self) -> usize {
    let mapped: &[u8] = &self.mmap;
    mapped.len()
}
pub fn len(&self) -> usize {
self.end.load(Ordering::Relaxed)
}
pub fn block_size(&self) -> usize {
self.block_size
}
/// Create an `ArchiveReader` over the file being written.
///
/// The reader shares this writer's file handle and `end` counter, starts
/// with a fresh `ArchiveIndex`, and gets its own read-only mapping of the
/// file. Fails if the file cannot be mapped.
pub fn reader(&self) -> Result<ArchiveReader> {
    let index = Arc::new(RwLock::new(ArchiveIndex::new()));
    let file = self.file.clone();
    let end = self.end.clone();
    // SAFETY: NOTE(review) the file is mapped read-only while this writer
    // keeps appending to it; soundness relies on readers only looking at
    // bytes below `end` - confirm in ArchiveReader.
    let map = unsafe { Mmap::map(&*self.file)? };
    Ok(ArchiveReader {
        index,
        compressed: None,
        indexed: self.indexed,
        file,
        end,
        mmap: Arc::new(RwLock::new(map)),
    })
}
}