1mod log;
83mod segment;
84mod snapshot;
85
86pub use log::{AtomicLog, Writer};
87pub use snapshot::{Chunks, Iter, SegmentSlice, Snapshot};
88
89#[cfg(test)]
90mod tests {
91 use crate::Snapshot;
92 use crate::log::AtomicLog;
93 use std::sync::Arc;
94 use std::sync::atomic::{AtomicUsize, Ordering};
95 use std::thread;
96
97 #[test]
98 fn empty_snapshot_is_empty() {
99 let (_writer, log) = AtomicLog::<usize>::new(4, 2);
100
101 let snapshot = log.snapshot();
102
103 assert!(snapshot.is_empty());
104 assert_eq!(snapshot.len(), 0);
105 assert_eq!(snapshot.iter().count(), 0);
106 }
107
108 #[test]
109 fn snapshot_returns_full_retained_view() {
110 let (mut writer, log) = AtomicLog::new(5, 2);
111
112 for value in 0..8 {
113 writer.append(value);
114 }
115
116 let snapshot = log.snapshot();
117 let values: Vec<_> = snapshot.iter().copied().collect();
118
119 assert_eq!(values, vec![0, 1, 2, 3, 4, 5, 6, 7]);
120 }
121
122 #[test]
123 fn snapshot_captures_full_retained_view() {
124 let (mut writer, log) = AtomicLog::new(8, 3);
125
126 for value in 0..7 {
127 writer.append(value);
128 }
129
130 let snapshot = log.snapshot();
131 let values: Vec<_> = snapshot.iter().copied().collect();
132
133 assert_eq!(values, vec![0, 1, 2, 3, 4, 5, 6]);
134 }
135
136 #[test]
137 fn chunk_iteration_exposes_segment_sequences() {
138 let (mut writer, log) = AtomicLog::new(6, 2);
139
140 for value in 0..5 {
141 writer.append(value);
142 }
143
144 let chunks: Vec<_> = log
145 .snapshot()
146 .chunks()
147 .map(|chunk| (chunk.sequence(), chunk.values().to_vec()))
148 .collect();
149
150 assert_eq!(chunks, vec![(0, vec![0, 1]), (1, vec![2, 3]), (2, vec![4])]);
151 }
152
153 #[test]
154 fn held_snapshot_remains_stable_after_reclamation() {
155 let (mut writer, log) = AtomicLog::new(3, 1);
156 for value in 0..3 {
157 writer.append(value);
158 }
159 let snapshot = log.snapshot();
160
161 for value in 3..20 {
162 writer.append(value);
163 }
164
165 let old_values: Vec<_> = snapshot.iter().copied().collect();
166 let fresh_values: Vec<_> = log.snapshot().iter().copied().collect();
167
168 assert_eq!(old_values, vec![0, 1, 2]);
169 assert_eq!(fresh_values, vec![16, 17, 18, 19]);
170 }
171
172 #[test]
173 fn refresh_replaces_snapshot_with_latest_view() {
174 let (mut writer, log) = AtomicLog::new(4, 2);
175 for value in 0..4 {
176 writer.append(value);
177 }
178 let mut snapshot = log.snapshot();
179
180 for value in 4..9 {
181 writer.append(value);
182 }
183 snapshot.refresh();
184
185 let values: Vec<_> = snapshot.iter().copied().collect();
186 assert_eq!(values, vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);
187 }
188
189 #[test]
190 fn snapshot_refresh_extends_same_head_without_rebuild() {
191 let (mut writer, log) = AtomicLog::new(4, 8);
192 writer.append(0);
193 writer.append(1);
194 let mut snapshot = log.snapshot();
195
196 writer.append(2);
197 writer.append(3);
198 snapshot.refresh();
199
200 let values: Vec<_> = snapshot.iter().copied().collect();
201 assert_eq!(values, vec![0, 1, 2, 3]);
202 assert_eq!(snapshot.chunks().count(), 1);
203 }
204
205 #[test]
206 fn snapshot_refresh_appends_new_segments_when_continuous() {
207 let (mut writer, log) = AtomicLog::new(5, 2);
208 for value in 0..3 {
209 writer.append(value);
210 }
211 let mut snapshot = log.snapshot();
212
213 for value in 3..6 {
214 writer.append(value);
215 }
216 snapshot.refresh();
217
218 let values: Vec<_> = snapshot.iter().copied().collect();
219 assert_eq!(values, vec![0, 1, 2, 3, 4, 5]);
220 assert_eq!(snapshot.chunks().count(), 3);
221 }
222
223 #[test]
224 fn drops_only_initialized_values() {
225 static DROPS: AtomicUsize = AtomicUsize::new(0);
226
227 struct CountDrop;
228 impl Drop for CountDrop {
229 fn drop(&mut self) {
230 DROPS.fetch_add(1, Ordering::Relaxed);
231 }
232 }
233
234 {
235 let (mut writer, _log) = AtomicLog::new(10, 8);
236 for _ in 0..3 {
237 writer.append(CountDrop);
238 }
239 }
240
241 assert_eq!(DROPS.load(Ordering::Relaxed), 3);
242 }
243
244 #[test]
245 fn many_readers_can_snapshot_while_writer_appends() {
246 let (mut writer, log) = AtomicLog::new(64, 8);
247 let log = Arc::new(log);
248 let stop = Arc::new(AtomicUsize::new(0));
249 let mut readers = Vec::new();
250
251 for _ in 0..4 {
252 let log = Arc::clone(&log);
253 let stop = Arc::clone(&stop);
254 readers.push(thread::spawn(move || {
255 while stop.load(Ordering::Acquire) == 0 {
256 let values: Vec<_> = log.snapshot().iter().copied().collect();
257 assert!(values.windows(2).all(|pair| pair[0] + 1 == pair[1]));
258 }
259 }));
260 }
261
262 for value in 0..1000 {
263 writer.append(value);
264 }
265 stop.store(1, Ordering::Release);
266
267 for reader in readers {
268 reader.join().unwrap();
269 }
270 }
271
272 #[test]
273 fn writer_can_be_shared_through_a_lock_when_requested() {
274 let (writer, log) = AtomicLog::new(8, 2);
275 let writer = std::sync::Arc::new(std::sync::Mutex::new(writer));
276
277 let first = {
278 let writer = std::sync::Arc::clone(&writer);
279 thread::spawn(move || writer.lock().unwrap().append(1))
280 };
281 let second = {
282 let writer = std::sync::Arc::clone(&writer);
283 thread::spawn(move || writer.lock().unwrap().append(2))
284 };
285
286 first.join().unwrap();
287 second.join().unwrap();
288
289 let values: Vec<_> = log.snapshot().iter().copied().collect();
290 assert_eq!(values.len(), 2);
291 assert!(values.contains(&1));
292 assert!(values.contains(&2));
293 }
294
295 #[test]
296 fn log_snapshot_and_writer_conversions_round_trip() {
297 let (mut writer, log) = AtomicLog::new(8, 2);
298 for value in 0..5 {
299 writer.append(value);
300 }
301
302 let log_from_writer = writer.log();
303 let snapshot = Snapshot::from(log_from_writer.clone());
304 let log_from_snapshot = AtomicLog::from(snapshot);
305
306 let values: Vec<_> = log_from_snapshot.snapshot().iter().copied().collect();
307 assert_eq!(values, vec![0, 1, 2, 3, 4]);
308
309 let snapshot = log.snapshot();
310 let cloned_log = snapshot.log();
311 let values: Vec<_> = cloned_log.snapshot().iter().copied().collect();
312 assert_eq!(values, vec![0, 1, 2, 3, 4]);
313 }
314}