Skip to main content

atomic_log/
lib.rs

1//! A segmented, append-only, zero-copy rolling log for in-process fan-out.
2//!
//! `atomic-log` provides [`AtomicLog`] and [`Writer`], which together form a
//! low-coordination primitive for publishing values from one producer to many readers.
//! The intended use case is state-oriented streaming inside a process: readers care
//! primarily about a recent, stable view of published data, not about guaranteed
//! delivery of every historical update.
7//!
8//! The log is split into fixed-capacity segments. The writer appends values into the
9//! current head segment, publishes them with atomics, and rolls to a new segment when
10//! the head fills. Readers take [`Snapshot`]s, which hold `Arc`s to the backing segments
11//! and expose zero-copy access through flat iteration or per-segment chunk iteration.
12//!
13//! # What This Crate Optimizes For
14//!
15//! - Single-writer, many-reader fan-out
16//! - Atomics-only publication and observation on the core path
17//! - Stable snapshots that do not block the writer
18//! - Zero-copy reads of immutable published values
19//! - Bounded retention through automatic segment reclamation
20//!
21//! # What It Does Not Provide
22//!
23//! - Multi-writer coordination
24//! - Delivery guarantees for every historical value
25//! - Backpressure from readers to the writer
26//! - Persistence or durability
27//! - Exactly-once or must-not-miss event delivery
28//!
29//! If every update matters, use a channel, queue, or durable log instead.
30//!
31//! # Snapshot Semantics
32//!
33//! A [`Snapshot`] is a stable captured view of the currently retained prefix reachable
34//! from the current head at the moment the snapshot is built or refreshed.
35//!
36//! - Readers only observe fully published values.
37//! - Published values are immutable after publication.
38//! - Holding a snapshot keeps its backing segments alive.
39//! - Refresh replaces the snapshot contents with a newer captured view.
40//! - Slow readers may lose continuity across refreshes if older segments have already
41//!   been reclaimed.
42//!
43//! The important distinction is that a single snapshot is internally stable, while
44//! continuity across time is best-effort.
45//!
46//! # Retention Model
47//!
48//! The constructor takes a logical `retained_capacity` and a fixed `segment_capacity`.
49//! The current implementation retains whole segments, so the live window is rounded to
50//! segment boundaries rather than truncated element-by-element. In practice that means
51//! the visible retained history can exceed `retained_capacity` by up to roughly one
52//! extra segment of historical data plus the current head segment.
53//!
54//! # Example
55//!
56//! ```
57//! use atomic_log::AtomicLog;
58//!
59//! let (mut writer, log) = AtomicLog::new(8, 4);
60//!
61//! for value in 0..6 {
62//!     writer.append(value);
63//! }
64//!
65//! let mut snapshot = log.snapshot();
66//! let initial: Vec<_> = snapshot.iter().copied().collect();
67//! assert_eq!(initial, vec![0, 1, 2, 3, 4, 5]);
68//!
69//! writer.append(6);
70//! writer.append(7);
71//! snapshot.refresh();
72//!
73//! let refreshed: Vec<_> = snapshot.iter().copied().collect();
74//! assert_eq!(refreshed, vec![0, 1, 2, 3, 4, 5, 6, 7]);
75//! ```
76//!
77//! # Reading Patterns
78//!
79//! [`Snapshot::iter`] yields a flat `&T` stream across the captured segments.
80//! [`Snapshot::chunks`] yields [`SegmentSlice`] values for consumers that care about
81//! segment-local slices or segment sequence numbers.
82mod log;
83mod segment;
84mod snapshot;
85
86pub use log::{AtomicLog, Writer};
87pub use snapshot::{Chunks, Iter, SegmentSlice, Snapshot};
88
#[cfg(test)]
mod tests {
    //! Behavioral tests for the public log, writer, and snapshot API.
    //!
    //! Every `AtomicLog::new(a, b)` call below passes the retained capacity first and
    //! the segment capacity second, matching the crate-level documentation.

    use crate::Snapshot;
    use crate::log::AtomicLog;
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
    use std::sync::{Arc, Mutex};
    use std::thread;

