1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
//! Tests for the explicit flush API (`flush_oldest_frozen`, `flush_all_frozen`).
//!
//! These tests verify the new public flush methods that replace the previous
//! inline-flush behaviour. Writes no longer auto-flush frozen memtables;
//! callers (or a background worker) must invoke these methods explicitly.
#[cfg(test)]
#[allow(non_snake_case)]
mod tests {
use crate::engine::Engine;
use crate::engine::tests::helpers::*;
use std::collections::HashMap;
use tempfile::TempDir;
// ================================================================
// flush_oldest_frozen: basic contract
// ================================================================
/// # Scenario
/// `flush_oldest_frozen()` is called on a freshly opened engine that has
/// not received a single write.
///
/// # Expected behavior
/// The call succeeds and reports `false`; `stats()` still shows zero
/// SSTables afterwards.
#[test]
fn flush_oldest_frozen_noop_when_empty() {
    let tmp = TempDir::new().unwrap();
    let engine = Engine::open(tmp.path(), memtable_only_config()).unwrap();

    // Nothing was written, so there is nothing frozen to flush.
    let did_flush = engine.flush_oldest_frozen().unwrap();
    assert!(!did_flush, "Should return false when no frozen memtables");

    let stats = engine.stats().unwrap();
    assert_eq!(stats.sstables_count, 0);
}
/// # Scenario
/// A single `flush_oldest_frozen()` call converts exactly one frozen
/// memtable into an SSTable.
///
/// # Actions
/// 1. Put 200 keys into an engine opened with `small_buffer_config()`.
/// 2. Require that at least two frozen memtables have accumulated.
/// 3. Call `flush_oldest_frozen()` once.
/// 4. Compare `stats()` taken before and after the call.
///
/// # Expected behavior
/// The call reports `true`, `frozen_count` drops by one and
/// `sstables_count` grows by one.
#[test]
fn flush_oldest_frozen_flushes_one() {
    let tmp = TempDir::new().unwrap();
    let engine = Engine::open(tmp.path(), small_buffer_config()).unwrap();

    // Pile up several frozen memtables.
    (0..200u32).for_each(|n| {
        let key = format!("k_{:04}", n).into_bytes();
        let value = format!("v_{:04}", n).into_bytes();
        engine.put(key, value).unwrap();
    });

    let stats_before = engine.stats().unwrap();
    assert!(
        stats_before.frozen_count >= 2,
        "Need at least 2 frozen, got {}",
        stats_before.frozen_count
    );

    let did_flush = engine.flush_oldest_frozen().unwrap();
    assert!(did_flush, "Should return true when a frozen was flushed");

    let stats_after = engine.stats().unwrap();
    assert_eq!(
        stats_after.frozen_count,
        stats_before.frozen_count - 1,
        "frozen_count should decrease by 1"
    );
    assert_eq!(
        stats_after.sstables_count,
        stats_before.sstables_count + 1,
        "sstables_count should increase by 1"
    );
}
// ================================================================
// flush_all_frozen: basic contract
// ================================================================
/// # Scenario
/// `flush_all_frozen()` is called on a freshly opened engine that has not
/// received a single write.
///
/// # Expected behavior
/// The call succeeds with a count of `0`; `stats()` still shows zero
/// SSTables afterwards.
#[test]
fn flush_all_frozen_noop_when_empty() {
    let tmp = TempDir::new().unwrap();
    let engine = Engine::open(tmp.path(), memtable_only_config()).unwrap();

    let flushed = engine.flush_all_frozen().unwrap();
    assert_eq!(flushed, 0);

    let stats = engine.stats().unwrap();
    assert_eq!(stats.sstables_count, 0);
}
/// # Scenario
/// `flush_all_frozen()` flushes every pending frozen memtable in one call.
///
/// # Actions
/// 1. Put 200 keys into an engine opened with `small_buffer_config()`.
/// 2. Record `stats()` and require at least one frozen memtable.
/// 3. Call `flush_all_frozen()`.
/// 4. Compare the returned count with the stats taken before and after.
///
/// # Expected behavior
/// The returned count equals the frozen count observed beforehand, no
/// frozen memtables remain, and `sstables_count` equals the returned count.
#[test]
fn flush_all_frozen_drains_all() {
    let tmp = TempDir::new().unwrap();
    let engine = Engine::open(tmp.path(), small_buffer_config()).unwrap();

    for n in 0..200u32 {
        let key = format!("k_{:04}", n).into_bytes();
        let value = format!("v_{:04}", n).into_bytes();
        engine.put(key, value).unwrap();
    }

    let stats_before = engine.stats().unwrap();
    assert!(stats_before.frozen_count > 0, "Should have frozen memtables");

    let drained = engine.flush_all_frozen().unwrap();
    assert_eq!(
        drained, stats_before.frozen_count,
        "Should flush exactly the number of frozen memtables"
    );

    let stats_after = engine.stats().unwrap();
    assert_eq!(stats_after.frozen_count, 0, "All frozen should be drained");
    assert_eq!(
        stats_after.sstables_count, drained,
        "Each frozen should become one SSTable"
    );
}
// ================================================================
// Data integrity: writes without flush are still readable
// ================================================================
/// # Scenario
/// Writes that pile up in frozen memtables, with no flush ever requested,
/// must remain readable through `get()`.
///
/// # Actions
/// 1. Put 100 keys into an engine opened with `small_buffer_config()`.
/// 2. Do NOT call any flush method.
/// 3. Check that frozen memtables exist and that no SSTable was created.
/// 4. Get every key.
///
/// # Expected behavior
/// Every one of the 100 keys returns the value that was written for it.
#[test]
fn reads_work_without_flush() {
    let tmp = TempDir::new().unwrap();
    let engine = Engine::open(tmp.path(), small_buffer_config()).unwrap();

    let key_of = |n: u32| format!("k_{:04}", n).into_bytes();
    let value_of = |n: u32| format!("v_{:04}", n).into_bytes();

    for n in 0..100u32 {
        engine.put(key_of(n), value_of(n)).unwrap();
    }

    let snapshot = engine.stats().unwrap();
    assert!(snapshot.frozen_count > 0, "Should have frozen memtables");
    assert_eq!(snapshot.sstables_count, 0, "No flush called, no SSTables");

    // With zero SSTables on disk, every hit below is served from memory.
    for n in 0..100u32 {
        assert_eq!(
            engine.get(key_of(n)).unwrap(),
            Some(value_of(n)),
            "k_{:04} should be readable from frozen memtables",
            n
        );
    }
}
// ================================================================
// put/delete/delete_range return value
// ================================================================
/// # Scenario
/// The boolean returned by the write methods signals whether the write
/// caused a memtable freeze.
///
/// # Actions
/// 1. On an engine opened with `memtable_only_config()`, issue one `put`,
///    one `delete` and one `delete_range`.
/// 2. On an engine opened with `small_buffer_config()`, put up to 100 keys,
///    stopping at the first put that reports a freeze.
///
/// # Expected behavior
/// All three writes on the first engine return `false`; on the second
/// engine at least one put returns `true`.
#[test]
fn write_methods_return_freeze_signal() {
    // First engine: none of these writes may report a freeze.
    let roomy_dir = TempDir::new().unwrap();
    let roomy = Engine::open(roomy_dir.path(), memtable_only_config()).unwrap();

    let put_froze = roomy.put(b"hello".to_vec(), b"world".to_vec()).unwrap();
    assert!(!put_froze, "Large buffer should not trigger freeze");

    let delete_froze = roomy.delete(b"hello".to_vec()).unwrap();
    assert!(!delete_froze, "Large buffer delete should not trigger freeze");

    let range_froze = roomy.delete_range(b"a".to_vec(), b"z".to_vec()).unwrap();
    assert!(
        !range_froze,
        "Large buffer range-delete should not trigger freeze"
    );

    // Second engine: a freeze must be signalled within 100 puts.
    let tight_dir = TempDir::new().unwrap();
    let tight = Engine::open(tight_dir.path(), small_buffer_config()).unwrap();

    // `any` stops at the first put that returns `true`, so no further
    // writes are issued once the signal has been observed.
    let saw_freeze = (0..100u32).any(|n| {
        let key = format!("k_{:04}", n).into_bytes();
        let value = format!("v_{:04}", n).into_bytes();
        tight.put(key, value).unwrap()
    });
    assert!(
        saw_freeze,
        "Small buffer should trigger at least one freeze"
    );
}
// ================================================================
// Flush after mixed operations
// ================================================================
/// # Scenario
/// Reads stay correct after `flush_all_frozen()` when the preceding
/// writes were a mix of puts, a point delete and a range delete.
///
/// # Actions
/// 1. Put 20 keys (`mk_0000`..`mk_0019`) into an engine opened with
///    `small_buffer_config()`.
/// 2. Delete `mk_0005`.
/// 3. Range-delete `[mk_0010, mk_0015)`, which covers five keys.
/// 4. Flush all frozen memtables.
/// 5. Verify every key via `get()`.
///
/// # Expected behavior
/// `mk_0005` and `mk_0010`..`mk_0014` read as `None`; the remaining
/// fourteen keys return their original values.
///
/// NOTE(review): the two tombstones are the last writes before the flush.
/// Whether they have already been frozen (and therefore end up in an
/// SSTable) depends on the buffer size in `small_buffer_config()` — confirm.
#[test]
fn flush_preserves_mixed_operations() {
    let tmp = TempDir::new().unwrap();
    let engine = Engine::open(tmp.path(), small_buffer_config()).unwrap();

    let key_of = |n: u32| format!("mk_{:04}", n).into_bytes();
    let value_of = |n: u32| format!("mv_{:04}", n).into_bytes();

    for n in 0..20u32 {
        engine.put(key_of(n), value_of(n)).unwrap();
    }
    engine.delete(b"mk_0005".to_vec()).unwrap();
    engine
        .delete_range(b"mk_0010".to_vec(), b"mk_0015".to_vec())
        .unwrap();

    engine.flush_all_frozen().unwrap();

    // The point-deleted key is gone.
    assert_eq!(engine.get(b"mk_0005".to_vec()).unwrap(), None);

    // Every key inside the half-open deleted range is gone.
    for n in 10..15u32 {
        assert_eq!(
            engine.get(key_of(n)).unwrap(),
            None,
            "mk_{:04} should be range-deleted",
            n
        );
    }

    // Everything else (0-4, 6-9, 15-19) is untouched.
    let survivors = (0..20u32).filter(|n| *n != 5 && !(10..15).contains(n));
    for n in survivors {
        assert_eq!(
            engine.get(key_of(n)).unwrap(),
            Some(value_of(n)),
            "mk_{:04} should survive flush",
            n
        );
    }
}
// ================================================================
// Multiple flush rounds
// ================================================================
/// # Scenario
/// Two consecutive write-then-flush rounds on the same engine.
///
/// # Actions
/// 1. Put 50 `r1_*` keys, then call `flush_all_frozen()`.
/// 2. Put 50 `r2_*` keys, then call `flush_all_frozen()` again.
/// 3. Read back all 100 keys.
///
/// # Expected behavior
/// Each flush reports a non-zero count, `sstables_count` is strictly
/// larger after the second round than after the first, and every key from
/// both rounds returns its value.
#[test]
fn multiple_flush_rounds() {
    let tmp = TempDir::new().unwrap();
    let engine = Engine::open(tmp.path(), small_buffer_config()).unwrap();

    // First round of writes, then flush.
    for n in 0..50u32 {
        let key = format!("r1_{:04}", n).into_bytes();
        let value = format!("v1_{:04}", n).into_bytes();
        engine.put(key, value).unwrap();
    }
    let flushed_first = engine.flush_all_frozen().unwrap();
    assert!(flushed_first > 0);
    let sstables_first = engine.stats().unwrap().sstables_count;

    // Second round of writes, then flush.
    for n in 0..50u32 {
        let key = format!("r2_{:04}", n).into_bytes();
        let value = format!("v2_{:04}", n).into_bytes();
        engine.put(key, value).unwrap();
    }
    let flushed_second = engine.flush_all_frozen().unwrap();
    assert!(flushed_second > 0);
    let sstables_second = engine.stats().unwrap().sstables_count;

    assert!(sstables_second > sstables_first, "More SSTables after round 2");

    // Both generations of data must still be visible.
    for n in 0..50u32 {
        let first_round = engine.get(format!("r1_{:04}", n).into_bytes()).unwrap();
        assert_eq!(first_round, Some(format!("v1_{:04}", n).into_bytes()),);

        let second_round = engine.get(format!("r2_{:04}", n).into_bytes()).unwrap();
        assert_eq!(second_round, Some(format!("v2_{:04}", n).into_bytes()),);
    }
}
// ================================================================
// Crash recovery with many frozen memtables (no inline flushing)
// ================================================================
/// # Scenario
/// Frozen memtables accumulate with no flush, then the engine is dropped
/// without `close` (standing in for a crash) and the directory is
/// reopened.
///
/// # Actions
/// 1. Put 500 keys into an engine opened with `default_config()`.
/// 2. Point-delete every third key among the first 100.
/// 3. Range-delete `[key_0200, key_0250)`.
/// 4. Require `frozen_count > 0` and `sstables_count == 0`.
/// 5. Drop the engine, reopen via `reopen`, and read every key.
///
/// # Expected behavior
/// After the reopen, `get()` agrees with the in-memory model for every
/// key: live keys return their value, deleted keys return `None`.
#[test]
fn crash_recovery_many_frozen_no_flush() {
    let tmp = TempDir::new().unwrap();
    // Model of what the engine should contain; `None` marks a deleted key.
    let mut model: HashMap<Vec<u8>, Option<Vec<u8>>> = HashMap::new();

    {
        let engine = Engine::open(tmp.path(), default_config()).unwrap();

        // 500 fresh keys.
        for n in 0..500u32 {
            let key = format!("key_{:04}", n).into_bytes();
            let value = format!("val_{:04}", n).into_bytes();
            engine.put(key.clone(), value.clone()).unwrap();
            model.insert(key, Some(value));
        }

        // Point deletes: key_0000, key_0003, ..., key_0099.
        for n in (0..100).step_by(3) {
            let key = format!("key_{:04}", n).into_bytes();
            engine.delete(key.clone()).unwrap();
            model.insert(key, None);
        }

        // One range delete, mirrored in the model as a half-open interval.
        let range_start = b"key_0200".to_vec();
        let range_end = b"key_0250".to_vec();
        engine
            .delete_range(range_start.clone(), range_end.clone())
            .unwrap();
        model
            .iter_mut()
            .filter(|(key, _)| {
                key.as_slice() >= range_start.as_slice() && key.as_slice() < range_end.as_slice()
            })
            .for_each(|(_, slot)| *slot = None);

        let snapshot = engine.stats().unwrap();
        assert!(
            snapshot.frozen_count > 0,
            "Should have accumulated frozen memtables"
        );
        assert_eq!(
            snapshot.sstables_count, 0,
            "No flush called, should have 0 SSTables"
        );
        // Leaving this scope drops the engine without calling close.
    }

    let engine = reopen(tmp.path());

    // Every modelled key must match what the reopened engine returns.
    for (key, want) in &model {
        let got = engine.get(key.clone()).unwrap();
        assert_eq!(
            &got,
            want,
            "Mismatch for key {:?}",
            String::from_utf8_lossy(key)
        );
    }
}
/// # Scenario
/// A range delete issued on a live engine (no flush, no reopen) must hide
/// a put that was written much earlier.
///
/// # Actions
/// 1. Put `key_0488`.
/// 2. Put 200 padding keys and require `frozen_count > 0`.
/// 3. Confirm `key_0488` is still readable.
/// 4. Range-delete `[key_0473, key_0490)`.
///
/// # Expected behavior
/// After the range delete, `get(key_0488)` returns `None`.
#[test]
fn range_delete_across_frozen_memtables() {
    let tmp = TempDir::new().unwrap();
    let engine = Engine::open(tmp.path(), small_buffer_config()).unwrap();

    let target = b"key_0488".to_vec();

    // Write the target first so that later writes bury it.
    // NOTE(review): presumably `small_buffer_config()` uses a buffer small
    // enough that the target is frozen almost immediately — confirm.
    engine.put(target.clone(), b"original".to_vec()).unwrap();

    // Padding writes that push the target out of the active memtable.
    for n in 0..200u32 {
        let key = format!("pad_{:04}", n).into_bytes();
        let value = format!("v_{}", n).into_bytes();
        engine.put(key, value).unwrap();
    }

    let snapshot = engine.stats().unwrap();
    assert!(
        snapshot.frozen_count > 0,
        "key_0488 should be in a frozen memtable"
    );

    // The target is still visible before the range delete.
    assert_eq!(
        engine.get(target.clone()).unwrap(),
        Some(b"original".to_vec()),
        "key_0488 must be readable from frozen memtable"
    );

    // Range tombstone whose interval contains the target.
    engine
        .delete_range(b"key_0473".to_vec(), b"key_0490".to_vec())
        .unwrap();

    // The newer tombstone must shadow the older put.
    let after_delete = engine.get(target).unwrap();
    assert_eq!(
        after_delete, None,
        "key_0488 should be hidden by range delete"
    );
}
/// # Scenario
/// A seeded pseudo-random mix of puts, point deletes and range deletes is
/// applied with no flush call; the engine is then dropped without `close`
/// (standing in for a crash) and the directory is reopened.
///
/// # Actions
/// 1. Apply 8000 operations over 500 possible keys, mirroring each one in
///    an in-memory model.
/// 2. Compare every modelled key against the live engine.
/// 3. Drop the engine, reopen via `reopen`, and compare again.
///
/// # Expected behavior
/// Both before the drop and after the reopen, `get()` agrees with the
/// model for every key the run has touched.
#[test]
fn crash_recovery_stress_random_ops() {
    // Size of the key space and length of the operation sequence.
    const NUM_KEYS: usize = 500;
    const NUM_OPS: usize = 8000;

    // Small linear congruential generator; a fixed seed makes the
    // operation sequence identical on every run.
    struct Rng(u64);

    impl Rng {
        fn new(seed: u64) -> Self {
            Rng(seed)
        }

        // Advances the state by one step and returns the new state.
        fn next_u64(&mut self) -> u64 {
            let advanced = self
                .0
                .wrapping_mul(6364136223846793005)
                .wrapping_add(1442695040888963407);
            self.0 = advanced;
            advanced
        }

        // Returns the next state reduced modulo `bound`.
        fn next_usize(&mut self, bound: usize) -> usize {
            let raw = self.next_u64();
            (raw % (bound as u64)) as usize
        }
    }

    // Marks every key already present in `expected` that falls inside
    // `[start, end)` as deleted. Keys never written are not added.
    fn apply_range_delete(
        expected: &mut HashMap<Vec<u8>, Option<Vec<u8>>>,
        start: &[u8],
        end: &[u8],
    ) {
        for (key, slot) in expected.iter_mut() {
            if start <= key.as_slice() && key.as_slice() < end {
                *slot = None;
            }
        }
    }

    let tmp = TempDir::new().unwrap();
    // Model of what the engine should contain; `None` marks a deleted key.
    let mut model: HashMap<Vec<u8>, Option<Vec<u8>>> = HashMap::new();
    let mut rng = Rng::new(0xBEEF);

    {
        let engine = Engine::open(tmp.path(), default_config()).unwrap();

        for _ in 0..NUM_OPS {
            // The order of the draws is part of the reproduction: the
            // operation selector first, then the key index.
            let op = rng.next_usize(100);
            let idx = rng.next_usize(NUM_KEYS);
            let key = format!("key_{:04}", idx).into_bytes();

            match op {
                // 60%: put a fresh value.
                0..=59 => {
                    let value = format!("v{}_{}", idx, rng.next_u64()).into_bytes();
                    engine.put(key.clone(), value.clone()).unwrap();
                    model.insert(key, Some(value));
                }
                // 20%: point delete.
                60..=79 => {
                    engine.delete(key.clone()).unwrap();
                    model.insert(key, None);
                }
                // 15%: range delete of 1 to 20 keys starting at `idx`,
                // clamped to the end of the key space.
                80..=94 => {
                    let span = rng.next_usize(20) + 1;
                    let end_idx = (idx + span).min(NUM_KEYS);
                    let end_key = format!("key_{:04}", end_idx).into_bytes();
                    engine
                        .delete_range(key.clone(), end_key.clone())
                        .unwrap();
                    apply_range_delete(&mut model, &key, &end_key);
                }
                // 5%: put with a distinct "ow" value prefix.
                _ => {
                    let value = format!("ow{}_{}", idx, rng.next_u64()).into_bytes();
                    engine.put(key.clone(), value.clone()).unwrap();
                    model.insert(key, Some(value));
                }
            }
        }

        // The live engine must already agree with the model before the
        // simulated crash.
        for (key, want) in &model {
            let got = engine.get(key.clone()).unwrap();
            assert_eq!(
                &got,
                want,
                "Pre-crash mismatch for key {:?}",
                String::from_utf8_lossy(key)
            );
        }
        // Leaving this scope drops the engine without calling close.
    }

    // The reopened engine must still agree with the model.
    let engine = reopen(tmp.path());
    for (key, want) in &model {
        let got = engine.get(key.clone()).unwrap();
        assert_eq!(
            &got,
            want,
            "Post-crash mismatch for key {:?}",
            String::from_utf8_lossy(key)
        );
    }
}
}