Skip to main content

atomic_log/
lib.rs

1//! A segmented, append-only, zero-copy rolling log for in-process fan-out.
2//!
3//! `atomic-log` provides [`AtomicLog`] and [`Writer`], a low-coordination primitive for
4//! publishing values from one producer to many readers. The intended use case is
5//! state-oriented streaming inside a process: readers care primarily about a recent,
6//! stable view of published data, not guaranteed delivery of every historical update.
7//!
8//! The log is split into fixed-capacity segments. The writer appends values into the
9//! current head segment, publishes them with atomics, and rolls to a new segment when
10//! the head fills. Readers take [`Snapshot`]s, which hold `Arc`s to the backing segments
11//! and expose zero-copy access through flat iteration or per-segment chunk iteration.
12//!
13//! # What This Crate Optimizes For
14//!
15//! - Single-writer, many-reader fan-out
16//! - Low-coordination publication and atomics-only observation on the read path
17//! - Stable snapshots that do not block the writer
18//! - Zero-copy reads of immutable published values
19//! - Bounded retention through automatic segment reclamation
20//! - Reclaimable write access while the log owns retained history
21//!
22//! # What It Does Not Provide
23//!
24//! - Multi-writer coordination
25//! - Delivery guarantees for every historical value
26//! - Backpressure from readers to the writer
27//! - Persistence or durability
28//! - Exactly-once or must-not-miss event delivery
29//!
30//! If every update matters, use a channel, queue, or durable log instead.
31//!
32//! # Snapshot Semantics
33//!
//! A [`Snapshot`] is a stable, captured view of the retained history reachable
//! from the current head at the moment the snapshot is built or refreshed.
36//!
37//! - Readers only observe fully published values.
38//! - Published values are immutable after publication.
39//! - Holding a snapshot keeps its backing segments alive.
40//! - Refresh replaces the snapshot contents with a newer captured view.
41//! - Slow readers may lose continuity across refreshes if older segments have already
42//!   been reclaimed.
43//! - Dropping a writer does not discard the log's retained segments.
44//!
45//! The important distinction is that a single snapshot is internally stable, while
46//! continuity across time is best-effort.
47//!
48//! # Retention Model
49//!
50//! The constructor takes a logical `retained_capacity` and a fixed `segment_capacity`.
51//! The current implementation retains whole segments, so the live window is rounded to
52//! segment boundaries rather than truncated element-by-element. In practice that means
53//! the visible retained history can exceed `retained_capacity` by up to roughly one
54//! extra segment of historical data plus the current head segment.
55//!
56//! # Example
57//!
58//! ```
59//! use atomic_log::AtomicLog;
60//!
61//! let (mut writer, log) = AtomicLog::new_claimed(8, 4);
62//!
63//! for value in 0..6 {
64//!     writer.append(value);
65//! }
66//!
67//! let mut snapshot = log.snapshot();
68//! let initial: Vec<_> = snapshot.iter().copied().collect();
69//! assert_eq!(initial, vec![0, 1, 2, 3, 4, 5]);
70//!
71//! writer.append(6);
72//! writer.append(7);
73//! snapshot.refresh();
74//!
75//! let refreshed: Vec<_> = snapshot.iter().copied().collect();
76//! assert_eq!(refreshed, vec![0, 1, 2, 3, 4, 5, 6, 7]);
77//! ```
78//!
79//! # Reading Patterns
80//!
81//! [`Snapshot::iter`] yields a flat `&T` stream across the captured segments.
82//! [`Snapshot::chunks`] yields [`SegmentSlice`] values for consumers that care about
83//! segment-local slices or segment sequence numbers.
//! [`AtomicLog::try_claim_writer`] hands out a writer whenever none is currently
//! claimed, for example after the previous writer has been dropped.
86mod claim;
87mod log;
88mod segment;
89mod snapshot;
90
91pub use log::{AtomicLog, Writer};
92pub use snapshot::{Chunks, Iter, SegmentSlice, Snapshot};
93
94#[cfg(test)]
95mod tests {
96    use crate::Snapshot;
97    use crate::log::AtomicLog;
98    use std::sync::Arc;
99    use std::sync::atomic::{AtomicUsize, Ordering};
100    use std::thread;
101
102    #[test]
103    fn empty_snapshot_is_empty() {
104        let log = AtomicLog::<usize>::new(4, 2);
105
106        assert!(!log.is_writer_claimed());
107
108        let snapshot = log.snapshot();
109
110        assert!(snapshot.is_empty());
111        assert_eq!(snapshot.len(), 0);
112        assert_eq!(snapshot.iter().count(), 0);
113    }
114
115    #[test]
116    fn new_starts_without_claimed_writer() {
117        let log = AtomicLog::<usize>::new(8, 2);
118
119        assert!(!log.is_writer_claimed());
120        let writer = log.try_claim_writer();
121        assert!(writer.is_some());
122        assert!(log.is_writer_claimed());
123    }
124
125    #[test]
126    fn snapshot_returns_full_retained_view() {
127        let (mut writer, log) = AtomicLog::new_claimed(5, 2);
128
129        for value in 0..8 {
130            writer.append(value);
131        }
132
133        let snapshot = log.snapshot();
134        let values: Vec<_> = snapshot.iter().copied().collect();
135
136        assert_eq!(values, vec![0, 1, 2, 3, 4, 5, 6, 7]);
137    }
138
139    #[test]
140    fn snapshot_captures_full_retained_view() {
141        let (mut writer, log) = AtomicLog::new_claimed(8, 3);
142
143        for value in 0..7 {
144            writer.append(value);
145        }
146
147        let snapshot = log.snapshot();
148        let values: Vec<_> = snapshot.iter().copied().collect();
149
150        assert_eq!(values, vec![0, 1, 2, 3, 4, 5, 6]);
151    }
152
153    #[test]
154    fn chunk_iteration_exposes_segment_sequences() {
155        let (mut writer, log) = AtomicLog::new_claimed(6, 2);
156
157        for value in 0..5 {
158            writer.append(value);
159        }
160
161        let chunks: Vec<_> = log
162            .snapshot()
163            .chunks()
164            .map(|chunk| (chunk.sequence(), chunk.values().to_vec()))
165            .collect();
166
167        assert_eq!(chunks, vec![(0, vec![0, 1]), (1, vec![2, 3]), (2, vec![4])]);
168    }
169
170    #[test]
171    fn held_snapshot_remains_stable_after_reclamation() {
172        let (mut writer, log) = AtomicLog::new_claimed(3, 1);
173        for value in 0..3 {
174            writer.append(value);
175        }
176        let snapshot = log.snapshot();
177
178        for value in 3..20 {
179            writer.append(value);
180        }
181
182        let old_values: Vec<_> = snapshot.iter().copied().collect();
183        let fresh_values: Vec<_> = log.snapshot().iter().copied().collect();
184
185        assert_eq!(old_values, vec![0, 1, 2]);
186        assert_eq!(fresh_values, vec![16, 17, 18, 19]);
187    }
188
189    #[test]
190    fn refresh_replaces_snapshot_with_latest_view() {
191        let (mut writer, log) = AtomicLog::new_claimed(4, 2);
192        for value in 0..4 {
193            writer.append(value);
194        }
195        let mut snapshot = log.snapshot();
196
197        for value in 4..9 {
198            writer.append(value);
199        }
200        snapshot.refresh();
201
202        let values: Vec<_> = snapshot.iter().copied().collect();
203        assert_eq!(values, vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);
204    }
205
206    #[test]
207    fn snapshot_refresh_extends_same_head_without_rebuild() {
208        let (mut writer, log) = AtomicLog::new_claimed(4, 8);
209        writer.append(0);
210        writer.append(1);
211        let mut snapshot = log.snapshot();
212
213        writer.append(2);
214        writer.append(3);
215        snapshot.refresh();
216
217        let values: Vec<_> = snapshot.iter().copied().collect();
218        assert_eq!(values, vec![0, 1, 2, 3]);
219        assert_eq!(snapshot.chunks().count(), 1);
220    }
221
222    #[test]
223    fn snapshot_refresh_appends_new_segments_when_continuous() {
224        let (mut writer, log) = AtomicLog::new_claimed(5, 2);
225        for value in 0..3 {
226            writer.append(value);
227        }
228        let mut snapshot = log.snapshot();
229
230        for value in 3..6 {
231            writer.append(value);
232        }
233        snapshot.refresh();
234
235        let values: Vec<_> = snapshot.iter().copied().collect();
236        assert_eq!(values, vec![0, 1, 2, 3, 4, 5]);
237        assert_eq!(snapshot.chunks().count(), 3);
238    }
239
240    #[test]
241    fn writer_drop_preserves_retained_segments_for_refresh() {
242        let (mut writer, log) = AtomicLog::new_claimed(8, 2);
243        for value in 0..3 {
244            writer.append(value);
245        }
246        let mut snapshot = log.snapshot();
247
248        for value in 3..8 {
249            writer.append(value);
250        }
251        drop(writer);
252
253        snapshot.refresh();
254
255        let values: Vec<_> = snapshot.iter().copied().collect();
256        assert_eq!(values, vec![0, 1, 2, 3, 4, 5, 6, 7]);
257    }
258
259    #[test]
260    fn writer_can_be_reclaimed_after_drop() {
261        let (mut writer, log) = AtomicLog::new_claimed(8, 2);
262        writer.append(1);
263        assert!(log.is_writer_claimed());
264        drop(writer);
265        assert!(!log.is_writer_claimed());
266
267        let mut writer = log
268            .try_claim_writer()
269            .expect("writer claim should be released");
270        assert!(log.is_writer_claimed());
271        writer.append(2);
272
273        let values: Vec<_> = log.snapshot().iter().copied().collect();
274        assert_eq!(values, vec![1, 2]);
275    }
276
277    #[test]
278    fn writer_cannot_be_reclaimed_while_existing_writer_lives() {
279        let (_writer, log) = AtomicLog::<usize>::new_claimed(8, 2);
280
281        assert!(log.is_writer_claimed());
282        assert!(log.try_claim_writer().is_none());
283    }
284
285    #[test]
286    fn drops_only_initialized_values() {
287        static DROPS: AtomicUsize = AtomicUsize::new(0);
288
289        struct CountDrop;
290        impl Drop for CountDrop {
291            fn drop(&mut self) {
292                DROPS.fetch_add(1, Ordering::Relaxed);
293            }
294        }
295
296        {
297            let (mut writer, _log) = AtomicLog::new_claimed(10, 8);
298            for _ in 0..3 {
299                writer.append(CountDrop);
300            }
301        }
302
303        assert_eq!(DROPS.load(Ordering::Relaxed), 3);
304    }
305
306    #[test]
307    fn many_readers_can_snapshot_while_writer_appends() {
308        let (mut writer, log) = AtomicLog::new_claimed(64, 8);
309        let log = Arc::new(log);
310        let stop = Arc::new(AtomicUsize::new(0));
311        let mut readers = Vec::new();
312
313        for _ in 0..4 {
314            let log = Arc::clone(&log);
315            let stop = Arc::clone(&stop);
316            readers.push(thread::spawn(move || {
317                while stop.load(Ordering::Acquire) == 0 {
318                    let values: Vec<_> = log.snapshot().iter().copied().collect();
319                    assert!(values.windows(2).all(|pair| pair[0] + 1 == pair[1]));
320                }
321            }));
322        }
323
324        for value in 0..1000 {
325            writer.append(value);
326        }
327        stop.store(1, Ordering::Release);
328
329        for reader in readers {
330            reader.join().unwrap();
331        }
332    }
333
334    #[test]
335    fn writer_can_be_shared_through_a_lock_when_requested() {
336        let (writer, log) = AtomicLog::new_claimed(8, 2);
337        let writer = std::sync::Arc::new(std::sync::Mutex::new(writer));
338
339        let first = {
340            let writer = std::sync::Arc::clone(&writer);
341            thread::spawn(move || writer.lock().unwrap().append(1))
342        };
343        let second = {
344            let writer = std::sync::Arc::clone(&writer);
345            thread::spawn(move || writer.lock().unwrap().append(2))
346        };
347
348        first.join().unwrap();
349        second.join().unwrap();
350
351        let values: Vec<_> = log.snapshot().iter().copied().collect();
352        assert_eq!(values.len(), 2);
353        assert!(values.contains(&1));
354        assert!(values.contains(&2));
355    }
356
357    #[test]
358    fn append_batch_produces_same_result_as_sequential_append() {
359        let (mut w_seq, log_seq) = AtomicLog::new_claimed(8, 4);
360        for v in 0..7 {
361            w_seq.append(v);
362        }
363
364        let (mut w_batch, log_batch) = AtomicLog::new_claimed(8, 4);
365        w_batch.append_batch(0..7);
366
367        let seq: Vec<_> = log_seq.snapshot().iter().copied().collect();
368        let batch: Vec<_> = log_batch.snapshot().iter().copied().collect();
369        assert_eq!(seq, batch);
370    }
371
372    #[test]
373    fn append_batch_empty_iterator_is_a_no_op() {
374        let (mut writer, log) = AtomicLog::new_claimed(8, 4);
375        writer.append(1);
376        writer.append_batch(std::iter::empty::<i32>());
377
378        let values: Vec<_> = log.snapshot().iter().copied().collect();
379        assert_eq!(values, vec![1]);
380    }
381
382    #[test]
383    fn append_batch_empty_iterator_on_full_head_does_not_allocate_extra_segment() {
384        // Fill the log exactly to one full segment, then batch-append nothing.
385        // If we incorrectly rolled before checking the iterator, we'd see a second
386        // (empty) segment in the snapshot chunks.
387        let (mut writer, log) = AtomicLog::new_claimed(4, 4);
388        writer.append_batch(0..4);
389        writer.append_batch(std::iter::empty::<i32>());
390
391        let snapshot = log.snapshot();
392        let chunks: Vec<_> = snapshot.chunks().collect();
393        assert_eq!(
394            chunks.len(),
395            1,
396            "expected exactly one segment, got {}",
397            chunks.len()
398        );
399        assert_eq!(chunks[0].values(), &[0, 1, 2, 3]);
400    }
401
402    #[test]
403    fn append_batch_spanning_segment_boundary() {
404        // segment_capacity = 3; batch of 7 should produce three segments: [0,1,2], [3,4,5], [6]
405        let (mut writer, log) = AtomicLog::new_claimed(12, 3);
406        writer.append_batch(0..7);
407
408        let chunks: Vec<_> = log
409            .snapshot()
410            .chunks()
411            .map(|c| (c.sequence(), c.values().to_vec()))
412            .collect();
413
414        assert_eq!(
415            chunks,
416            vec![(0, vec![0, 1, 2]), (1, vec![3, 4, 5]), (2, vec![6])]
417        );
418    }
419
420    #[test]
421    fn append_batch_exactly_fills_current_segment_without_spurious_roll() {
422        let (mut writer, log) = AtomicLog::new_claimed(8, 4);
423        writer.append(0); // head is now 1/4 full
424        writer.append_batch(1..4); // fills it exactly to 4/4
425
426        let snapshot = log.snapshot();
427        let chunks: Vec<_> = snapshot.chunks().collect();
428        assert_eq!(chunks.len(), 1);
429        assert_eq!(chunks[0].values(), &[0, 1, 2, 3]);
430    }
431
432    #[test]
433    fn append_batch_interleaves_correctly_with_append() {
434        let (mut writer, log) = AtomicLog::new_claimed(16, 4);
435        writer.append_batch(0..3);
436        writer.append(3);
437        writer.append_batch(4..8);
438        writer.append(8);
439
440        let values: Vec<_> = log.snapshot().iter().copied().collect();
441        assert_eq!(values, vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);
442    }
443
444    #[test]
445    fn log_snapshot_and_writer_conversions_round_trip() {
446        let (mut writer, log) = AtomicLog::new_claimed(8, 2);
447        for value in 0..5 {
448            writer.append(value);
449        }
450
451        let log_from_writer = writer.log();
452        let snapshot = Snapshot::from(log_from_writer.clone());
453        let log_from_snapshot = AtomicLog::from(snapshot);
454
455        let values: Vec<_> = log_from_snapshot.snapshot().iter().copied().collect();
456        assert_eq!(values, vec![0, 1, 2, 3, 4]);
457
458        let snapshot = log.snapshot();
459        let cloned_log = snapshot.log();
460        let values: Vec<_> = cloned_log.snapshot().iter().copied().collect();
461        assert_eq!(values, vec![0, 1, 2, 3, 4]);
462    }
463}