    /// A snapshot taken before any append reports itself empty through `is_empty`,
    /// `len`, and iteration alike.
    #[test]
    fn empty_snapshot_is_empty() {
        let (_writer, log) = AtomicLog::<usize>::new(4, 2);

        let snapshot = log.snapshot();

        assert!(snapshot.is_empty());
        assert_eq!(snapshot.len(), 0);
        assert_eq!(snapshot.iter().count(), 0);
    }

    /// With retained capacity 5 and segment capacity 2, all eight appended values are
    /// still expected to be visible, in append order.
    #[test]
    fn snapshot_returns_full_retained_view() {
        let (mut writer, log) = AtomicLog::new(5, 2);

        for value in 0..8 {
            writer.append(value);
        }

        let snapshot = log.snapshot();
        let values: Vec<_> = snapshot.iter().copied().collect();

        assert_eq!(values, vec![0, 1, 2, 3, 4, 5, 6, 7]);
    }

    /// With retained capacity 8 and segment capacity 3, seven appended values (fewer
    /// than the retained capacity) are all visible, in append order.
    #[test]
    fn snapshot_captures_full_retained_view() {
        let (mut writer, log) = AtomicLog::new(8, 3);

        for value in 0..7 {
            writer.append(value);
        }

        let snapshot = log.snapshot();
        let values: Vec<_> = snapshot.iter().copied().collect();

        assert_eq!(values, vec![0, 1, 2, 3, 4, 5, 6]);
    }

    /// `chunks()` yields one slice per segment, pairing each slice with its segment
    /// sequence number; the last (partially filled) segment yields only what was written.
    #[test]
    fn chunk_iteration_exposes_segment_sequences() {
        let (mut writer, log) = AtomicLog::new(6, 2);

        for value in 0..5 {
            writer.append(value);
        }

        let chunks: Vec<_> = log
            .snapshot()
            .chunks()
            .map(|chunk| (chunk.sequence(), chunk.values().to_vec()))
            .collect();

        assert_eq!(chunks, vec![(0, vec![0, 1]), (1, vec![2, 3]), (2, vec![4])]);
    }

    /// A snapshot held across many later appends keeps yielding the values it captured,
    /// while a fresh snapshot only sees the most recent values.
    #[test]
    fn held_snapshot_remains_stable_after_reclamation() {
        let (mut writer, log) = AtomicLog::new(3, 1);
        for value in 0..3 {
            writer.append(value);
        }
        let snapshot = log.snapshot();

        // Push far past the retained capacity so older segments leave the live window.
        for value in 3..20 {
            writer.append(value);
        }

        let old_values: Vec<_> = snapshot.iter().copied().collect();
        let fresh_values: Vec<_> = log.snapshot().iter().copied().collect();

        assert_eq!(old_values, vec![0, 1, 2]);
        assert_eq!(fresh_values, vec![16, 17, 18, 19]);
    }

    /// `refresh()` on an existing snapshot picks up values appended after it was taken;
    /// here all nine values are expected to be visible afterwards.
    #[test]
    fn refresh_replaces_snapshot_with_latest_view() {
        let (mut writer, log) = AtomicLog::new(4, 2);
        for value in 0..4 {
            writer.append(value);
        }
        let mut snapshot = log.snapshot();

        for value in 4..9 {
            writer.append(value);
        }
        snapshot.refresh();

        let values: Vec<_> = snapshot.iter().copied().collect();
        assert_eq!(values, vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);
    }

    /// When all appends fit in one segment (segment capacity 8), refreshing exposes the
    /// new values and the snapshot still consists of a single chunk.
    #[test]
    fn snapshot_refresh_extends_same_head_without_rebuild() {
        let (mut writer, log) = AtomicLog::new(4, 8);
        writer.append(0);
        writer.append(1);
        let mut snapshot = log.snapshot();

        writer.append(2);
        writer.append(3);
        snapshot.refresh();

        let values: Vec<_> = snapshot.iter().copied().collect();
        assert_eq!(values, vec![0, 1, 2, 3]);
        assert_eq!(snapshot.chunks().count(), 1);
    }

