1use std::collections::VecDeque;
2use std::sync::Arc;
3
4use arc_swap::ArcSwap;
5
6use crate::segment::Segment;
7use crate::snapshot::Snapshot;
8
9pub struct AtomicLog<T> {
15 pub(crate) shared: Arc<Shared<T>>,
16}
17
18pub struct Writer<T> {
23 pub(crate) shared: Arc<Shared<T>>,
24 pub(crate) state: WriterState<T>,
25}
26
27pub(crate) struct Shared<T> {
28 pub(crate) retained_capacity: usize,
29 pub(crate) segment_capacity: usize,
30 pub(crate) head: ArcSwap<Segment<T>>,
31}
32
33pub(crate) struct WriterState<T> {
34 pub(crate) head: Arc<Segment<T>>,
35 pub(crate) retained: VecDeque<Arc<Segment<T>>>,
36 pub(crate) retained_segments: usize,
37}
38
39impl<T> Clone for AtomicLog<T> {
40 fn clone(&self) -> Self {
41 Self {
42 shared: Arc::clone(&self.shared),
43 }
44 }
45}
46
47impl<T> From<Writer<T>> for AtomicLog<T> {
48 fn from(writer: Writer<T>) -> Self {
49 writer.log()
50 }
51}
52
53impl<T> AtomicLog<T> {
54 pub fn new(retained_capacity: usize, segment_capacity: usize) -> (Writer<T>, Self) {
66 assert!(retained_capacity > 0, "retained capacity must be non-zero");
67 assert!(segment_capacity > 0, "segment capacity must be non-zero");
68
69 let retained_segments = retained_capacity.div_ceil(segment_capacity) + 1;
70 let head = Segment::new(0, std::sync::Weak::new(), segment_capacity);
71 let mut retained = VecDeque::with_capacity(retained_segments);
72 retained.push_back(Arc::clone(&head));
73
74 let shared = Arc::new(Shared {
75 retained_capacity,
76 segment_capacity,
77 head: ArcSwap::from(Arc::clone(&head)),
78 });
79
80 let writer = Writer {
81 shared: Arc::clone(&shared),
82 state: WriterState {
83 head,
84 retained,
85 retained_segments,
86 },
87 };
88 let log = Self { shared };
89
90 (writer, log)
91 }
92
93 #[inline]
95 pub fn retained_capacity(&self) -> usize {
96 self.shared.retained_capacity
97 }
98
99 #[inline]
101 pub fn segment_capacity(&self) -> usize {
102 self.shared.segment_capacity
103 }
104
105 #[inline]
111 pub fn snapshot(&self) -> Snapshot<T> {
112 Snapshot::new(Arc::clone(&self.shared))
113 }
114}
115
116impl<T> Writer<T> {
117 #[inline]
119 pub fn log(&self) -> AtomicLog<T> {
120 AtomicLog {
121 shared: Arc::clone(&self.shared),
122 }
123 }
124
125 #[inline]
127 pub fn retained_capacity(&self) -> usize {
128 self.shared.retained_capacity
129 }
130
131 #[inline]
133 pub fn segment_capacity(&self) -> usize {
134 self.shared.segment_capacity
135 }
136
137 pub fn append(&mut self, value: T) {
142 if self.state.head.published_len() == self.shared.segment_capacity {
143 let next = Segment::new(
144 self.state.head.sequence + 1,
145 Arc::downgrade(&self.state.head),
146 self.shared.segment_capacity,
147 );
148 self.state.head = Arc::clone(&next);
149 self.state.retained.push_back(Arc::clone(&next));
150 while self.state.retained.len() > self.state.retained_segments {
151 self.state.retained.pop_front();
152 }
153 self.shared.head.store(next);
154 }
155
156 self.state.head.push(value);
157 }
158}