use std::collections::VecDeque;
use std::sync::Arc;
use arc_swap::ArcSwap;
use crate::claim::Claimed;
use crate::segment::Segment;
use crate::snapshot::Snapshot;
/// Handle to a segmented log whose state lives behind a shared `Arc`.
///
/// Cloning a handle clones only the `Arc`, so every clone refers to the same
/// underlying log. Handles can take snapshots and try to claim the writer.
pub struct AtomicLog<T> {
/// State shared by every handle and by the writer.
pub(crate) shared: Arc<Shared<T>>,
}
/// Appending handle for an [`AtomicLog`].
///
/// In this file a `Writer` is created by [`AtomicLog::try_claim_writer`] (and
/// therefore by [`AtomicLog::new_claimed`]); dropping it releases the writer
/// claim so that a later claim attempt can succeed.
pub struct Writer<T> {
/// Same shared state the originating [`AtomicLog`] points at.
pub(crate) shared: Arc<Shared<T>>,
}
/// State shared between [`AtomicLog`] handles and the [`Writer`].
pub(crate) struct Shared<T> {
/// Retained capacity exactly as passed to `AtomicLog::new`.
/// NOTE(review): presumably counted in log entries — confirm against `Snapshot`.
pub(crate) retained_capacity: usize,
/// Capacity passed to every `Segment::new` call; the writer starts a new
/// segment once the head's published length equals this value.
pub(crate) segment_capacity: usize,
/// Most recent segment; the writer stores the new segment here on rollover.
pub(crate) head: ArcSwap<Segment<T>>,
/// Writer-only bookkeeping, wrapped in `Claimed`, which tracks whether a
/// writer currently holds the claim.
writer_state: Claimed<WriterState<T>>,
}
/// Bookkeeping that the writer mutates through `Claimed::with_claimed_mut`.
struct WriterState<T> {
/// Segment the writer currently appends to.
head: Arc<Segment<T>>,
/// Strong references to the most recent segments, oldest at the front.
/// New segments are pushed at the back and old ones popped from the front.
retained: VecDeque<Arc<Segment<T>>>,
/// Upper bound on `retained.len()` after each rollover; computed in
/// `AtomicLog::new` as `ceil(retained_capacity / segment_capacity) + 1`.
retained_segments: usize,
}
impl<T> Clone for AtomicLog<T> {
    /// Returns another handle to the same log.
    ///
    /// Implemented by hand rather than derived so that no `T: Clone` bound is
    /// required: only the `Arc` is cloned, never the stored values.
    fn clone(&self) -> Self {
        let shared = Arc::clone(&self.shared);
        Self { shared }
    }
}
impl<T> From<Writer<T>> for AtomicLog<T> {
    /// Converts a writer into a plain log handle.
    ///
    /// The writer is consumed; dropping it runs `Drop for Writer`, which
    /// releases the writer claim on the shared state.
    fn from(writer: Writer<T>) -> Self {
        let handle = writer.log();
        // Dropped after the handle has been created, exactly as an implicit
        // end-of-scope drop would do.
        drop(writer);
        handle
    }
}
impl<T> AtomicLog<T> {
    /// Creates an empty log with no writer claimed.
    ///
    /// Both capacities are stored as given. The writer keeps
    /// `ceil(retained_capacity / segment_capacity) + 1` segments alive.
    /// NOTE(review): the extra segment presumably keeps the retained window
    /// covered while the head segment is only partly filled — confirm.
    ///
    /// # Panics
    ///
    /// Panics if `retained_capacity` or `segment_capacity` is zero.
    pub fn new(retained_capacity: usize, segment_capacity: usize) -> Self {
        assert!(retained_capacity > 0, "retained capacity must be non-zero");
        assert!(segment_capacity > 0, "segment capacity must be non-zero");
        let retained_segments = retained_capacity.div_ceil(segment_capacity) + 1;
        // The first segment has sequence 0 and no predecessor.
        let head = Segment::new(0, std::sync::Weak::new(), segment_capacity);
        // On rollover the writer pushes the new segment *before* trimming the
        // front, so the deque briefly holds `retained_segments + 1` entries.
        // Reserve that extra slot up front so the deque never has to grow.
        let mut retained = VecDeque::with_capacity(retained_segments + 1);
        retained.push_back(Arc::clone(&head));
        let shared = Arc::new(Shared {
            retained_capacity,
            segment_capacity,
            head: ArcSwap::from(Arc::clone(&head)),
            writer_state: Claimed::new_unclaimed(WriterState {
                head,
                retained,
                retained_segments,
            }),
        });
        Self { shared }
    }

    /// Creates a log and immediately claims its writer.
    ///
    /// Returns the writer together with a handle to the same log.
    ///
    /// # Panics
    ///
    /// Panics under the same conditions as [`AtomicLog::new`], or if the
    /// freshly created log refuses the claim (which would be a bug).
    #[inline]
    pub fn new_claimed(retained_capacity: usize, segment_capacity: usize) -> (Writer<T>, Self) {
        let log = Self::new(retained_capacity, segment_capacity);
        let writer = log
            .try_claim_writer()
            .expect("freshly constructed log must allow claiming a writer");
        (writer, log)
    }