    /// When the writer rolls into a new segment between snapshot and refresh, the
    /// refreshed snapshot contains both the old and the new segments, in order.
    #[test]
    fn snapshot_refresh_appends_new_segments_when_continuous() {
        let (mut writer, log) = AtomicLog::new(5, 2);
        for value in 0..3 {
            writer.append(value);
        }
        let mut snapshot = log.snapshot();

        for value in 3..6 {
            writer.append(value);
        }
        snapshot.refresh();

        let values: Vec<_> = snapshot.iter().copied().collect();
        assert_eq!(values, vec![0, 1, 2, 3, 4, 5]);
        assert_eq!(snapshot.chunks().count(), 3);
    }

    /// Dropping the writer and log runs `Drop` for exactly the three appended values,
    /// even though the segment has room for eight.
    #[test]
    fn drops_only_initialized_values() {
        static DROPS: AtomicUsize = AtomicUsize::new(0);

        struct CountDrop;
        impl Drop for CountDrop {
            fn drop(&mut self) {
                DROPS.fetch_add(1, Ordering::Relaxed);
            }
        }

        // Inner scope so both handles are dropped before the counter is inspected.
        {
            let (mut writer, _log) = AtomicLog::new(10, 8);
            for _ in 0..3 {
                writer.append(CountDrop);
            }
        }

        assert_eq!(DROPS.load(Ordering::Relaxed), 3);
    }

    /// Four reader threads repeatedly snapshot while the writer appends 0..1000; every
    /// snapshot they observe must be a run of consecutive integers.
    #[test]
    fn many_readers_can_snapshot_while_writer_appends() {
        let (mut writer, log) = AtomicLog::new(64, 8);
        let log = Arc::new(log);
        // Boolean stop signal: set once by the writer thread after its last append.
        let stop = Arc::new(AtomicBool::new(false));
        let mut readers = Vec::new();

        for _ in 0..4 {
            let log = Arc::clone(&log);
            let stop = Arc::clone(&stop);
            readers.push(thread::spawn(move || {
                while !stop.load(Ordering::Acquire) {
                    let values: Vec<_> = log.snapshot().iter().copied().collect();
                    assert!(values.windows(2).all(|pair| pair[0] + 1 == pair[1]));
                }
            }));
        }

        for value in 0..1000 {
            writer.append(value);
        }
        stop.store(true, Ordering::Release);

        // A failed assertion inside a reader surfaces here as a join error.
        for reader in readers {
            reader.join().unwrap();
        }
    }

    /// A `Writer` wrapped in `Arc<Mutex<_>>` can be appended to from two threads; both
    /// values end up in the log (their relative order is not asserted).
    #[test]
    fn writer_can_be_shared_through_a_lock_when_requested() {
        let (writer, log) = AtomicLog::new(8, 2);
        let writer = Arc::new(Mutex::new(writer));

        let first = {
            let writer = Arc::clone(&writer);
            thread::spawn(move || writer.lock().unwrap().append(1))
        };
        let second = {
            let writer = Arc::clone(&writer);
            thread::spawn(move || writer.lock().unwrap().append(2))
        };

        first.join().unwrap();
        second.join().unwrap();

        let values: Vec<_> = log.snapshot().iter().copied().collect();
        assert_eq!(values.len(), 2);
        assert!(values.contains(&1));
        assert!(values.contains(&2));
    }

    /// Converting writer -> log -> snapshot -> log, and snapshot -> log via
    /// `Snapshot::log`, always yields a handle that sees the same five values.
    #[test]
    fn log_snapshot_and_writer_conversions_round_trip() {
        let (mut writer, log) = AtomicLog::new(8, 2);
        for value in 0..5 {
            writer.append(value);
        }

        let log_from_writer = writer.log();
        let snapshot = Snapshot::from(log_from_writer.clone());
        let log_from_snapshot = AtomicLog::from(snapshot);

        let values: Vec<_> = log_from_snapshot.snapshot().iter().copied().collect();
        assert_eq!(values, vec![0, 1, 2, 3, 4]);

        let snapshot = log.snapshot();
        let cloned_log = snapshot.log();
        let values: Vec<_> = cloned_log.snapshot().iter().copied().collect();
        assert_eq!(values, vec![0, 1, 2, 3, 4]);
    }
}