write_journal/
merge.rs

1use off64::u64;
2use off64::usz;
3use std::collections::BTreeMap;
4
5/// Merges a sequence of writes that may overlap, producing a minimal set of non-overlapping writes.
6/// Later writes win where there are overlaps.
7///
8/// Returns a BTreeMap where keys are start offsets and values are (end_offset, merged_data).
9/// The result contains only non-overlapping intervals with the final data for each byte position.
10///
11/// # Complexity
12/// - O(N log N) typical case for N writes with few overlaps
13/// - O(N² log N) worst case if every write overlaps everything
14///
15/// # Example
16/// ```ignore
17/// let writes = vec![
18///   WriteRequest::new(0, vec![1, 2, 3, 4]),
19///   WriteRequest::new(2, vec![5, 6]),  // Overlaps and wins
20/// ];
21/// let merged = merge_overlapping_writes(writes);
22/// // Result: {0 => (2, [1, 2]), 2 => (4, [5, 6])}
23/// ```
24pub(crate) fn merge_overlapping_writes(
25  writes: impl IntoIterator<Item = (u64, Vec<u8>)>,
26) -> BTreeMap<u64, (u64, Vec<u8>)> {
27  let mut intervals: BTreeMap<u64, (u64, Vec<u8>)> = BTreeMap::new();
28
29  for (offset, data) in writes {
30    let end = offset + u64!(data.len());
31
32    // Find all intervals that overlap with [offset, end).
33    // An interval [s, e) overlaps if s < end && e > offset.
34    let overlapping_keys: Vec<u64> = intervals
35      .range(..end)
36      .filter(|(_, &(old_end, _))| old_end > offset)
37      .map(|(&start, _)| start)
38      .collect();
39
40    // Remove overlapping intervals and process them.
41    let mut to_insert = Vec::new();
42
43    for key in overlapping_keys {
44      let (old_end, old_data) = intervals.remove(&key).unwrap();
45
46      // Preserve the part before our new write if it exists.
47      if key < offset {
48        let preserved_len = usz!(offset - key);
49        to_insert.push((key, offset, old_data[..preserved_len].to_vec()));
50      }
51
52      // Preserve the part after our new write if it exists.
53      if old_end > end {
54        let skip_len = usz!(end - key);
55        to_insert.push((end, old_end, old_data[skip_len..].to_vec()));
56      }
57    }
58
59    // Insert the new write.
60    to_insert.push((offset, end, data.to_vec()));
61
62    // Insert all intervals back.
63    for (start, end, data) in to_insert {
64      intervals.insert(start, (end, data));
65    }
66  }
67
68  intervals
69}
70
71#[cfg(test)]
72mod tests {
73  use super::*;
74
75  /// Helper to verify the merged intervals match expected results
76  fn assert_intervals(intervals: &BTreeMap<u64, (u64, Vec<u8>)>, expected: &[(u64, u64, Vec<u8>)]) {
77    let actual: Vec<_> = intervals
78      .iter()
79      .map(|(&start, &(end, ref data))| (start, end, data.clone()))
80      .collect();
81    assert_eq!(
82      actual, expected,
83      "\nActual intervals:   {:?}\nExpected intervals: {:?}",
84      actual, expected
85    );
86  }
87
88  #[test]
89  fn test_merge_no_writes() {
90    let writes: Vec<(u64, Vec<u8>)> = vec![];
91    let merged = merge_overlapping_writes(writes);
92    assert_intervals(&merged, &[]);
93  }
94
95  #[test]
96  fn test_merge_single_write() {
97    let writes = vec![(10, vec![1, 2, 3, 4])];
98    let merged = merge_overlapping_writes(writes);
99    assert_intervals(&merged, &[(10, 14, vec![1, 2, 3, 4])]);
100  }
101
102  #[test]
103  fn test_merge_non_overlapping_sequential() {
104    let writes = vec![(0, vec![1, 2]), (2, vec![3, 4]), (4, vec![5, 6])];
105    let merged = merge_overlapping_writes(writes);
106    assert_intervals(&merged, &[
107      (0, 2, vec![1, 2]),
108      (2, 4, vec![3, 4]),
109      (4, 6, vec![5, 6]),
110    ]);
111  }
112
113  #[test]
114  fn test_merge_non_overlapping_gaps() {
115    let writes = vec![(0, vec![1, 2]), (10, vec![3, 4]), (20, vec![5, 6])];
116    let merged = merge_overlapping_writes(writes);
117    assert_intervals(&merged, &[
118      (0, 2, vec![1, 2]),
119      (10, 12, vec![3, 4]),
120      (20, 22, vec![5, 6]),
121    ]);
122  }
123
124  #[test]
125  fn test_merge_complete_overlap_later_wins() {
126    let writes = vec![
127      (0, vec![1, 2, 3, 4]),
128      (0, vec![5, 6, 7, 8]), // Completely overwrites first
129    ];
130    let merged = merge_overlapping_writes(writes);
131    assert_intervals(&merged, &[(0, 4, vec![5, 6, 7, 8])]);
132  }
133
134  #[test]
135  fn test_merge_partial_overlap_at_end() {
136    let writes = vec![
137      (0, vec![1, 2, 3, 4]), // [0..4)
138      (2, vec![5, 6]),       // [2..4) overlaps
139    ];
140    let merged = merge_overlapping_writes(writes);
141    assert_intervals(&merged, &[(0, 2, vec![1, 2]), (2, 4, vec![5, 6])]);
142  }
143
144  #[test]
145  fn test_merge_partial_overlap_at_start() {
146    let writes = vec![
147      (2, vec![1, 2, 3, 4]), // [2..6)
148      (0, vec![5, 6]),       // [0..2) adjacent, not overlapping
149    ];
150    let merged = merge_overlapping_writes(writes);
151    assert_intervals(&merged, &[(0, 2, vec![5, 6]), (2, 6, vec![1, 2, 3, 4])]);
152  }
153
154  #[test]
155  fn test_merge_write_inside_another() {
156    let writes = vec![
157      (0, vec![1, 2, 3, 4, 5, 6]), // [0..6)
158      (2, vec![7, 8]),             // [2..4) inside
159    ];
160    let merged = merge_overlapping_writes(writes);
161    assert_intervals(&merged, &[
162      (0, 2, vec![1, 2]),
163      (2, 4, vec![7, 8]),
164      (4, 6, vec![5, 6]),
165    ]);
166  }
167
168  #[test]
169  fn test_merge_write_spans_multiple() {
170    let writes = vec![
171      (0, vec![1, 2]),
172      (2, vec![3, 4]),
173      (4, vec![5, 6]),
174      (1, vec![7, 8, 9, 10, 11]), // [1..6) spans all three
175    ];
176    let merged = merge_overlapping_writes(writes);
177    assert_intervals(&merged, &[(0, 1, vec![1]), (1, 6, vec![7, 8, 9, 10, 11])]);
178  }
179
180  #[test]
181  fn test_merge_multiple_overlaps_later_wins() {
182    let writes = vec![
183      (0, vec![1, 2, 3, 4]),
184      (1, vec![5, 6]), // [1..3) overlaps
185      (2, vec![7, 8]), // [2..4) overlaps previous
186    ];
187    let merged = merge_overlapping_writes(writes);
188    assert_intervals(&merged, &[
189      (0, 1, vec![1]),
190      (1, 2, vec![5]),
191      (2, 4, vec![7, 8]),
192    ]);
193  }
194
195  #[test]
196  fn test_merge_identical_offsets_later_wins() {
197    let writes = vec![(5, vec![1, 2, 3]), (5, vec![4, 5, 6]), (5, vec![7, 8, 9])];
198    let merged = merge_overlapping_writes(writes);
199    assert_intervals(&merged, &[(5, 8, vec![7, 8, 9])]);
200  }
201
202  #[test]
203  fn test_merge_complex_overlapping_sequence() {
204    let writes = vec![
205      (0, vec![1, 2, 3, 4, 5]), // [0..5)
206      (3, vec![6, 7, 8, 9]),    // [3..7) extends and overlaps
207      (1, vec![10, 11]),        // [1..3) overlaps first part
208      (6, vec![12, 13, 14]),    // [6..9) extends further
209      (2, vec![15]),            // [2..3) single byte
210    ];
211    let merged = merge_overlapping_writes(writes);
212    // Expected:
213    // [0..1): 1
214    // [1..2): 10
215    // [2..3): 15
216    // [3..6): 6,7,8
217    // [6..9): 12,13,14
218    assert_intervals(&merged, &[
219      (0, 1, vec![1]),
220      (1, 2, vec![10]),
221      (2, 3, vec![15]),
222      (3, 6, vec![6, 7, 8]),
223      (6, 9, vec![12, 13, 14]),
224    ]);
225  }
226
227  #[test]
228  fn test_merge_zero_length_not_supported() {
229    // Zero-length writes should just be no-ops (they create a 0-length interval)
230    let writes = vec![
231      (5, vec![1, 2, 3]),
232      (10, vec![]), // Empty write
233    ];
234    let merged = merge_overlapping_writes(writes);
235    // The empty write creates [10, 10) which is valid but has no effect
236    assert_intervals(&merged, &[(5, 8, vec![1, 2, 3]), (10, 10, vec![])]);
237  }
238
239  #[test]
240  fn test_merge_out_of_order_offsets() {
241    // Writes don't need to be in order
242    let writes = vec![(20, vec![5, 6]), (0, vec![1, 2]), (10, vec![3, 4])];
243    let merged = merge_overlapping_writes(writes);
244    assert_intervals(&merged, &[
245      (0, 2, vec![1, 2]),
246      (10, 12, vec![3, 4]),
247      (20, 22, vec![5, 6]),
248    ]);
249  }
250
251  #[test]
252  fn test_merge_adjacent_not_merged() {
253    // Adjacent intervals should remain separate
254    let writes = vec![
255      (0, vec![1, 2]),
256      (2, vec![3, 4]), // Starts exactly where first ends
257    ];
258    let merged = merge_overlapping_writes(writes);
259    // These are adjacent, not overlapping, so they stay separate
260    assert_intervals(&merged, &[(0, 2, vec![1, 2]), (2, 4, vec![3, 4])]);
261  }
262
263  #[test]
264  fn test_merge_large_span_over_many_small() {
265    let writes = vec![
266      (0, vec![1]),
267      (2, vec![2]),
268      (4, vec![3]),
269      (6, vec![4]),
270      (8, vec![5]),
271      (1, vec![10, 11, 12, 13, 14, 15, 16, 17]), // [1..9) covers most
272    ];
273    let merged = merge_overlapping_writes(writes);
274    assert_intervals(&merged, &[
275      (0, 1, vec![1]),
276      (1, 9, vec![10, 11, 12, 13, 14, 15, 16, 17]),
277    ]);
278  }
279
280  #[test]
281  fn test_merge_write_extends_left() {
282    let writes = vec![
283      (10, vec![1, 2, 3, 4]),
284      (5, vec![5, 6, 7, 8, 9, 10]), // [5..11) extends left and overlaps
285    ];
286    let merged = merge_overlapping_writes(writes);
287    assert_intervals(&merged, &[
288      (5, 11, vec![5, 6, 7, 8, 9, 10]),
289      (11, 14, vec![2, 3, 4]),
290    ]);
291  }
292
293  #[test]
294  fn test_merge_write_extends_right() {
295    let writes = vec![
296      (5, vec![1, 2, 3, 4]),
297      (7, vec![5, 6, 7, 8, 9]), // [7..12) extends right and overlaps
298    ];
299    let merged = merge_overlapping_writes(writes);
300    assert_intervals(&merged, &[(5, 7, vec![1, 2]), (7, 12, vec![5, 6, 7, 8, 9])]);
301  }
302
303  #[test]
304  fn test_merge_alternating_overlaps() {
305    let writes = vec![
306      (0, vec![1, 2, 3]),
307      (2, vec![4, 5, 6]),
308      (4, vec![7, 8, 9]),
309      (6, vec![10, 11, 12]),
310    ];
311    let merged = merge_overlapping_writes(writes);
312    // [0..2): 1,2
313    // [2..4): 4,5
314    // [4..6): 7,8
315    // [6..9): 10,11,12
316    assert_intervals(&merged, &[
317      (0, 2, vec![1, 2]),
318      (2, 4, vec![4, 5]),
319      (4, 6, vec![7, 8]),
320      (6, 9, vec![10, 11, 12]),
321    ]);
322  }
323
324  #[test]
325  fn test_merge_same_byte_overwritten_multiple_times() {
326    // Multiple writes to the exact same byte
327    let writes = vec![
328      (5, vec![1]),
329      (5, vec![2]),
330      (5, vec![3]),
331      (5, vec![4]),
332      (5, vec![5]),
333    ];
334    let merged = merge_overlapping_writes(writes);
335    assert_intervals(&merged, &[(5, 6, vec![5])]);
336  }
337
338  #[test]
339  fn test_merge_preserves_data_correctly() {
340    // Ensure data bytes are preserved correctly when splitting
341    let writes = vec![
342      (0, vec![10, 20, 30, 40, 50, 60, 70, 80]), // [0..8)
343      (2, vec![99, 88]),                         // [2..4) overwrites 30, 40
344    ];
345    let merged = merge_overlapping_writes(writes);
346    assert_intervals(&merged, &[
347      (0, 2, vec![10, 20]),
348      (2, 4, vec![99, 88]),
349      (4, 8, vec![50, 60, 70, 80]),
350    ]);
351  }
352}