seekable_async_file/
merge.rs

1use crate::WriteRequest;
2use off64::u64;
3use off64::usz;
4use std::collections::BTreeMap;
5
6/// Merges a sequence of writes that may overlap, producing a minimal set of non-overlapping writes.
7/// Later writes win where there are overlaps.
8///
9/// Returns a BTreeMap where keys are start offsets and values are (end_offset, merged_data).
10/// The result contains only non-overlapping intervals with the final data for each byte position.
11///
12/// # Complexity
13/// - O(N log N) typical case for N writes with few overlaps
14/// - O(N² log N) worst case if every write overlaps everything
15///
16/// # Example
17/// ```ignore
18/// let writes = vec![
19///   WriteRequest::new(0, vec![1, 2, 3, 4]),
20///   WriteRequest::new(2, vec![5, 6]),  // Overlaps and wins
21/// ];
22/// let merged = merge_overlapping_writes(writes);
23/// // Result: {0 => (2, [1, 2]), 2 => (4, [5, 6])}
24/// ```
25pub fn merge_overlapping_writes<D: AsRef<[u8]> + Send + 'static>(
26  writes: impl IntoIterator<Item = WriteRequest<D>>,
27) -> BTreeMap<u64, (u64, Vec<u8>)> {
28  let mut intervals: BTreeMap<u64, (u64, Vec<u8>)> = BTreeMap::new();
29
30  for w in writes {
31    let data = w.data.as_ref();
32    let offset = w.offset;
33    let end = offset + u64!(data.len());
34
35    // Find all intervals that overlap with [offset, end).
36    // An interval [s, e) overlaps if s < end && e > offset.
37    let overlapping_keys: Vec<u64> = intervals
38      .range(..end)
39      .filter(|(_, &(old_end, _))| old_end > offset)
40      .map(|(&start, _)| start)
41      .collect();
42
43    // Remove overlapping intervals and process them.
44    let mut to_insert = Vec::new();
45
46    for key in overlapping_keys {
47      let (old_end, old_data) = intervals.remove(&key).unwrap();
48
49      // Preserve the part before our new write if it exists.
50      if key < offset {
51        let preserved_len = usz!(offset - key);
52        to_insert.push((key, offset, old_data[..preserved_len].to_vec()));
53      }
54
55      // Preserve the part after our new write if it exists.
56      if old_end > end {
57        let skip_len = usz!(end - key);
58        to_insert.push((end, old_end, old_data[skip_len..].to_vec()));
59      }
60    }
61
62    // Insert the new write.
63    to_insert.push((offset, end, data.to_vec()));
64
65    // Insert all intervals back.
66    for (start, end, data) in to_insert {
67      intervals.insert(start, (end, data));
68    }
69  }
70
71  intervals
72}
73
74#[cfg(test)]
75mod tests {
76  use super::*;
77
78  /// Helper to create a write request with Vec<u8> data
79  fn wr(offset: u64, data: Vec<u8>) -> WriteRequest<Vec<u8>> {
80    WriteRequest::new(offset, data)
81  }
82
83  /// Helper to verify the merged intervals match expected results
84  fn assert_intervals(intervals: &BTreeMap<u64, (u64, Vec<u8>)>, expected: &[(u64, u64, Vec<u8>)]) {
85    let actual: Vec<_> = intervals
86      .iter()
87      .map(|(&start, &(end, ref data))| (start, end, data.clone()))
88      .collect();
89    assert_eq!(
90      actual, expected,
91      "\nActual intervals:   {:?}\nExpected intervals: {:?}",
92      actual, expected
93    );
94  }
95
96  #[test]
97  fn test_merge_no_writes() {
98    let writes: Vec<WriteRequest<Vec<u8>>> = vec![];
99    let merged = merge_overlapping_writes(writes);
100    assert_intervals(&merged, &[]);
101  }
102
103  #[test]
104  fn test_merge_single_write() {
105    let writes = vec![wr(10, vec![1, 2, 3, 4])];
106    let merged = merge_overlapping_writes(writes);
107    assert_intervals(&merged, &[(10, 14, vec![1, 2, 3, 4])]);
108  }
109
110  #[test]
111  fn test_merge_non_overlapping_sequential() {
112    let writes = vec![wr(0, vec![1, 2]), wr(2, vec![3, 4]), wr(4, vec![5, 6])];
113    let merged = merge_overlapping_writes(writes);
114    assert_intervals(&merged, &[
115      (0, 2, vec![1, 2]),
116      (2, 4, vec![3, 4]),
117      (4, 6, vec![5, 6]),
118    ]);
119  }
120
121  #[test]
122  fn test_merge_non_overlapping_gaps() {
123    let writes = vec![wr(0, vec![1, 2]), wr(10, vec![3, 4]), wr(20, vec![5, 6])];
124    let merged = merge_overlapping_writes(writes);
125    assert_intervals(&merged, &[
126      (0, 2, vec![1, 2]),
127      (10, 12, vec![3, 4]),
128      (20, 22, vec![5, 6]),
129    ]);
130  }
131
132  #[test]
133  fn test_merge_complete_overlap_later_wins() {
134    let writes = vec![
135      wr(0, vec![1, 2, 3, 4]),
136      wr(0, vec![5, 6, 7, 8]), // Completely overwrites first
137    ];
138    let merged = merge_overlapping_writes(writes);
139    assert_intervals(&merged, &[(0, 4, vec![5, 6, 7, 8])]);
140  }
141
142  #[test]
143  fn test_merge_partial_overlap_at_end() {
144    let writes = vec![
145      wr(0, vec![1, 2, 3, 4]), // [0..4)
146      wr(2, vec![5, 6]),       // [2..4) overlaps
147    ];
148    let merged = merge_overlapping_writes(writes);
149    assert_intervals(&merged, &[(0, 2, vec![1, 2]), (2, 4, vec![5, 6])]);
150  }
151
152  #[test]
153  fn test_merge_partial_overlap_at_start() {
154    let writes = vec![
155      wr(2, vec![1, 2, 3, 4]), // [2..6)
156      wr(0, vec![5, 6]),       // [0..2) adjacent, not overlapping
157    ];
158    let merged = merge_overlapping_writes(writes);
159    assert_intervals(&merged, &[(0, 2, vec![5, 6]), (2, 6, vec![1, 2, 3, 4])]);
160  }
161
162  #[test]
163  fn test_merge_write_inside_another() {
164    let writes = vec![
165      wr(0, vec![1, 2, 3, 4, 5, 6]), // [0..6)
166      wr(2, vec![7, 8]),             // [2..4) inside
167    ];
168    let merged = merge_overlapping_writes(writes);
169    assert_intervals(&merged, &[
170      (0, 2, vec![1, 2]),
171      (2, 4, vec![7, 8]),
172      (4, 6, vec![5, 6]),
173    ]);
174  }
175
176  #[test]
177  fn test_merge_write_spans_multiple() {
178    let writes = vec![
179      wr(0, vec![1, 2]),
180      wr(2, vec![3, 4]),
181      wr(4, vec![5, 6]),
182      wr(1, vec![7, 8, 9, 10, 11]), // [1..6) spans all three
183    ];
184    let merged = merge_overlapping_writes(writes);
185    assert_intervals(&merged, &[(0, 1, vec![1]), (1, 6, vec![7, 8, 9, 10, 11])]);
186  }
187
188  #[test]
189  fn test_merge_multiple_overlaps_later_wins() {
190    let writes = vec![
191      wr(0, vec![1, 2, 3, 4]),
192      wr(1, vec![5, 6]), // [1..3) overlaps
193      wr(2, vec![7, 8]), // [2..4) overlaps previous
194    ];
195    let merged = merge_overlapping_writes(writes);
196    assert_intervals(&merged, &[
197      (0, 1, vec![1]),
198      (1, 2, vec![5]),
199      (2, 4, vec![7, 8]),
200    ]);
201  }
202
203  #[test]
204  fn test_merge_identical_offsets_later_wins() {
205    let writes = vec![
206      wr(5, vec![1, 2, 3]),
207      wr(5, vec![4, 5, 6]),
208      wr(5, vec![7, 8, 9]),
209    ];
210    let merged = merge_overlapping_writes(writes);
211    assert_intervals(&merged, &[(5, 8, vec![7, 8, 9])]);
212  }
213
214  #[test]
215  fn test_merge_complex_overlapping_sequence() {
216    let writes = vec![
217      wr(0, vec![1, 2, 3, 4, 5]), // [0..5)
218      wr(3, vec![6, 7, 8, 9]),    // [3..7) extends and overlaps
219      wr(1, vec![10, 11]),        // [1..3) overlaps first part
220      wr(6, vec![12, 13, 14]),    // [6..9) extends further
221      wr(2, vec![15]),            // [2..3) single byte
222    ];
223    let merged = merge_overlapping_writes(writes);
224    // Expected:
225    // [0..1): 1
226    // [1..2): 10
227    // [2..3): 15
228    // [3..6): 6,7,8
229    // [6..9): 12,13,14
230    assert_intervals(&merged, &[
231      (0, 1, vec![1]),
232      (1, 2, vec![10]),
233      (2, 3, vec![15]),
234      (3, 6, vec![6, 7, 8]),
235      (6, 9, vec![12, 13, 14]),
236    ]);
237  }
238
239  #[test]
240  fn test_merge_zero_length_not_supported() {
241    // Zero-length writes should just be no-ops (they create a 0-length interval)
242    let writes = vec![
243      wr(5, vec![1, 2, 3]),
244      wr(10, vec![]), // Empty write
245    ];
246    let merged = merge_overlapping_writes(writes);
247    // The empty write creates [10, 10) which is valid but has no effect
248    assert_intervals(&merged, &[(5, 8, vec![1, 2, 3]), (10, 10, vec![])]);
249  }
250
251  #[test]
252  fn test_merge_out_of_order_offsets() {
253    // Writes don't need to be in order
254    let writes = vec![wr(20, vec![5, 6]), wr(0, vec![1, 2]), wr(10, vec![3, 4])];
255    let merged = merge_overlapping_writes(writes);
256    assert_intervals(&merged, &[
257      (0, 2, vec![1, 2]),
258      (10, 12, vec![3, 4]),
259      (20, 22, vec![5, 6]),
260    ]);
261  }
262
263  #[test]
264  fn test_merge_adjacent_not_merged() {
265    // Adjacent intervals should remain separate
266    let writes = vec![
267      wr(0, vec![1, 2]),
268      wr(2, vec![3, 4]), // Starts exactly where first ends
269    ];
270    let merged = merge_overlapping_writes(writes);
271    // These are adjacent, not overlapping, so they stay separate
272    assert_intervals(&merged, &[(0, 2, vec![1, 2]), (2, 4, vec![3, 4])]);
273  }
274
275  #[test]
276  fn test_merge_large_span_over_many_small() {
277    let writes = vec![
278      wr(0, vec![1]),
279      wr(2, vec![2]),
280      wr(4, vec![3]),
281      wr(6, vec![4]),
282      wr(8, vec![5]),
283      wr(1, vec![10, 11, 12, 13, 14, 15, 16, 17]), // [1..9) covers most
284    ];
285    let merged = merge_overlapping_writes(writes);
286    assert_intervals(&merged, &[
287      (0, 1, vec![1]),
288      (1, 9, vec![10, 11, 12, 13, 14, 15, 16, 17]),
289    ]);
290  }
291
292  #[test]
293  fn test_merge_write_extends_left() {
294    let writes = vec![
295      wr(10, vec![1, 2, 3, 4]),
296      wr(5, vec![5, 6, 7, 8, 9, 10]), // [5..11) extends left and overlaps
297    ];
298    let merged = merge_overlapping_writes(writes);
299    assert_intervals(&merged, &[
300      (5, 11, vec![5, 6, 7, 8, 9, 10]),
301      (11, 14, vec![2, 3, 4]),
302    ]);
303  }
304
305  #[test]
306  fn test_merge_write_extends_right() {
307    let writes = vec![
308      wr(5, vec![1, 2, 3, 4]),
309      wr(7, vec![5, 6, 7, 8, 9]), // [7..12) extends right and overlaps
310    ];
311    let merged = merge_overlapping_writes(writes);
312    assert_intervals(&merged, &[(5, 7, vec![1, 2]), (7, 12, vec![5, 6, 7, 8, 9])]);
313  }
314
315  #[test]
316  fn test_merge_alternating_overlaps() {
317    let writes = vec![
318      wr(0, vec![1, 2, 3]),
319      wr(2, vec![4, 5, 6]),
320      wr(4, vec![7, 8, 9]),
321      wr(6, vec![10, 11, 12]),
322    ];
323    let merged = merge_overlapping_writes(writes);
324    // [0..2): 1,2
325    // [2..4): 4,5
326    // [4..6): 7,8
327    // [6..9): 10,11,12
328    assert_intervals(&merged, &[
329      (0, 2, vec![1, 2]),
330      (2, 4, vec![4, 5]),
331      (4, 6, vec![7, 8]),
332      (6, 9, vec![10, 11, 12]),
333    ]);
334  }
335
336  #[test]
337  fn test_merge_same_byte_overwritten_multiple_times() {
338    // Multiple writes to the exact same byte
339    let writes = vec![
340      wr(5, vec![1]),
341      wr(5, vec![2]),
342      wr(5, vec![3]),
343      wr(5, vec![4]),
344      wr(5, vec![5]),
345    ];
346    let merged = merge_overlapping_writes(writes);
347    assert_intervals(&merged, &[(5, 6, vec![5])]);
348  }
349
350  #[test]
351  fn test_merge_preserves_data_correctly() {
352    // Ensure data bytes are preserved correctly when splitting
353    let writes = vec![
354      wr(0, vec![10, 20, 30, 40, 50, 60, 70, 80]), // [0..8)
355      wr(2, vec![99, 88]),                         // [2..4) overwrites 30, 40
356    ];
357    let merged = merge_overlapping_writes(writes);
358    assert_intervals(&merged, &[
359      (0, 2, vec![10, 20]),
360      (2, 4, vec![99, 88]),
361      (4, 8, vec![50, 60, 70, 80]),
362    ]);
363  }
364}