1mod claim;
87mod log;
88mod segment;
89mod snapshot;
90
91pub use log::{AtomicLog, Writer};
92pub use snapshot::{Chunks, Iter, SegmentSlice, Snapshot};
93
#[cfg(test)]
mod tests {
    // Unit tests that drive `AtomicLog`, its claimed writer and `Snapshot`
    // exclusively through the methods called below.
    //
    // NOTE(review): judging by the chunk layouts asserted in these tests, the
    // second constructor argument looks like a per-segment capacity. The exact
    // meaning of the first argument is not visible from this file -- confirm
    // against the `AtomicLog` constructors.

    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Mutex};
    use std::thread;

    use crate::Snapshot;
    use crate::log::AtomicLog;

    /// A log that has never been appended to yields a snapshot with no items.
    #[test]
    fn empty_snapshot_is_empty() {
        let log = AtomicLog::<usize>::new(4, 2);
        assert!(!log.is_writer_claimed());

        let view = log.snapshot();
        assert!(view.is_empty());
        assert_eq!(view.len(), 0);
        assert_eq!(view.iter().count(), 0);
    }

    /// `new` reports no claimed writer until `try_claim_writer` succeeds.
    #[test]
    fn new_starts_without_claimed_writer() {
        let log = AtomicLog::<usize>::new(8, 2);
        assert!(!log.is_writer_claimed());

        // The returned writer must stay bound while the claim flag is checked.
        let claimed = log.try_claim_writer();
        assert!(claimed.is_some());
        assert!(log.is_writer_claimed());
    }

    /// Eight appends on a `(5, 2)` log are all visible, in order, in a snapshot.
    #[test]
    fn snapshot_returns_full_retained_view() {
        let (mut writer, log) = AtomicLog::new_claimed(5, 2);
        (0..8).for_each(|value| {
            writer.append(value);
        });

        let view = log.snapshot();
        let seen: Vec<_> = view.iter().copied().collect();

        assert_eq!(seen, [0, 1, 2, 3, 4, 5, 6, 7]);
    }

    /// Seven appends on an `(8, 3)` log are all visible, in order, in a snapshot.
    #[test]
    fn snapshot_captures_full_retained_view() {
        let (mut writer, log) = AtomicLog::new_claimed(8, 3);
        (0..7).for_each(|value| {
            writer.append(value);
        });

        let view = log.snapshot();
        let seen: Vec<_> = view.iter().copied().collect();

        assert_eq!(seen, [0, 1, 2, 3, 4, 5, 6]);
    }

    /// `chunks()` yields each segment's sequence number together with its values.
    #[test]
    fn chunk_iteration_exposes_segment_sequences() {
        let (mut writer, log) = AtomicLog::new_claimed(6, 2);
        (0..5).for_each(|value| {
            writer.append(value);
        });

        let layout: Vec<_> = log
            .snapshot()
            .chunks()
            .map(|segment| (segment.sequence(), segment.values().to_vec()))
            .collect();

        assert_eq!(layout, vec![(0, vec![0, 1]), (1, vec![2, 3]), (2, vec![4])]);
    }

    /// A snapshot taken early keeps its contents even after the log has moved on
    /// and a fresh snapshot only shows the latest values.
    #[test]
    fn held_snapshot_remains_stable_after_reclamation() {
        let (mut writer, log) = AtomicLog::new_claimed(3, 1);
        (0..3).for_each(|value| {
            writer.append(value);
        });
        let held = log.snapshot();

        (3..20).for_each(|value| {
            writer.append(value);
        });

        let old_values: Vec<_> = held.iter().copied().collect();
        let fresh_values: Vec<_> = log.snapshot().iter().copied().collect();

        assert_eq!(old_values, [0, 1, 2]);
        assert_eq!(fresh_values, [16, 17, 18, 19]);
    }

    /// `refresh()` brings an existing snapshot up to date with later appends.
    #[test]
    fn refresh_replaces_snapshot_with_latest_view() {
        let (mut writer, log) = AtomicLog::new_claimed(4, 2);
        (0..4).for_each(|value| {
            writer.append(value);
        });
        let mut view = log.snapshot();

        (4..9).for_each(|value| {
            writer.append(value);
        });
        view.refresh();

        let seen: Vec<_> = view.iter().copied().collect();
        assert_eq!(seen, [0, 1, 2, 3, 4, 5, 6, 7, 8]);
    }

    /// Refreshing while everything still fits in one segment keeps a single chunk.
    #[test]
    fn snapshot_refresh_extends_same_head_without_rebuild() {
        let (mut writer, log) = AtomicLog::new_claimed(4, 8);
        writer.append(0);
        writer.append(1);
        let mut view = log.snapshot();

        writer.append(2);
        writer.append(3);
        view.refresh();

        let seen: Vec<_> = view.iter().copied().collect();
        assert_eq!(seen, [0, 1, 2, 3]);
        assert_eq!(view.chunks().count(), 1);
    }

    /// Refreshing after the writer rolled into further segments exposes them too.
    #[test]
    fn snapshot_refresh_appends_new_segments_when_continuous() {
        let (mut writer, log) = AtomicLog::new_claimed(5, 2);
        (0..3).for_each(|value| {
            writer.append(value);
        });
        let mut view = log.snapshot();

        (3..6).for_each(|value| {
            writer.append(value);
        });
        view.refresh();

        let seen: Vec<_> = view.iter().copied().collect();
        assert_eq!(seen, [0, 1, 2, 3, 4, 5]);
        assert_eq!(view.chunks().count(), 3);
    }

    /// Dropping the writer does not discard data a later `refresh()` must see.
    #[test]
    fn writer_drop_preserves_retained_segments_for_refresh() {
        let (mut writer, log) = AtomicLog::new_claimed(8, 2);
        (0..3).for_each(|value| {
            writer.append(value);
        });
        let mut view = log.snapshot();

        (3..8).for_each(|value| {
            writer.append(value);
        });
        drop(writer);

        view.refresh();

        let seen: Vec<_> = view.iter().copied().collect();
        assert_eq!(seen, [0, 1, 2, 3, 4, 5, 6, 7]);
    }

    /// Dropping a writer releases the claim so a second writer can continue.
    #[test]
    fn writer_can_be_reclaimed_after_drop() {
        let (mut first_writer, log) = AtomicLog::new_claimed(8, 2);
        first_writer.append(1);
        assert!(log.is_writer_claimed());
        drop(first_writer);
        assert!(!log.is_writer_claimed());

        let mut second_writer = log
            .try_claim_writer()
            .expect("writer claim should be released");
        assert!(log.is_writer_claimed());
        second_writer.append(2);

        let seen: Vec<_> = log.snapshot().iter().copied().collect();
        assert_eq!(seen, [1, 2]);
    }

    /// While one writer is alive, `try_claim_writer` hands out nothing.
    #[test]
    fn writer_cannot_be_reclaimed_while_existing_writer_lives() {
        // Named binding (not `_`) so the writer lives until the end of the test.
        let (_writer, log) = AtomicLog::<usize>::new_claimed(8, 2);

        assert!(log.is_writer_claimed());
        assert!(log.try_claim_writer().is_none());
    }

    /// Three appended values result in exactly three `Drop` calls once the
    /// writer and log leave scope.
    #[test]
    fn drops_only_initialized_values() {
        static DROPS: AtomicUsize = AtomicUsize::new(0);

        struct CountDrop;
        impl Drop for CountDrop {
            fn drop(&mut self) {
                DROPS.fetch_add(1, Ordering::Relaxed);
            }
        }

        {
            // Both halves are dropped at the closing brace of this scope.
            let (mut writer, _log) = AtomicLog::new_claimed(10, 8);
            (0..3).for_each(|_| {
                writer.append(CountDrop);
            });
        }

        assert_eq!(DROPS.load(Ordering::Relaxed), 3);
    }

    /// Four reader threads keep snapshotting while the writer appends; every
    /// snapshot they observe must contain consecutive values.
    #[test]
    fn many_readers_can_snapshot_while_writer_appends() {
        let (mut writer, log) = AtomicLog::new_claimed(64, 8);
        let shared_log = Arc::new(log);
        // Non-zero tells the readers to stop looping.
        let stop_flag = Arc::new(AtomicUsize::new(0));

        let readers: Vec<_> = (0..4)
            .map(|_| {
                let log = Arc::clone(&shared_log);
                let stop = Arc::clone(&stop_flag);
                thread::spawn(move || {
                    while stop.load(Ordering::Acquire) == 0 {
                        let seen: Vec<_> = log.snapshot().iter().copied().collect();
                        assert!(seen.windows(2).all(|pair| pair[0] + 1 == pair[1]));
                    }
                })
            })
            .collect();

        (0..1000).for_each(|value| {
            writer.append(value);
        });
        stop_flag.store(1, Ordering::Release);

        // Joining surfaces any assertion failure raised inside a reader.
        for handle in readers {
            handle.join().unwrap();
        }
    }

    /// A writer wrapped in `Arc<Mutex<_>>` can be used from two threads.
    #[test]
    fn writer_can_be_shared_through_a_lock_when_requested() {
        let (writer, log) = AtomicLog::new_claimed(8, 2);
        let shared_writer = Arc::new(Mutex::new(writer));

        // Spawn one appending thread per value, in the order 1 then 2.
        let handles = [1, 2].map(|value| {
            let writer = Arc::clone(&shared_writer);
            thread::spawn(move || writer.lock().unwrap().append(value))
        });

        for handle in handles {
            handle.join().unwrap();
        }

        // The threads may run in either order, so only membership is checked.
        let seen: Vec<_> = log.snapshot().iter().copied().collect();
        assert_eq!(seen.len(), 2);
        assert!(seen.contains(&1));
        assert!(seen.contains(&2));
    }

    /// `append_batch` and a loop of `append` calls leave identical contents.
    #[test]
    fn append_batch_produces_same_result_as_sequential_append() {
        let (mut w_seq, log_seq) = AtomicLog::new_claimed(8, 4);
        (0..7).for_each(|value| {
            w_seq.append(value);
        });

        let (mut w_batch, log_batch) = AtomicLog::new_claimed(8, 4);
        w_batch.append_batch(0..7);

        let sequential: Vec<_> = log_seq.snapshot().iter().copied().collect();
        let batched: Vec<_> = log_batch.snapshot().iter().copied().collect();
        assert_eq!(sequential, batched);
    }

    /// Feeding an empty iterator to `append_batch` changes nothing.
    #[test]
    fn append_batch_empty_iterator_is_a_no_op() {
        let (mut writer, log) = AtomicLog::new_claimed(8, 4);
        writer.append(1);
        writer.append_batch(std::iter::empty::<i32>());

        let seen: Vec<_> = log.snapshot().iter().copied().collect();
        assert_eq!(seen, [1]);
    }

    /// An empty batch after the first segment was filled exactly must not make
    /// a second segment appear.
    #[test]
    fn append_batch_empty_iterator_on_full_head_does_not_allocate_extra_segment() {
        let (mut writer, log) = AtomicLog::new_claimed(4, 4);
        writer.append_batch(0..4);
        writer.append_batch(std::iter::empty::<i32>());

        let view = log.snapshot();
        let chunks: Vec<_> = view.chunks().collect();
        assert_eq!(
            chunks.len(),
            1,
            "expected exactly one segment, got {}",
            chunks.len()
        );
        assert_eq!(chunks[0].values(), &[0, 1, 2, 3]);
    }

    /// One batch of seven values on a `(12, 3)` log is split into chunks of
    /// three, three and one.
    #[test]
    fn append_batch_spanning_segment_boundary() {
        let (mut writer, log) = AtomicLog::new_claimed(12, 3);
        writer.append_batch(0..7);

        let layout: Vec<_> = log
            .snapshot()
            .chunks()
            .map(|segment| (segment.sequence(), segment.values().to_vec()))
            .collect();

        assert_eq!(
            layout,
            vec![(0, vec![0, 1, 2]), (1, vec![3, 4, 5]), (2, vec![6])]
        );
    }

    /// A batch that tops the current segment up to exactly four values leaves
    /// a single chunk behind.
    #[test]
    fn append_batch_exactly_fills_current_segment_without_spurious_roll() {
        let (mut writer, log) = AtomicLog::new_claimed(8, 4);
        writer.append(0);
        writer.append_batch(1..4);

        let view = log.snapshot();
        let chunks: Vec<_> = view.chunks().collect();
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].values(), &[0, 1, 2, 3]);
    }

    /// Mixing `append_batch` and `append` preserves overall insertion order.
    #[test]
    fn append_batch_interleaves_correctly_with_append() {
        let (mut writer, log) = AtomicLog::new_claimed(16, 4);
        writer.append_batch(0..3);
        writer.append(3);
        writer.append_batch(4..8);
        writer.append(8);

        let seen: Vec<_> = log.snapshot().iter().copied().collect();
        assert_eq!(seen, [0, 1, 2, 3, 4, 5, 6, 7, 8]);
    }

    /// Converting writer -> log -> snapshot -> log, and snapshot -> log, always
    /// leads back to the same five values.
    #[test]
    fn log_snapshot_and_writer_conversions_round_trip() {
        let (mut writer, log) = AtomicLog::new_claimed(8, 2);
        (0..5).for_each(|value| {
            writer.append(value);
        });

        // writer -> log -> snapshot -> log
        let log_from_writer = writer.log();
        let via_snapshot = AtomicLog::from(Snapshot::from(log_from_writer.clone()));

        let seen: Vec<_> = via_snapshot.snapshot().iter().copied().collect();
        assert_eq!(seen, [0, 1, 2, 3, 4]);

        // snapshot -> log through the accessor on the snapshot itself
        let view = log.snapshot();
        let log_from_view = view.log();
        let seen: Vec<_> = log_from_view.snapshot().iter().copied().collect();
        assert_eq!(seen, [0, 1, 2, 3, 4]);
    }
}