1use std::collections::VecDeque;
2use std::sync::Arc;
3
4use arc_swap::ArcSwap;
5
6use crate::claim::Claimed;
7use crate::segment::Segment;
8use crate::snapshot::Snapshot;
9
10pub struct AtomicLog<T> {
16 pub(crate) shared: Arc<Shared<T>>,
17}
18
19pub struct Writer<T> {
24 pub(crate) shared: Arc<Shared<T>>,
25}
26
27pub(crate) struct Shared<T> {
28 pub(crate) retained_capacity: usize,
29 pub(crate) segment_capacity: usize,
30 pub(crate) head: ArcSwap<Segment<T>>,
31 writer_state: Claimed<WriterState<T>>,
32}
33
34struct WriterState<T> {
35 head: Arc<Segment<T>>,
36 retained: VecDeque<Arc<Segment<T>>>,
37 retained_segments: usize,
38}
39
40impl<T> Clone for AtomicLog<T> {
41 fn clone(&self) -> Self {
42 Self {
43 shared: Arc::clone(&self.shared),
44 }
45 }
46}
47
48impl<T> From<Writer<T>> for AtomicLog<T> {
49 fn from(writer: Writer<T>) -> Self {
50 writer.log()
51 }
52}
53
54impl<T> AtomicLog<T> {
55 pub fn new(retained_capacity: usize, segment_capacity: usize) -> Self {
67 assert!(retained_capacity > 0, "retained capacity must be non-zero");
68 assert!(segment_capacity > 0, "segment capacity must be non-zero");
69
70 let retained_segments = retained_capacity.div_ceil(segment_capacity) + 1;
71 let head = Segment::new(0, std::sync::Weak::new(), segment_capacity);
72 let mut retained = VecDeque::with_capacity(retained_segments);
73 retained.push_back(Arc::clone(&head));
74
75 let shared = Arc::new(Shared {
76 retained_capacity,
77 segment_capacity,
78 head: ArcSwap::from(Arc::clone(&head)),
79 writer_state: Claimed::new_unclaimed(WriterState {
80 head,
81 retained,
82 retained_segments,
83 }),
84 });
85
86 Self { shared }
87 }
88
89 #[inline]
91 pub fn new_claimed(retained_capacity: usize, segment_capacity: usize) -> (Writer<T>, Self) {
92 let log = Self::new(retained_capacity, segment_capacity);
93 let writer = log
94 .try_claim_writer()
95 .expect("freshly constructed log must allow claiming a writer");
96 (writer, log)
97 }
98
99 #[inline]
101 pub fn retained_capacity(&self) -> usize {
102 self.shared.retained_capacity
103 }
104
105 #[inline]
107 pub fn segment_capacity(&self) -> usize {
108 self.shared.segment_capacity
109 }
110
111 #[inline]
113 pub fn is_writer_claimed(&self) -> bool {
114 self.shared.writer_state.is_claimed()
115 }
116
117 #[inline]
123 pub fn snapshot(&self) -> Snapshot<T> {
124 Snapshot::new(Arc::clone(&self.shared))
125 }
126
127 #[inline]
132 pub fn try_claim_writer(&self) -> Option<Writer<T>> {
133 self.shared.writer_state.try_claim().then(|| Writer {
134 shared: Arc::clone(&self.shared),
135 })
136 }
137}
138
139impl<T> Writer<T> {
140 #[inline]
142 pub fn log(&self) -> AtomicLog<T> {
143 AtomicLog {
144 shared: Arc::clone(&self.shared),
145 }
146 }
147
148 #[inline]
150 pub fn retained_capacity(&self) -> usize {
151 self.shared.retained_capacity
152 }
153
154 #[inline]
156 pub fn segment_capacity(&self) -> usize {
157 self.shared.segment_capacity
158 }
159
160 pub fn append_batch(&mut self, values: impl IntoIterator<Item = T>) {
166 let mut iter = values.into_iter().peekable();
167 unsafe {
168 self.shared.writer_state.with_claimed_mut(|state| {
169 while iter.peek().is_some() {
170 if state.head.published_len() == self.shared.segment_capacity {
171 let next = Segment::new(
172 state.head.sequence + 1,
173 Arc::downgrade(&state.head),
174 self.shared.segment_capacity,
175 );
176 state.retained.push_back(Arc::clone(&next));
177 while state.retained.len() > state.retained_segments {
178 state.retained.pop_front();
179 }
180 state.head = Arc::clone(&next);
181 self.shared.head.store(next);
182 }
183
184 state.head.push_batch(&mut iter);
185 }
186 });
187 }
188 }
189
190 pub fn append(&mut self, value: T) {
195 unsafe {
196 self.shared.writer_state.with_claimed_mut(|state| {
197 if state.head.published_len() == self.shared.segment_capacity {
198 let next = Segment::new(
199 state.head.sequence + 1,
200 Arc::downgrade(&state.head),
201 self.shared.segment_capacity,
202 );
203 state.retained.push_back(Arc::clone(&next));
204 while state.retained.len() > state.retained_segments {
205 state.retained.pop_front();
206 }
207 state.head = Arc::clone(&next);
208 self.shared.head.store(next);
209 }
210
211 state.head.push(value);
212 });
213 }
214 }
215}
216
217impl<T> Drop for Writer<T> {
218 fn drop(&mut self) {
219 self.shared.writer_state.release();
220 }
221}