1use std::collections::VecDeque;
2use std::ops::Range;
3use std::sync::Arc;
4
5use crate::log::Shared;
6use crate::segment::Segment;
7
8pub struct Snapshot<T> {
14 pub(crate) shared: Arc<Shared<T>>,
15 len: usize,
16 chunks: VecDeque<SnapshotChunk<T>>,
17}
18
/// One contiguous run of values borrowed from a single segment, as yielded by
/// the `Chunks` iterator.
pub struct SegmentSlice<'a, T> {
    /// The `sequence` field of the segment the values were taken from.
    sequence: u64,
    /// The values of that segment that are covered by the snapshot.
    values: &'a [T],
}
27
28struct SnapshotChunk<T> {
29 segment: Arc<Segment<T>>,
30 range: Range<usize>,
31}
32
33pub struct Iter<'a, T> {
34 chunks: std::collections::vec_deque::Iter<'a, SnapshotChunk<T>>,
35 current: Option<std::slice::Iter<'a, T>>,
36}
37
38pub struct Chunks<'a, T> {
40 chunks: std::collections::vec_deque::Iter<'a, SnapshotChunk<T>>,
41}
42
43impl<T> Snapshot<T> {
44 pub(crate) fn new(shared: Arc<Shared<T>>) -> Self {
45 let mut snapshot = Self {
46 shared,
47 len: 0,
48 chunks: VecDeque::new(),
49 };
50 snapshot.rebuild();
51 snapshot
52 }
53
54 fn rebuild(&mut self) {
55 let head = self.shared.head.load_full();
56 let mut reversed = Vec::with_capacity(self.chunks.len().max(1));
57 let mut cursor = Some(head);
58 let mut len = 0;
59
60 while let Some(segment) = cursor {
61 let published = segment.published_len();
62 if published > 0 {
63 len += published;
64 reversed.push(SnapshotChunk {
65 segment: Arc::clone(&segment),
66 range: 0..published,
67 });
68 }
69 cursor = segment.previous.upgrade();
70 }
71
72 reversed.reverse();
73 self.chunks.clear();
74 self.chunks.extend(reversed);
75 self.len = len;
76 }
77
78 pub fn refresh(&mut self) {
83 let head = self.shared.head.load_full();
84 if self.refresh_same_head(&head) {
85 return;
86 }
87 if self.refresh_incremental(&head) {
88 return;
89 }
90
91 self.rebuild();
92 }
93
94 fn refresh_same_head(&mut self, head: &Arc<Segment<T>>) -> bool {
95 let Some(last) = self.chunks.back_mut() else {
96 return head.published_len() == 0;
97 };
98 if !Arc::ptr_eq(&last.segment, head) {
99 return false;
100 }
101
102 let published = head.published_len();
103 if published <= last.range.end {
104 return true;
105 }
106
107 let added = published - last.range.end;
108 last.range.end = published;
109 self.len += added;
110 true
111 }
112
113 fn refresh_incremental(&mut self, head: &Arc<Segment<T>>) -> bool {
114 let Some(last) = self.chunks.back_mut() else {
115 return false;
116 };
117
118 let mut cursor = Some(Arc::clone(head));
119 let mut new_segments: Vec<Arc<Segment<T>>> = Vec::new();
120 while let Some(segment) = cursor {
121 if Arc::ptr_eq(&segment, &last.segment) {
122 let published = segment.published_len();
123 if published > last.range.end {
124 let added = published - last.range.end;
125 last.range.end = published;
126 self.len += added;
127 }
128
129 for segment in new_segments.into_iter().rev() {
130 let published = segment.published_len();
131 if published > 0 {
132 self.len += published;
133 self.chunks.push_back(SnapshotChunk {
134 segment,
135 range: 0..published,
136 });
137 }
138 }
139 return true;
140 }
141
142 cursor = segment.previous.upgrade();
143 new_segments.push(segment);
144 }
145
146 false
147 }
148
149 #[inline]
151 pub fn len(&self) -> usize {
152 self.len
153 }
154
155 #[inline]
157 pub fn is_empty(&self) -> bool {
158 self.len == 0
159 }
160
161 #[inline]
163 pub fn iter(&self) -> Iter<'_, T> {
164 Iter {
165 chunks: self.chunks.iter(),
166 current: None,
167 }
168 }
169
170 #[inline]
172 pub fn chunks(&self) -> Chunks<'_, T> {
173 Chunks {
174 chunks: self.chunks.iter(),
175 }
176 }
177
178 #[inline]
180 pub fn log(&self) -> crate::log::AtomicLog<T> {
181 crate::log::AtomicLog {
182 shared: Arc::clone(&self.shared),
183 }
184 }
185}
186
187impl<'a, T> SegmentSlice<'a, T> {
188 #[inline]
190 pub fn sequence(&self) -> u64 {
191 self.sequence
192 }
193
194 #[inline]
196 pub fn values(&self) -> &'a [T] {
197 self.values
198 }
199}
200
201impl<'a, T> Iterator for Iter<'a, T> {
202 type Item = &'a T;
203
204 fn next(&mut self) -> Option<Self::Item> {
205 loop {
206 if let Some(current) = &mut self.current
207 && let Some(value) = current.next()
208 {
209 return Some(value);
210 }
211
212 let chunk = self.chunks.next()?;
213 self.current = Some(chunk.as_slice().iter());
214 }
215 }
216
217 fn size_hint(&self) -> (usize, Option<usize>) {
218 let current = self.current.as_ref().map_or(0, ExactSizeIterator::len);
219 let rest: usize = self
220 .chunks
221 .clone()
222 .map(|chunk| chunk.range.end - chunk.range.start)
223 .sum();
224 let total = current + rest;
225 (total, Some(total))
226 }
227}
228
229impl<T> ExactSizeIterator for Iter<'_, T> {}
230
231impl<'a, T> Iterator for Chunks<'a, T> {
232 type Item = SegmentSlice<'a, T>;
233
234 fn next(&mut self) -> Option<Self::Item> {
235 let chunk = self.chunks.next()?;
236 Some(SegmentSlice {
237 sequence: chunk.segment.sequence,
238 values: chunk.as_slice(),
239 })
240 }
241}
242
243impl<T> SnapshotChunk<T> {
244 #[inline]
245 fn as_slice(&self) -> &[T] {
246 self.segment.slice(self.range.clone())
247 }
248}
249
250impl<T> std::fmt::Debug for Snapshot<T> {
251 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
252 f.debug_struct("Snapshot")
253 .field("len", &self.len)
254 .field("chunks", &self.chunks.len())
255 .finish()
256 }
257}
258
259impl<T> From<crate::log::AtomicLog<T>> for Snapshot<T> {
260 fn from(log: crate::log::AtomicLog<T>) -> Self {
261 log.snapshot()
262 }
263}
264
265impl<T> From<Snapshot<T>> for crate::log::AtomicLog<T> {
266 fn from(snapshot: Snapshot<T>) -> Self {
267 snapshot.log()
268 }
269}
270
271impl<'a, T> IntoIterator for &'a Snapshot<T> {
272 type Item = &'a T;
273 type IntoIter = Iter<'a, T>;
274
275 fn into_iter(self) -> Self::IntoIter {
276 self.iter()
277 }
278}