// atomic_log/log.rs

1use std::collections::VecDeque;
2use std::sync::Arc;
3
4use arc_swap::ArcSwap;
5
6use crate::claim::Claimed;
7use crate::segment::Segment;
8use crate::snapshot::Snapshot;
9
10/// Shared read handle for a segmented rolling log.
11///
12/// `AtomicLog<T>` is cheaply clonable and can be sent to many reader threads. Readers use
13/// [`snapshot`](Self::snapshot) to capture a stable, zero-copy view of the currently
14/// retained data.
15pub struct AtomicLog<T> {
16    pub(crate) shared: Arc<Shared<T>>,
17}
18
19/// Single-writer append handle for an [`AtomicLog`].
20///
21/// The core design assumes exactly one active writer. If a caller wants to share write
22/// access across threads, it must add its own external synchronization.
23pub struct Writer<T> {
24    pub(crate) shared: Arc<Shared<T>>,
25}
26
27pub(crate) struct Shared<T> {
28    pub(crate) retained_capacity: usize,
29    pub(crate) segment_capacity: usize,
30    pub(crate) head: ArcSwap<Segment<T>>,
31    writer_state: Claimed<WriterState<T>>,
32}
33
34struct WriterState<T> {
35    head: Arc<Segment<T>>,
36    retained: VecDeque<Arc<Segment<T>>>,
37    retained_segments: usize,
38}
39
40impl<T> Clone for AtomicLog<T> {
41    fn clone(&self) -> Self {
42        Self {
43            shared: Arc::clone(&self.shared),
44        }
45    }
46}
47
48impl<T> From<Writer<T>> for AtomicLog<T> {
49    fn from(writer: Writer<T>) -> Self {
50        writer.log()
51    }
52}
53
54impl<T> AtomicLog<T> {
55    /// Creates a new log without claiming a writer.
56    ///
57    /// `retained_capacity` is the target logical retention size in elements. The current
58    /// implementation retains whole segments, so the observed window may exceed this value.
59    ///
60    /// `segment_capacity` is the number of elements stored in each segment and remains fixed
61    /// for the lifetime of the log.
62    ///
63    /// # Panics
64    ///
65    /// Panics if either capacity is zero.
66    pub fn new(retained_capacity: usize, segment_capacity: usize) -> Self {
67        assert!(retained_capacity > 0, "retained capacity must be non-zero");
68        assert!(segment_capacity > 0, "segment capacity must be non-zero");
69
70        let retained_segments = retained_capacity.div_ceil(segment_capacity) + 1;
71        let head = Segment::new(0, std::sync::Weak::new(), segment_capacity);
72        let mut retained = VecDeque::with_capacity(retained_segments);
73        retained.push_back(Arc::clone(&head));
74
75        let shared = Arc::new(Shared {
76            retained_capacity,
77            segment_capacity,
78            head: ArcSwap::from(Arc::clone(&head)),
79            writer_state: Claimed::new_unclaimed(WriterState {
80                head,
81                retained,
82                retained_segments,
83            }),
84        });
85
86        Self { shared }
87    }
88
89    /// Creates a new log and immediately claims its writer.
90    #[inline]
91    pub fn new_claimed(retained_capacity: usize, segment_capacity: usize) -> (Writer<T>, Self) {
92        let log = Self::new(retained_capacity, segment_capacity);
93        let writer = log
94            .try_claim_writer()
95            .expect("freshly constructed log must allow claiming a writer");
96        (writer, log)
97    }
98
99    /// Returns the configured logical retained capacity, in elements.
100    #[inline]
101    pub fn retained_capacity(&self) -> usize {
102        self.shared.retained_capacity
103    }
104
105    /// Returns the fixed segment size, in elements.
106    #[inline]
107    pub fn segment_capacity(&self) -> usize {
108        self.shared.segment_capacity
109    }
110
111    /// Returns `true` if this log currently has a claimed writer.
112    #[inline]
113    pub fn is_writer_claimed(&self) -> bool {
114        self.shared.writer_state.is_claimed()
115    }
116
117    /// Captures a stable snapshot of the currently retained data.
118    ///
119    /// The snapshot borrows no locks and keeps its backing segments alive through `Arc`
120    /// ownership, so readers can iterate over the result without copying values out of the
121    /// log.
122    #[inline]
123    pub fn snapshot(&self) -> Snapshot<T> {
124        Snapshot::new(Arc::clone(&self.shared))
125    }
126
127    /// Attempts to claim exclusive write access to this log.
128    ///
129    /// Returns `None` if another [`Writer`] currently exists. Dropping the returned writer
130    /// releases the claim without discarding the log's retained segment state.
131    #[inline]
132    pub fn try_claim_writer(&self) -> Option<Writer<T>> {
133        self.shared.writer_state.try_claim().then(|| Writer {
134            shared: Arc::clone(&self.shared),
135        })
136    }
137}
138
139impl<T> Writer<T> {
140    /// Returns a clonable read handle for this writer's log.
141    #[inline]
142    pub fn log(&self) -> AtomicLog<T> {
143        AtomicLog {
144            shared: Arc::clone(&self.shared),
145        }
146    }
147
148    /// Returns the configured logical retained capacity, in elements.
149    #[inline]
150    pub fn retained_capacity(&self) -> usize {
151        self.shared.retained_capacity
152    }
153
154    /// Returns the fixed segment size, in elements.
155    #[inline]
156    pub fn segment_capacity(&self) -> usize {
157        self.shared.segment_capacity
158    }
159
160    /// Appends multiple values to the log.
161    ///
162    /// Values within the same segment are published with a single atomic store rather
163    /// than one per element. Segment rolls are performed as needed, the same as
164    /// [`append`](Self::append).
165    pub fn append_batch(&mut self, values: impl IntoIterator<Item = T>) {
166        let mut iter = values.into_iter().peekable();
167        unsafe {
168            self.shared.writer_state.with_claimed_mut(|state| {
169                while iter.peek().is_some() {
170                    if state.head.published_len() == self.shared.segment_capacity {
171                        let next = Segment::new(
172                            state.head.sequence + 1,
173                            Arc::downgrade(&state.head),
174                            self.shared.segment_capacity,
175                        );
176                        state.retained.push_back(Arc::clone(&next));
177                        while state.retained.len() > state.retained_segments {
178                            state.retained.pop_front();
179                        }
180                        state.head = Arc::clone(&next);
181                        self.shared.head.store(next);
182                    }
183
184                    state.head.push_batch(&mut iter);
185                }
186            });
187        }
188    }
189
190    /// Appends one value to the log and publishes it for readers.
191    ///
192    /// Values are written into the current head segment. If that segment is full, the writer
193    /// allocates a new head segment, publishes it, and continues there.
194    pub fn append(&mut self, value: T) {
195        unsafe {
196            self.shared.writer_state.with_claimed_mut(|state| {
197                if state.head.published_len() == self.shared.segment_capacity {
198                    let next = Segment::new(
199                        state.head.sequence + 1,
200                        Arc::downgrade(&state.head),
201                        self.shared.segment_capacity,
202                    );
203                    state.retained.push_back(Arc::clone(&next));
204                    while state.retained.len() > state.retained_segments {
205                        state.retained.pop_front();
206                    }
207                    state.head = Arc::clone(&next);
208                    self.shared.head.store(next);
209                }
210
211                state.head.push(value);
212            });
213        }
214    }
215}
216
217impl<T> Drop for Writer<T> {
218    fn drop(&mut self) {
219        self.shared.writer_state.release();
220    }
221}