    /// Returns the retained capacity this log was created with.
    #[inline]
    pub fn retained_capacity(&self) -> usize {
        self.shared.retained_capacity
    }

    /// Returns the per-segment capacity this log was created with.
    #[inline]
    pub fn segment_capacity(&self) -> usize {
        self.shared.segment_capacity
    }

    /// Reports whether the writer claim is currently held.
    #[inline]
    pub fn is_writer_claimed(&self) -> bool {
        self.shared.writer_state.is_claimed()
    }

    /// Builds a [`Snapshot`] from a clone of the shared state handle.
    #[inline]
    pub fn snapshot(&self) -> Snapshot<T> {
        Snapshot::new(Arc::clone(&self.shared))
    }

    /// Attempts to claim the writer role.
    ///
    /// Returns `Some(Writer)` when `Claimed::try_claim` succeeds and `None`
    /// otherwise. The claim is released again when the writer is dropped.
    #[inline]
    pub fn try_claim_writer(&self) -> Option<Writer<T>> {
        self.shared.writer_state.try_claim().then(|| Writer {
            shared: Arc::clone(&self.shared),
        })
    }
}
impl<T> Writer<T> {
    /// Returns a new [`AtomicLog`] handle to the log this writer appends to.
    #[inline]
    pub fn log(&self) -> AtomicLog<T> {
        let shared = Arc::clone(&self.shared);
        AtomicLog { shared }
    }

    /// Returns the retained capacity the log was created with.
    #[inline]
    pub fn retained_capacity(&self) -> usize {
        let shared = &self.shared;
        shared.retained_capacity
    }

    /// Returns the per-segment capacity the log was created with.
    #[inline]
    pub fn segment_capacity(&self) -> usize {
        let shared = &self.shared;
        shared.segment_capacity
    }

    /// Appends every value produced by `values`, in iteration order.
    ///
    /// Whenever the head segment's published length equals the segment
    /// capacity, a new segment is created, recorded in the retained queue
    /// (trimming the oldest entries beyond the limit), and stored as the
    /// shared head before more values are pushed. An empty iterator is a no-op.
    pub fn append_batch(&mut self, values: impl IntoIterator<Item = T>) {
        let mut pending = values.into_iter().peekable();
        let shared = &*self.shared;
        let capacity = shared.segment_capacity;
        // SAFETY: in this file a `Writer` is only built by
        // `AtomicLog::try_claim_writer` after `try_claim` succeeds, and the
        // claim is released only in `Drop`; combined with `&mut self` this is
        // presumably the exclusivity `with_claimed_mut` requires.
        // NOTE(review): confirm against the contract documented on `Claimed`.
        unsafe {
            shared.writer_state.with_claimed_mut(|ws| loop {
                if pending.peek().is_none() {
                    break;
                }
                if ws.head.published_len() == capacity {
                    // Head is full: chain a successor that weakly links back to it.
                    let fresh =
                        Segment::new(ws.head.sequence + 1, Arc::downgrade(&ws.head), capacity);
                    ws.retained.push_back(Arc::clone(&fresh));
                    while ws.retained_segments < ws.retained.len() {
                        ws.retained.pop_front();
                    }
                    // Update the writer's own head first, then store the shared one.
                    ws.head = Arc::clone(&fresh);
                    shared.head.store(fresh);
                }
                ws.head.push_batch(&mut pending);
            });
        }
    }

    /// Appends a single value.
    ///
    /// Rolls over to a new segment first when the head segment's published
    /// length equals the segment capacity, exactly as [`Writer::append_batch`]
    /// does.
    pub fn append(&mut self, value: T) {
        let shared = &*self.shared;
        let capacity = shared.segment_capacity;
        // SAFETY: in this file a `Writer` is only built by
        // `AtomicLog::try_claim_writer` after `try_claim` succeeds, and the
        // claim is released only in `Drop`; combined with `&mut self` this is
        // presumably the exclusivity `with_claimed_mut` requires.
        // NOTE(review): confirm against the contract documented on `Claimed`.
        unsafe {
            shared.writer_state.with_claimed_mut(|ws| {
                let head_is_full = ws.head.published_len() == capacity;
                if head_is_full {
                    // Head is full: chain a successor that weakly links back to it.
                    let fresh =
                        Segment::new(ws.head.sequence + 1, Arc::downgrade(&ws.head), capacity);
                    ws.retained.push_back(Arc::clone(&fresh));
                    while ws.retained_segments < ws.retained.len() {
                        ws.retained.pop_front();
                    }
                    // Update the writer's own head first, then store the shared one.
                    ws.head = Arc::clone(&fresh);
                    shared.head.store(fresh);
                }
                ws.head.push(value);
            });
        }
    }
}
impl<T> Drop for Writer<T> {
    /// Releases the writer claim held on the shared state.
    fn drop(&mut self) {
        let claim = &self.shared.writer_state;
        claim.release();
    }
}