// atomic_log/snapshot.rs

1use std::collections::VecDeque;
2use std::ops::Range;
3use std::sync::Arc;
4
5use crate::log::Shared;
6use crate::segment::Segment;
7
/// A stable, zero-copy view of the data currently retained by an [`AtomicLog`](crate::AtomicLog).
///
/// A snapshot owns `Arc`s to every segment it exposes, so the underlying storage remains
/// alive for the lifetime of the snapshot. Refreshing a snapshot extends the captured view
/// in place when possible and otherwise recaptures it from the log's current head.
pub struct Snapshot<T> {
    /// Shared log state; its `head` is reloaded whenever the snapshot is built or refreshed.
    pub(crate) shared: Arc<Shared<T>>,
    /// Cached total number of values across all `chunks`.
    len: usize,
    /// Captured segments with their visible ranges; the segment that was the head when it
    /// was captured comes last.
    chunks: VecDeque<SnapshotChunk<T>>,
}
18
/// A contiguous slice of values that came from one backing segment.
///
/// Exposed by [`Snapshot::chunks`] for consumers that want per-segment access or segment
/// sequence metadata.
#[derive(Debug)]
pub struct SegmentSlice<'a, T> {
    /// Sequence number copied from the backing segment.
    sequence: u64,
    /// Values of the backing segment that are visible through the snapshot.
    values: &'a [T],
}
27
/// One backing segment paired with the index range of it that the snapshot exposes.
struct SnapshotChunk<T> {
    /// Strong reference to the backing segment, held for as long as the chunk exists.
    segment: Arc<Segment<T>>,
    /// Indices of the segment visible through the snapshot. In this file the range always
    /// starts at 0 and ends at the published length observed at capture or last refresh.
    range: Range<usize>,
}
32
33pub struct Iter<'a, T> {
34    chunks: std::collections::vec_deque::Iter<'a, SnapshotChunk<T>>,
35    current: Option<std::slice::Iter<'a, T>>,
36}
37
38/// Iterator over the segment-backed slices in a [`Snapshot`].
39pub struct Chunks<'a, T> {
40    chunks: std::collections::vec_deque::Iter<'a, SnapshotChunk<T>>,
41}
42
43impl<T> Snapshot<T> {
44    pub(crate) fn new(shared: Arc<Shared<T>>) -> Self {
45        let mut snapshot = Self {
46            shared,
47            len: 0,
48            chunks: VecDeque::new(),
49        };
50        snapshot.rebuild();
51        snapshot
52    }
53
54    fn rebuild(&mut self) {
55        let head = self.shared.head.load_full();
56        let mut reversed = Vec::with_capacity(self.chunks.len().max(1));
57        let mut cursor = Some(head);
58        let mut len = 0;
59
60        while let Some(segment) = cursor {
61            let published = segment.published_len();
62            if published > 0 {
63                len += published;
64                reversed.push(SnapshotChunk {
65                    segment: Arc::clone(&segment),
66                    range: 0..published,
67                });
68            }
69            cursor = segment.previous.upgrade();
70        }
71
72        reversed.reverse();
73        self.chunks.clear();
74        self.chunks.extend(reversed);
75        self.len = len;
76    }
77
78    /// Refreshes the snapshot in place to reflect the current state of the log.
79    ///
80    /// This attempts a cheap same-head extension first, then an incremental segment append,
81    /// and falls back to a full rebuild if continuity has been lost.
82    pub fn refresh(&mut self) {
83        let head = self.shared.head.load_full();
84        if self.refresh_same_head(&head) {
85            return;
86        }
87        if self.refresh_incremental(&head) {
88            return;
89        }
90
91        self.rebuild();
92    }
93
94    fn refresh_same_head(&mut self, head: &Arc<Segment<T>>) -> bool {
95        let Some(last) = self.chunks.back_mut() else {
96            return head.published_len() == 0;
97        };
98        if !Arc::ptr_eq(&last.segment, head) {
99            return false;
100        }
101
102        let published = head.published_len();
103        if published <= last.range.end {
104            return true;
105        }
106
107        let added = published - last.range.end;
108        last.range.end = published;
109        self.len += added;
110        true
111    }
112
113    fn refresh_incremental(&mut self, head: &Arc<Segment<T>>) -> bool {
114        let Some(last) = self.chunks.back_mut() else {
115            return false;
116        };
117
118        let mut cursor = Some(Arc::clone(head));
119        let mut new_segments: Vec<Arc<Segment<T>>> = Vec::new();
120        while let Some(segment) = cursor {
121            if Arc::ptr_eq(&segment, &last.segment) {
122                let published = segment.published_len();
123                if published > last.range.end {
124                    let added = published - last.range.end;
125                    last.range.end = published;
126                    self.len += added;
127                }
128
129                for segment in new_segments.into_iter().rev() {
130                    let published = segment.published_len();
131                    if published > 0 {
132                        self.len += published;
133                        self.chunks.push_back(SnapshotChunk {
134                            segment,
135                            range: 0..published,
136                        });
137                    }
138                }
139                return true;
140            }
141
142            cursor = segment.previous.upgrade();
143            new_segments.push(segment);
144        }
145
146        false
147    }
148
149    /// Returns the total number of elements visible through this snapshot.
150    #[inline]
151    pub fn len(&self) -> usize {
152        self.len
153    }
154
155    /// Returns `true` if the snapshot contains no elements.
156    #[inline]
157    pub fn is_empty(&self) -> bool {
158        self.len == 0
159    }
160
161    /// Iterates over all visible values as a flat `&T` stream.
162    #[inline]
163    pub fn iter(&self) -> Iter<'_, T> {
164        Iter {
165            chunks: self.chunks.iter(),
166            current: None,
167        }
168    }
169
170    /// Iterates over the snapshot one backing segment at a time.
171    #[inline]
172    pub fn chunks(&self) -> Chunks<'_, T> {
173        Chunks {
174            chunks: self.chunks.iter(),
175        }
176    }
177
178    /// Returns a read handle for the log this snapshot came from.
179    #[inline]
180    pub fn log(&self) -> crate::log::AtomicLog<T> {
181        crate::log::AtomicLog {
182            shared: Arc::clone(&self.shared),
183        }
184    }
185}
186
187impl<'a, T> SegmentSlice<'a, T> {
188    /// Returns the monotonically increasing sequence number of the backing segment.
189    #[inline]
190    pub fn sequence(&self) -> u64 {
191        self.sequence
192    }
193
194    /// Returns the values captured from this backing segment.
195    #[inline]
196    pub fn values(&self) -> &'a [T] {
197        self.values
198    }
199}
200
201impl<'a, T> Iterator for Iter<'a, T> {
202    type Item = &'a T;
203
204    fn next(&mut self) -> Option<Self::Item> {
205        loop {
206            if let Some(current) = &mut self.current
207                && let Some(value) = current.next()
208            {
209                return Some(value);
210            }
211
212            let chunk = self.chunks.next()?;
213            self.current = Some(chunk.as_slice().iter());
214        }
215    }
216
217    fn size_hint(&self) -> (usize, Option<usize>) {
218        let current = self.current.as_ref().map_or(0, ExactSizeIterator::len);
219        let rest: usize = self
220            .chunks
221            .clone()
222            .map(|chunk| chunk.range.end - chunk.range.start)
223            .sum();
224        let total = current + rest;
225        (total, Some(total))
226    }
227}
228
229impl<T> ExactSizeIterator for Iter<'_, T> {}
230
231impl<'a, T> Iterator for Chunks<'a, T> {
232    type Item = SegmentSlice<'a, T>;
233
234    fn next(&mut self) -> Option<Self::Item> {
235        let chunk = self.chunks.next()?;
236        Some(SegmentSlice {
237            sequence: chunk.segment.sequence,
238            values: chunk.as_slice(),
239        })
240    }
241}
242
243impl<T> SnapshotChunk<T> {
244    #[inline]
245    fn as_slice(&self) -> &[T] {
246        self.segment.slice(self.range.clone())
247    }
248}
249
250impl<T> std::fmt::Debug for Snapshot<T> {
251    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
252        f.debug_struct("Snapshot")
253            .field("len", &self.len)
254            .field("chunks", &self.chunks.len())
255            .finish()
256    }
257}
258
259impl<T> From<crate::log::AtomicLog<T>> for Snapshot<T> {
260    fn from(log: crate::log::AtomicLog<T>) -> Self {
261        log.snapshot()
262    }
263}
264
265impl<T> From<Snapshot<T>> for crate::log::AtomicLog<T> {
266    fn from(snapshot: Snapshot<T>) -> Self {
267        snapshot.log()
268    }
269}
270
271impl<'a, T> IntoIterator for &'a Snapshot<T> {
272    type Item = &'a T;
273    type IntoIter = Iter<'a, T>;
274
275    fn into_iter(self) -> Self::IntoIter {
276        self.iter()
277    }
278}