Skip to main content

atomic_log/
log.rs

1use std::collections::VecDeque;
2use std::sync::Arc;
3
4use arc_swap::ArcSwap;
5
6use crate::segment::Segment;
7use crate::snapshot::Snapshot;
8
9/// Shared read handle for a segmented rolling log.
10///
11/// `AtomicLog<T>` is cheaply clonable and can be sent to many reader threads. Readers use
12/// [`snapshot`](Self::snapshot) to capture a stable, zero-copy view of the currently
13/// retained data.
14pub struct AtomicLog<T> {
15    pub(crate) shared: Arc<Shared<T>>,
16}
17
18/// Single-writer append handle for an [`AtomicLog`].
19///
20/// The core design assumes exactly one active writer. If a caller wants to share write
21/// access across threads, it must add its own external synchronization.
22pub struct Writer<T> {
23    pub(crate) shared: Arc<Shared<T>>,
24    pub(crate) state: WriterState<T>,
25}
26
27pub(crate) struct Shared<T> {
28    pub(crate) retained_capacity: usize,
29    pub(crate) segment_capacity: usize,
30    pub(crate) head: ArcSwap<Segment<T>>,
31}
32
33pub(crate) struct WriterState<T> {
34    pub(crate) head: Arc<Segment<T>>,
35    pub(crate) retained: VecDeque<Arc<Segment<T>>>,
36    pub(crate) retained_segments: usize,
37}
38
39impl<T> Clone for AtomicLog<T> {
40    fn clone(&self) -> Self {
41        Self {
42            shared: Arc::clone(&self.shared),
43        }
44    }
45}
46
47impl<T> From<Writer<T>> for AtomicLog<T> {
48    fn from(writer: Writer<T>) -> Self {
49        writer.log()
50    }
51}
52
53impl<T> AtomicLog<T> {
54    /// Creates a new log and its corresponding writer.
55    ///
56    /// `retained_capacity` is the target logical retention size in elements. The current
57    /// implementation retains whole segments, so the observed window may exceed this value.
58    ///
59    /// `segment_capacity` is the number of elements stored in each segment and remains fixed
60    /// for the lifetime of the log.
61    ///
62    /// # Panics
63    ///
64    /// Panics if either capacity is zero.
65    pub fn new(retained_capacity: usize, segment_capacity: usize) -> (Writer<T>, Self) {
66        assert!(retained_capacity > 0, "retained capacity must be non-zero");
67        assert!(segment_capacity > 0, "segment capacity must be non-zero");
68
69        let retained_segments = retained_capacity.div_ceil(segment_capacity) + 1;
70        let head = Segment::new(0, std::sync::Weak::new(), segment_capacity);
71        let mut retained = VecDeque::with_capacity(retained_segments);
72        retained.push_back(Arc::clone(&head));
73
74        let shared = Arc::new(Shared {
75            retained_capacity,
76            segment_capacity,
77            head: ArcSwap::from(Arc::clone(&head)),
78        });
79
80        let writer = Writer {
81            shared: Arc::clone(&shared),
82            state: WriterState {
83                head,
84                retained,
85                retained_segments,
86            },
87        };
88        let log = Self { shared };
89
90        (writer, log)
91    }
92
93    /// Returns the configured logical retained capacity, in elements.
94    #[inline]
95    pub fn retained_capacity(&self) -> usize {
96        self.shared.retained_capacity
97    }
98
99    /// Returns the fixed segment size, in elements.
100    #[inline]
101    pub fn segment_capacity(&self) -> usize {
102        self.shared.segment_capacity
103    }
104
105    /// Captures a stable snapshot of the currently retained data.
106    ///
107    /// The snapshot borrows no locks and keeps its backing segments alive through `Arc`
108    /// ownership, so readers can iterate over the result without copying values out of the
109    /// log.
110    #[inline]
111    pub fn snapshot(&self) -> Snapshot<T> {
112        Snapshot::new(Arc::clone(&self.shared))
113    }
114}
115
116impl<T> Writer<T> {
117    /// Returns a clonable read handle for this writer's log.
118    #[inline]
119    pub fn log(&self) -> AtomicLog<T> {
120        AtomicLog {
121            shared: Arc::clone(&self.shared),
122        }
123    }
124
125    /// Returns the configured logical retained capacity, in elements.
126    #[inline]
127    pub fn retained_capacity(&self) -> usize {
128        self.shared.retained_capacity
129    }
130
131    /// Returns the fixed segment size, in elements.
132    #[inline]
133    pub fn segment_capacity(&self) -> usize {
134        self.shared.segment_capacity
135    }
136
137    /// Appends one value to the log and publishes it for readers.
138    ///
139    /// Values are written into the current head segment. If that segment is full, the writer
140    /// allocates a new head segment, publishes it, and continues there.
141    pub fn append(&mut self, value: T) {
142        if self.state.head.published_len() == self.shared.segment_capacity {
143            let next = Segment::new(
144                self.state.head.sequence + 1,
145                Arc::downgrade(&self.state.head),
146                self.shared.segment_capacity,
147            );
148            self.state.head = Arc::clone(&next);
149            self.state.retained.push_back(Arc::clone(&next));
150            while self.state.retained.len() > self.state.retained_segments {
151                self.state.retained.pop_front();
152            }
153            self.shared.head.store(next);
154        }
155
156        self.state.head.push(value);
157    }
158}