write_journal/
merge.rs

1use off64::u64;
2use off64::usz;
3use std::collections::BTreeMap;
4
5/// Merges a sequence of writes that may overlap, producing a minimal set of non-overlapping writes.
6/// Later writes win where there are overlaps.
7///
8/// Returns a BTreeMap where keys are start offsets and values are the merged data.
9/// The result contains only non-overlapping intervals with the final data for each byte position.
10///
11/// # Complexity
12/// - O(N log N) typical case for N writes with few overlaps
13/// - O(N² log N) worst case if every write overlaps everything
14///
15/// # Example
16/// ```ignore
17/// let writes = vec![
18///   WriteRequest::new(0, vec![1, 2, 3, 4]),
19///   WriteRequest::new(2, vec![5, 6]),  // Overlaps and wins
20/// ];
21/// let merged = merge_overlapping_writes(writes);
22/// // Result: {0 => [1, 2], 2 => [5, 6]}
23/// ```
24pub(crate) fn merge_overlapping_writes(
25  writes: impl IntoIterator<Item = (u64, Vec<u8>)>,
26) -> BTreeMap<u64, Vec<u8>> {
27  let mut intervals: BTreeMap<u64, Vec<u8>> = BTreeMap::new();
28
29  for (offset, data) in writes {
30    let end = offset + u64!(data.len());
31
32    // Find all intervals that overlap with [offset, end).
33    // An interval [s, e) overlaps if s < end && e > offset.
34    let overlapping_keys: Vec<u64> = intervals
35      .range(..end)
36      .filter(|(&start, old_data)| start + u64!(old_data.len()) > offset)
37      .map(|(&start, _)| start)
38      .collect();
39
40    // Remove overlapping intervals and process them.
41    let mut to_insert = Vec::new();
42
43    for key in overlapping_keys {
44      let old_data = intervals.remove(&key).unwrap();
45      let old_end = key + u64!(old_data.len());
46
47      // Preserve the part before our new write if it exists.
48      if key < offset {
49        let preserved_len = usz!(offset - key);
50        to_insert.push((key, old_data[..preserved_len].to_vec()));
51      }
52
53      // Preserve the part after our new write if it exists.
54      if old_end > end {
55        let skip_len = usz!(end - key);
56        to_insert.push((end, old_data[skip_len..].to_vec()));
57      }
58    }
59
60    // Insert the new write.
61    to_insert.push((offset, data.to_vec()));
62
63    // Insert all intervals back.
64    for (start, data) in to_insert {
65      intervals.insert(start, data);
66    }
67  }
68
69  intervals
70}
71
72#[cfg(test)]
73mod tests {
74  use super::*;
75
76  /// Helper to verify the merged intervals match expected results
77  fn assert_intervals(intervals: &BTreeMap<u64, Vec<u8>>, expected: &[(u64, Vec<u8>)]) {
78    let actual: Vec<_> = intervals
79      .iter()
80      .map(|(&start, data)| (start, data.clone()))
81      .collect();
82    assert_eq!(
83      actual, expected,
84      "\nActual intervals:   {:?}\nExpected intervals: {:?}",
85      actual, expected
86    );
87  }
88
89  #[test]
90  fn test_merge_no_writes() {
91    let writes: Vec<(u64, Vec<u8>)> = vec![];
92    let merged = merge_overlapping_writes(writes);
93    assert_intervals(&merged, &[]);
94  }
95
96  #[test]
97  fn test_merge_single_write() {
98    let writes = vec![(10, vec![1, 2, 3, 4])];
99    let merged = merge_overlapping_writes(writes);
100    assert_intervals(&merged, &[(10, vec![1, 2, 3, 4])]);
101  }
102
103  #[test]
104  fn test_merge_non_overlapping_sequential() {
105    let writes = vec![(0, vec![1, 2]), (2, vec![3, 4]), (4, vec![5, 6])];
106    let merged = merge_overlapping_writes(writes);
107    assert_intervals(&merged, &[
108      (0, vec![1, 2]),
109      (2, vec![3, 4]),
110      (4, vec![5, 6]),
111    ]);
112  }
113
114  #[test]
115  fn test_merge_non_overlapping_gaps() {
116    let writes = vec![(0, vec![1, 2]), (10, vec![3, 4]), (20, vec![5, 6])];
117    let merged = merge_overlapping_writes(writes);
118    assert_intervals(&merged, &[
119      (0, vec![1, 2]),
120      (10, vec![3, 4]),
121      (20, vec![5, 6]),
122    ]);
123  }
124
125  #[test]
126  fn test_merge_complete_overlap_later_wins() {
127    let writes = vec![
128      (0, vec![1, 2, 3, 4]),
129      (0, vec![5, 6, 7, 8]), // Completely overwrites first
130    ];
131    let merged = merge_overlapping_writes(writes);
132    assert_intervals(&merged, &[(0, vec![5, 6, 7, 8])]);
133  }
134
135  #[test]
136  fn test_merge_partial_overlap_at_end() {
137    let writes = vec![
138      (0, vec![1, 2, 3, 4]), // [0..4)
139      (2, vec![5, 6]),       // [2..4) overlaps
140    ];
141    let merged = merge_overlapping_writes(writes);
142    assert_intervals(&merged, &[(0, vec![1, 2]), (2, vec![5, 6])]);
143  }
144
145  #[test]
146  fn test_merge_partial_overlap_at_start() {
147    let writes = vec![
148      (2, vec![1, 2, 3, 4]), // [2..6)
149      (0, vec![5, 6]),       // [0..2) adjacent, not overlapping
150    ];
151    let merged = merge_overlapping_writes(writes);
152    assert_intervals(&merged, &[(0, vec![5, 6]), (2, vec![1, 2, 3, 4])]);
153  }
154
155  #[test]
156  fn test_merge_write_inside_another() {
157    let writes = vec![
158      (0, vec![1, 2, 3, 4, 5, 6]), // [0..6)
159      (2, vec![7, 8]),             // [2..4) inside
160    ];
161    let merged = merge_overlapping_writes(writes);
162    assert_intervals(&merged, &[
163      (0, vec![1, 2]),
164      (2, vec![7, 8]),
165      (4, vec![5, 6]),
166    ]);
167  }
168
169  #[test]
170  fn test_merge_write_spans_multiple() {
171    let writes = vec![
172      (0, vec![1, 2]),
173      (2, vec![3, 4]),
174      (4, vec![5, 6]),
175      (1, vec![7, 8, 9, 10, 11]), // [1..6) spans all three
176    ];
177    let merged = merge_overlapping_writes(writes);
178    assert_intervals(&merged, &[(0, vec![1]), (1, vec![7, 8, 9, 10, 11])]);
179  }
180
181  #[test]
182  fn test_merge_multiple_overlaps_later_wins() {
183    let writes = vec![
184      (0, vec![1, 2, 3, 4]),
185      (1, vec![5, 6]), // [1..3) overlaps
186      (2, vec![7, 8]), // [2..4) overlaps previous
187    ];
188    let merged = merge_overlapping_writes(writes);
189    assert_intervals(&merged, &[
190      (0, vec![1]),
191      (1, vec![5]),
192      (2, vec![7, 8]),
193    ]);
194  }
195
196  #[test]
197  fn test_merge_identical_offsets_later_wins() {
198    let writes = vec![(5, vec![1, 2, 3]), (5, vec![4, 5, 6]), (5, vec![7, 8, 9])];
199    let merged = merge_overlapping_writes(writes);
200    assert_intervals(&merged, &[(5, vec![7, 8, 9])]);
201  }
202
203  #[test]
204  fn test_merge_complex_overlapping_sequence() {
205    let writes = vec![
206      (0, vec![1, 2, 3, 4, 5]), // [0..5)
207      (3, vec![6, 7, 8, 9]),    // [3..7) extends and overlaps
208      (1, vec![10, 11]),        // [1..3) overlaps first part
209      (6, vec![12, 13, 14]),    // [6..9) extends further
210      (2, vec![15]),            // [2..3) single byte
211    ];
212    let merged = merge_overlapping_writes(writes);
213    // Expected:
214    // [0..1): 1
215    // [1..2): 10
216    // [2..3): 15
217    // [3..6): 6,7,8
218    // [6..9): 12,13,14
219    assert_intervals(&merged, &[
220      (0, vec![1]),
221      (1, vec![10]),
222      (2, vec![15]),
223      (3, vec![6, 7, 8]),
224      (6, vec![12, 13, 14]),
225    ]);
226  }
227
228  #[test]
229  fn test_merge_zero_length_not_supported() {
230    // Zero-length writes should just be no-ops (they create a 0-length interval)
231    let writes = vec![
232      (5, vec![1, 2, 3]),
233      (10, vec![]), // Empty write
234    ];
235    let merged = merge_overlapping_writes(writes);
236    // The empty write creates [10, 10) which is valid but has no effect
237    assert_intervals(&merged, &[(5, vec![1, 2, 3]), (10, vec![])]);
238  }
239
240  #[test]
241  fn test_merge_out_of_order_offsets() {
242    // Writes don't need to be in order
243    let writes = vec![(20, vec![5, 6]), (0, vec![1, 2]), (10, vec![3, 4])];
244    let merged = merge_overlapping_writes(writes);
245    assert_intervals(&merged, &[
246      (0, vec![1, 2]),
247      (10, vec![3, 4]),
248      (20, vec![5, 6]),
249    ]);
250  }
251
252  #[test]
253  fn test_merge_adjacent_not_merged() {
254    // Adjacent intervals should remain separate
255    let writes = vec![
256      (0, vec![1, 2]),
257      (2, vec![3, 4]), // Starts exactly where first ends
258    ];
259    let merged = merge_overlapping_writes(writes);
260    // These are adjacent, not overlapping, so they stay separate
261    assert_intervals(&merged, &[(0, vec![1, 2]), (2, vec![3, 4])]);
262  }
263
264  #[test]
265  fn test_merge_large_span_over_many_small() {
266    let writes = vec![
267      (0, vec![1]),
268      (2, vec![2]),
269      (4, vec![3]),
270      (6, vec![4]),
271      (8, vec![5]),
272      (1, vec![10, 11, 12, 13, 14, 15, 16, 17]), // [1..9) covers most
273    ];
274    let merged = merge_overlapping_writes(writes);
275    assert_intervals(&merged, &[
276      (0, vec![1]),
277      (1, vec![10, 11, 12, 13, 14, 15, 16, 17]),
278    ]);
279  }
280
281  #[test]
282  fn test_merge_write_extends_left() {
283    let writes = vec![
284      (10, vec![1, 2, 3, 4]),
285      (5, vec![5, 6, 7, 8, 9, 10]), // [5..11) extends left and overlaps
286    ];
287    let merged = merge_overlapping_writes(writes);
288    assert_intervals(&merged, &[
289      (5, vec![5, 6, 7, 8, 9, 10]),
290      (11, vec![2, 3, 4]),
291    ]);
292  }
293
294  #[test]
295  fn test_merge_write_extends_right() {
296    let writes = vec![
297      (5, vec![1, 2, 3, 4]),
298      (7, vec![5, 6, 7, 8, 9]), // [7..12) extends right and overlaps
299    ];
300    let merged = merge_overlapping_writes(writes);
301    assert_intervals(&merged, &[(5, vec![1, 2]), (7, vec![5, 6, 7, 8, 9])]);
302  }
303
304  #[test]
305  fn test_merge_alternating_overlaps() {
306    let writes = vec![
307      (0, vec![1, 2, 3]),
308      (2, vec![4, 5, 6]),
309      (4, vec![7, 8, 9]),
310      (6, vec![10, 11, 12]),
311    ];
312    let merged = merge_overlapping_writes(writes);
313    // [0..2): 1,2
314    // [2..4): 4,5
315    // [4..6): 7,8
316    // [6..9): 10,11,12
317    assert_intervals(&merged, &[
318      (0, vec![1, 2]),
319      (2, vec![4, 5]),
320      (4, vec![7, 8]),
321      (6, vec![10, 11, 12]),
322    ]);
323  }
324
325  #[test]
326  fn test_merge_same_byte_overwritten_multiple_times() {
327    // Multiple writes to the exact same byte
328    let writes = vec![
329      (5, vec![1]),
330      (5, vec![2]),
331      (5, vec![3]),
332      (5, vec![4]),
333      (5, vec![5]),
334    ];
335    let merged = merge_overlapping_writes(writes);
336    assert_intervals(&merged, &[(5, vec![5])]);
337  }
338
339  #[test]
340  fn test_merge_preserves_data_correctly() {
341    // Ensure data bytes are preserved correctly when splitting
342    let writes = vec![
343      (0, vec![10, 20, 30, 40, 50, 60, 70, 80]), // [0..8)
344      (2, vec![99, 88]),                         // [2..4) overwrites 30, 40
345    ];
346    let merged = merge_overlapping_writes(writes);
347    assert_intervals(&merged, &[
348      (0, vec![10, 20]),
349      (2, vec![99, 88]),
350      (4, vec![50, 60, 70, 80]),
351    ]);
352  }
353}