use std::{
fmt::{
Debug,
Formatter,
},
sync::{
Arc,
atomic::{
AtomicBool,
AtomicU64,
Ordering::Relaxed,
},
},
};
use bytes::Bytes;
use parking_lot::Mutex;
use tracing::instrument;
use crate::{
block::BLOCK_SIZE,
errs::{
SegmentError,
SegmentError::{
Closing,
CorruptedBlock,
NotClosing,
},
},
index::Index,
map::{
MAX_GROWTH_INCREMENT,
Map,
},
segment::Metadata,
};
pub struct SegmentWriter {
pub(crate) map: Arc<Map>,
current_offset: Mutex<usize>,
block_count: AtomicU64,
closing: AtomicBool,
closed: AtomicBool,
}
impl SegmentWriter {
#[instrument(level = "trace")]
pub fn new(map: Arc<Map>) -> Result<Self, SegmentError> {
Ok(Self {
map,
current_offset: Mutex::new(0),
block_count: AtomicU64::new(0),
closing: AtomicBool::new(false),
closed: AtomicBool::new(false),
})
}
#[instrument(level = "trace")]
fn calculate_new_size(&self, required_size: usize) -> u64 {
let mut new_size = required_size as u64;
let current_size = self.map.len() as u64;
if new_size <= current_size + MAX_GROWTH_INCREMENT {
new_size = new_size.div_ceil(MAX_GROWTH_INCREMENT) * MAX_GROWTH_INCREMENT;
} else {
}
new_size.max(current_size)
}
pub(crate) fn write_block_direct<F>(&mut self, builder_fn: F) -> Result<(), SegmentError>
where
F: FnOnce(&mut [u8]), {
if self.closing.load(Relaxed) {
return Err(Closing);
}
let mut current_offset = self.current_offset.lock();
let required_size = *current_offset + BLOCK_SIZE;
if required_size > self.map.len() {
let new_size = self.calculate_new_size(required_size);
if let Err(e) = self.map.grow(new_size) {
return Err(e);
}
}
let block_range = *current_offset..(*current_offset + BLOCK_SIZE);
if let Err(e) = self.map.write_to_range(block_range, builder_fn) {
return Err(e);
}
*current_offset += BLOCK_SIZE;
self.block_count.fetch_add(1, Relaxed);
Ok(())
}
#[instrument(level = "trace")]
pub(crate) fn begin_close(&self) {
self.closing.store(true, Relaxed);
}
#[instrument(level = "trace")]
pub(crate) fn write_index(&self, index: &Index) -> Result<u64, SegmentError> {
self.begin_close();
let mut current_offset = self.current_offset.lock();
let index_start = *current_offset;
let index_end = *current_offset + index.size();
let index_bytes = Bytes::from(index);
let index_size = index_bytes.len();
if index_size == 0 || index_size < 56 {
return Err(CorruptedBlock);
}
if index_end > self.map.len() {
let new_size = self.calculate_new_size(index_end);
match self.map.grow(new_size) {
| Ok(_) => {},
| Err(e) => {
return Err(e);
},
};
}
let index_range = index_start..index_end;
match self.map.write_to_range(index_range, |slice| unsafe {
index.finalize(slice.as_mut_ptr());
}) {
| Ok(_) => {},
| Err(e) => {
return Err(e);
},
};
*current_offset = index_end;
Ok(index_start as u64)
}
#[instrument(level = "trace")]
pub(crate) fn write_metadata(&self, metadata: Metadata) -> Result<(), SegmentError> {
if !self.closing.load(Relaxed) {
return Err(NotClosing);
}
let metadata_size = metadata.serialized_size();
let mut current_offset = self.current_offset.lock();
let metadata_start = *current_offset;
let metadata_end = *current_offset + metadata_size;
if metadata_end > self.map.len() {
let new_size = self.calculate_new_size(metadata_end);
match self.map.grow(new_size) {
| Ok(_) => {},
| Err(e) => return Err(e),
};
}
let metadata_range = metadata_start..metadata_end;
match self.map.write_to_range(metadata_range, |slice| unsafe {
metadata.finalize(slice.as_mut_ptr());
}) {
| Ok(_) => {},
| Err(e) => return Err(e),
};
*current_offset = metadata_end;
Ok(())
}
pub(crate) fn current_offset(&self) -> usize {
*self.current_offset.lock()
}
pub(crate) fn block_count(&self) -> u64 {
self.block_count.load(Relaxed)
}
#[instrument(level = "trace")]
pub(crate) fn close(&self) -> Result<(), SegmentError> {
if self.closed.load(Relaxed) {
return Ok(());
}
let current_offset = self.current_offset();
self.map.close()?;
self.map.shrink(current_offset as u64)?;
self.closed.store(true, Relaxed);
Ok(())
}
}
impl Debug for SegmentWriter {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SegmentWriter")
.field("current_offset", &self.current_offset())
.field("block_count", &self.block_count())
.field("map_size", &self.map.len())
.finish()
}
}
#[cfg(test)]
#[allow(clippy::question_mark_used)]
#[allow(clippy::missing_safety_doc)]
#[allow(clippy::undocumented_unsafe_blocks)]
mod tests {
use tempfile::tempdir;
use super::*;
use crate::block::BLOCK_SIZE;
fn create_test_map() -> Result<(Arc<Map>, tempfile::TempDir), SegmentError> {
let dir = tempdir().expect("failed to create temp dir");
let file_path = dir.path().join("test-segment");
let map = Arc::new(Map::new(file_path, BLOCK_SIZE as u64 * 10)?);
Ok((map, dir))
}
#[test]
fn test_segment_writer_creation() {
let (map, _dir) = create_test_map().expect("failed to create map");
let writer = SegmentWriter::new(map);
assert!(writer.is_ok(), "failed to create segment writer");
}
}