manifold/column_family/partitioned_backend.rs

use crate::StorageBackend;
use crate::column_family::header::Segment;
use std::fmt::{Debug, Formatter};
use std::io;
use std::sync::{Arc, Mutex, RwLock};

/// A storage backend that operates on one or more segments within an underlying storage backend.
///
/// This backend supports multi-segment column families where data can be stored in non-contiguous
/// regions of the file. This enables instant growth without moving existing data - new segments
/// are simply appended at the end of the file.
///
/// # Virtual Offset Translation
///
/// The backend presents a continuous virtual address space to the caller, mapping it to
/// physical segments transparently:
/// - Virtual offset 0-1GB might map to physical offset 4KB-1GB (segment 1)
/// - Virtual offset 1GB-1.5GB might map to physical offset 5GB-5.5GB (segment 2)
///
/// # Auto-Expansion
///
/// When a write would exceed the total capacity of all segments, the backend can automatically
/// request a new segment via the expansion callback. This makes growth transparent to the
/// Database instance.
///
/// # Example
///
/// ```ignore
/// use manifold::column_family::{PartitionedStorageBackend, Segment};
/// use manifold::backends::FileBackend;
/// use std::sync::{Arc, Mutex};
///
/// let file_backend = Arc::new(FileBackend::new(file)?);
///
/// let segments = vec![
///     Segment::new(4096, 1024 * 1024 * 1024),  // 1GB at 4KB
///     Segment::new(5 * 1024 * 1024 * 1024, 512 * 1024 * 1024), // 512MB at 5GB
/// ];
///
/// let backend = PartitionedStorageBackend::with_segments(
///     file_backend,
///     segments,
///     None, // No auto-expansion
///     Arc::new(Mutex::new(())), // File growth lock, shared by all partitions on this file
/// );
/// ```
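///
/// Reads and writes then use virtual offsets; the backend translates them onto the
/// segments transparently. A minimal sketch continuing the example above:
///
/// ```ignore
/// // Pre-size the partition, then access it through virtual offsets.
/// // Virtual offset 0 maps to physical offset 4096, the start of the first segment.
/// backend.set_len(4096)?;
/// backend.write(0, b"hello")?;
///
/// let mut buf = [0u8; 5];
/// backend.read(0, &mut buf)?;
/// assert_eq!(&buf, b"hello");
/// ```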
pub struct PartitionedStorageBackend {
    inner: Arc<dyn StorageBackend>,
    segments: Arc<RwLock<Vec<Segment>>>,
    expansion_callback: Option<Arc<dyn Fn(u64) -> io::Result<Segment> + Send + Sync>>,
    /// Global lock for file growth operations. Shared across all `PartitionedStorageBackend`
    /// instances using the same underlying file to prevent race conditions during concurrent
    /// `set_len()` calls.
    file_growth_lock: Arc<Mutex<()>>,
}

impl PartitionedStorageBackend {
    /// Creates a new partitioned storage backend with a single segment (for backward compatibility).
    ///
    /// # Arguments
    ///
    /// * `inner` - The underlying storage backend (wrapped in Arc for sharing)
    /// * `partition_offset` - The absolute byte offset where this partition begins
    /// * `partition_size` - The size of this partition in bytes
    ///
    /// # Panics
    ///
    /// Panics if `partition_offset + partition_size` would overflow `u64`.
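    ///
    /// # Example
    ///
    /// A minimal sketch; the backend handle, offset, and size below are illustrative:
    ///
    /// ```ignore
    /// // A 1 MiB partition starting 4 KiB into a shared backing file.
    /// let backend = PartitionedStorageBackend::new(file_backend, 4096, 1024 * 1024);
    /// ```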
    pub fn new(inner: Arc<dyn StorageBackend>, partition_offset: u64, partition_size: u64) -> Self {
        // Verify no overflow in partition bounds
        partition_offset
            .checked_add(partition_size)
            .expect("partition_offset + partition_size overflows u64");

        Self {
            inner,
            segments: Arc::new(RwLock::new(vec![Segment::new(
                partition_offset,
                partition_size,
            )])),
            expansion_callback: None,
            file_growth_lock: Arc::new(Mutex::new(())),
        }
    }

    /// Creates a new partitioned storage backend with multiple segments.
    ///
    /// # Arguments
    ///
    /// * `inner` - The underlying storage backend (wrapped in Arc for sharing)
    /// * `segments` - Vector of segments making up this partition
    /// * `expansion_callback` - Optional callback to request new segments for auto-expansion
    /// * `file_growth_lock` - Lock serializing file growth across all partitions sharing the same underlying file
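    ///
    /// # Example
    ///
    /// A minimal sketch of wiring up auto-expansion; `allocate_segment` is a hypothetical
    /// helper that reserves a new segment of at least `requested` bytes at the end of the file:
    ///
    /// ```ignore
    /// let expansion: Arc<dyn Fn(u64) -> io::Result<Segment> + Send + Sync> =
    ///     Arc::new(|requested| allocate_segment(requested));
    ///
    /// let backend = PartitionedStorageBackend::with_segments(
    ///     file_backend,
    ///     segments,
    ///     Some(expansion),
    ///     growth_lock, // one lock shared by every partition wrapping this file
    /// );
    /// ```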
    pub fn with_segments(
        inner: Arc<dyn StorageBackend>,
        segments: Vec<Segment>,
        expansion_callback: Option<Arc<dyn Fn(u64) -> io::Result<Segment> + Send + Sync>>,
        file_growth_lock: Arc<Mutex<()>>,
    ) -> Self {
        Self {
            inner,
            segments: Arc::new(RwLock::new(segments)),
            expansion_callback,
            file_growth_lock,
        }
    }

    /// Returns the total size of all segments (virtual address space size).
    fn total_size(&self) -> u64 {
        let segments = self.segments.read().unwrap();
        segments.iter().map(|s| s.size).sum()
    }

    /// Maps a virtual offset to a physical offset in a specific segment.
    ///
    /// Returns `(physical_offset, remaining_in_segment)` on success.
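    ///
    /// For example, with segments `(offset 4096, size 1000)` and `(offset 8192, size 500)`,
    /// virtual offset `1000` falls at the start of the second segment and maps to
    /// `(8192, 500)`; see `test_virtual_to_physical_mapping` below.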
    fn virtual_to_physical(&self, virtual_offset: u64) -> io::Result<(u64, u64)> {
        let segments = self.segments.read().unwrap();
        let mut current_virtual = 0u64;

        for segment in segments.iter() {
            let segment_end = current_virtual + segment.size;

            if virtual_offset < segment_end {
                // Found the segment containing this virtual offset
                let offset_in_segment = virtual_offset - current_virtual;
                let physical_offset = segment.offset + offset_in_segment;
                let remaining_in_segment = segment.size - offset_in_segment;
                return Ok((physical_offset, remaining_in_segment));
            }

            current_virtual = segment_end;
        }

        Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            format!("virtual offset {virtual_offset} exceeds total size {current_virtual}"),
        ))
    }

    /// Attempts to expand the partition by requesting a new segment.
    fn try_expand(&self, requested_size: u64) -> io::Result<()> {
        if let Some(callback) = &self.expansion_callback {
            let new_segment = callback(requested_size)?;
            let mut segments = self.segments.write().unwrap();
            segments.push(new_segment);
            Ok(())
        } else {
            Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "cannot expand partition: no expansion callback configured",
            ))
        }
    }
}

impl Debug for PartitionedStorageBackend {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let segments = self.segments.read().unwrap();
        f.debug_struct("PartitionedStorageBackend")
            .field("segment_count", &segments.len())
            .field("total_size", &self.total_size())
            .finish_non_exhaustive()
    }
}

impl StorageBackend for PartitionedStorageBackend {
    fn len(&self) -> io::Result<u64> {
        // Return the actual allocated length across all segments
        let underlying_len = self.inner.len()?;
        let segments = self.segments.read().unwrap();

        let mut total_allocated = 0u64;

        for segment in segments.iter() {
            if underlying_len <= segment.offset {
                // This segment hasn't been allocated yet
                break;
            }

            let segment_allocated = (underlying_len - segment.offset).min(segment.size);
            total_allocated += segment_allocated;

            // If this segment isn't fully allocated, stop counting
            if segment_allocated < segment.size {
                break;
            }
        }

        Ok(total_allocated)
    }

    fn read(&self, offset: u64, out: &mut [u8]) -> io::Result<()> {
        let mut bytes_read = 0;
        let mut current_offset = offset;

        while bytes_read < out.len() {
            let (physical_offset, remaining_in_segment) =
                self.virtual_to_physical(current_offset)?;

            #[allow(clippy::cast_possible_truncation)]
            let bytes_to_read =
                (out.len() - bytes_read).min(remaining_in_segment.min(usize::MAX as u64) as usize);

            self.inner.read(
                physical_offset,
                &mut out[bytes_read..bytes_read + bytes_to_read],
            )?;

            bytes_read += bytes_to_read;
            current_offset += bytes_to_read as u64;
        }

        Ok(())
    }

    fn set_len(&self, len: u64) -> io::Result<()> {
        let current_total = self.total_size();

        // If requested length exceeds current capacity, try to expand
        if len > current_total {
            let needed = len - current_total;
            // Add 10% buffer to reduce frequent small expansions
            let allocation_size = needed + (needed / 10).max(1024 * 1024); // At least 1MB buffer
            self.try_expand(allocation_size)?;
        }

        // CRITICAL SECTION: Calculate physical size and grow file atomically
        // Calculate the maximum physical end we need to allocate
        let max_physical_end = {
            let segments = self.segments.read().unwrap();
            let mut remaining = len;
            let mut max_physical_end = 0u64;

            for segment in segments.iter() {
                if remaining == 0 {
                    break;
                }

                let used_in_segment = remaining.min(segment.size);
                let physical_end = segment.offset + used_in_segment;
                max_physical_end = max_physical_end.max(physical_end);

                remaining = remaining.saturating_sub(used_in_segment);
            }

            max_physical_end
        };

        // CRITICAL: Serialize file growth across all column families sharing this file
        // Multiple PartitionedStorageBackend instances may wrap the same file via different
        // FileBackend handles from the pool. Without this lock, concurrent `set_len()` calls
        // can race, causing assertion failures where the header claims the file is larger
        // than it actually is.
        let _growth_lock = self.file_growth_lock.lock().unwrap();

        // Only grow the underlying storage if needed (no-shrink policy)
        let current_underlying_len = self.inner.len()?;
        if max_physical_end > current_underlying_len {
            self.inner.set_len(max_physical_end)?;
        }
        // Lock released here - file growth complete

        Ok(())
    }

    fn sync_data(&self) -> io::Result<()> {
        self.inner.sync_data()
    }

    fn write(&self, offset: u64, data: &[u8]) -> io::Result<()> {
        let mut bytes_written = 0;
        let mut current_offset = offset;

        while bytes_written < data.len() {
            let (physical_offset, remaining_in_segment) =
                self.virtual_to_physical(current_offset)?;

            #[allow(clippy::cast_possible_truncation)]
            let bytes_to_write = (data.len() - bytes_written)
                .min(remaining_in_segment.min(usize::MAX as u64) as usize);

            self.inner.write(
                physical_offset,
                &data[bytes_written..bytes_written + bytes_to_write],
            )?;

            bytes_written += bytes_to_write;
            current_offset += bytes_to_write as u64;
        }

        Ok(())
    }

    fn close(&self) -> io::Result<()> {
        // Do not close the underlying storage, as other partitions may still be using it
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::backends::InMemoryBackend;

    #[test]
    fn test_single_segment_len() {
        let inner = Arc::new(InMemoryBackend::new());
        let backend = PartitionedStorageBackend::new(inner.clone(), 1000, 5000);

        // Initially, partition has no allocated storage
        assert_eq!(backend.len().unwrap(), 0);

        // After setting length, it should return the allocated size
        backend.set_len(3000).unwrap();
        assert_eq!(backend.len().unwrap(), 3000);

        backend.set_len(5000).unwrap();
        assert_eq!(backend.len().unwrap(), 5000);
    }

    #[test]
    fn test_read_write_with_offset_translation() {
        let inner = Arc::new(InMemoryBackend::new());
        let backend = PartitionedStorageBackend::new(inner.clone(), 1000, 5000);

        // Pre-size the underlying storage to accommodate writes
        backend.set_len(200).unwrap();

        // Write data at partition offset 100
        let write_data = b"hello world";
        backend.write(100, write_data).unwrap();

        // Verify it was written to absolute offset 1100 in underlying storage
        let mut verify_buf = vec![0u8; write_data.len()];
        inner.read(1100, &mut verify_buf).unwrap();
        assert_eq!(&verify_buf, write_data);

        // Read back through the partition
        let mut read_buf = vec![0u8; write_data.len()];
        backend.read(100, &mut read_buf).unwrap();
        assert_eq!(&read_buf, write_data);
    }

    #[test]
    fn test_read_at_offset_zero() {
        let inner = Arc::new(InMemoryBackend::new());
        let backend = PartitionedStorageBackend::new(inner.clone(), 1000, 5000);

        // Pre-size the underlying storage
        backend.set_len(100).unwrap();

        let write_data = b"start";
        backend.write(0, write_data).unwrap();

        // Verify translation to absolute offset 1000
        let mut verify_buf = vec![0u8; write_data.len()];
        inner.read(1000, &mut verify_buf).unwrap();
        assert_eq!(&verify_buf, write_data);
    }

    #[test]
    fn test_read_at_partition_end() {
        let inner = Arc::new(InMemoryBackend::new());
        let backend = PartitionedStorageBackend::new(inner.clone(), 1000, 5000);

        // Pre-size to full partition
        backend.set_len(5000).unwrap();

        // Write exactly at the end of the partition (offset 4995, length 5)
        let write_data = b"end!_";
        backend.write(4995, write_data).unwrap();

        let mut read_buf = vec![0u8; write_data.len()];
        backend.read(4995, &mut read_buf).unwrap();
        assert_eq!(&read_buf, write_data);
    }

    #[test]
    fn test_write_exceeds_partition_size() {
        let inner = Arc::new(InMemoryBackend::new());
        let backend = PartitionedStorageBackend::new(inner, 1000, 5000);

        // Try to write beyond partition size
        let write_data = b"overflow";
        let result = backend.write(4996, write_data);

        assert!(result.is_err());
        assert_eq!(result.unwrap_err().kind(), io::ErrorKind::InvalidInput);
    }

    #[test]
    fn test_read_exceeds_partition_size() {
        let inner = Arc::new(InMemoryBackend::new());
        let backend = PartitionedStorageBackend::new(inner, 1000, 5000);

        let mut buf = vec![0u8; 100];
        let result = backend.read(4950, &mut buf);

        assert!(result.is_err());
        assert_eq!(result.unwrap_err().kind(), io::ErrorKind::InvalidInput);
    }

    #[test]
    fn test_set_len_within_partition() {
        let inner = Arc::new(InMemoryBackend::new());
        let backend = PartitionedStorageBackend::new(inner.clone(), 1000, 5000);

        backend.set_len(3000).unwrap();

        // Underlying storage should be extended to 1000 + 3000 = 4000
        assert_eq!(inner.len().unwrap(), 4000);
    }

    #[test]
    fn test_set_len_without_expansion_callback() {
        let inner = Arc::new(InMemoryBackend::new());
        let backend = PartitionedStorageBackend::new(inner, 1000, 5000);

        // Without expansion callback, cannot exceed initial size
        let result = backend.set_len(6000);
        assert!(result.is_err());
    }

    #[test]
    fn test_multiple_partitions_same_storage() {
        let inner = Arc::new(InMemoryBackend::new());

        let partition1 = PartitionedStorageBackend::new(inner.clone(), 0, 1000);
        let partition2 = PartitionedStorageBackend::new(inner.clone(), 1000, 1000);

        // Pre-size both partitions
        partition1.set_len(200).unwrap();
        partition2.set_len(200).unwrap();

        // Write to partition 1
        partition1.write(100, b"partition1").unwrap();

        // Write to partition 2
        partition2.write(100, b"partition2").unwrap();

        // Read back from partition 1
        let mut buf1 = vec![0u8; 10];
        partition1.read(100, &mut buf1).unwrap();
        assert_eq!(&buf1, b"partition1");

        // Read back from partition 2
        let mut buf2 = vec![0u8; 10];
        partition2.read(100, &mut buf2).unwrap();
        assert_eq!(&buf2, b"partition2");

        // Verify they're at different absolute offsets
        let mut verify1 = vec![0u8; 10];
        inner.read(100, &mut verify1).unwrap();
        assert_eq!(&verify1, b"partition1");

        let mut verify2 = vec![0u8; 10];
        inner.read(1100, &mut verify2).unwrap();
        assert_eq!(&verify2, b"partition2");
    }

    #[test]
    fn test_partition_isolation() {
        let inner = Arc::new(InMemoryBackend::new());

        let partition1 = PartitionedStorageBackend::new(inner.clone(), 0, 1000);
        let partition2 = PartitionedStorageBackend::new(inner.clone(), 1000, 1000);

        // Each partition initially reports 0 length
        assert_eq!(partition1.len().unwrap(), 0);
        assert_eq!(partition2.len().unwrap(), 0);

        // Grow partition 1
        partition1.set_len(800).unwrap();

        // Partition 1 should report its allocated size
        assert_eq!(partition1.len().unwrap(), 800);

        // Partition 2 should still report 0 (not affected by partition 1)
        assert_eq!(partition2.len().unwrap(), 0);
    }

    #[test]
    fn test_sync_delegates_to_inner() {
        let inner = Arc::new(InMemoryBackend::new());
        let backend = PartitionedStorageBackend::new(inner, 1000, 5000);

        // sync_data should not error (InMemoryBackend sync is no-op)
        assert!(backend.sync_data().is_ok());
    }

    #[test]
    fn test_close_does_not_close_inner() {
        let inner = Arc::new(InMemoryBackend::new());
        let backend = PartitionedStorageBackend::new(inner.clone(), 1000, 5000);

        // Close the partition
        backend.close().unwrap();

        // Inner should still be usable (pre-size it first for the read test)
        inner.set_len(100).unwrap();
        let mut buf = vec![0u8; 10];
        assert!(inner.read(0, &mut buf).is_ok());
    }

    #[test]
    #[should_panic(expected = "partition_offset + partition_size overflows u64")]
    fn test_construction_overflow_panics() {
        let inner = Arc::new(InMemoryBackend::new());
        PartitionedStorageBackend::new(inner, u64::MAX, 1);
    }

    #[test]
    fn test_multi_segment_read_write() {
        let inner = Arc::new(InMemoryBackend::new());

        let segments = vec![
            Segment::new(1000, 1000),  // Virtual 0-1000 -> Physical 1000-2000
            Segment::new(5000, 1000),  // Virtual 1000-2000 -> Physical 5000-6000
            Segment::new(10000, 1000), // Virtual 2000-3000 -> Physical 10000-11000
        ];

        let backend = PartitionedStorageBackend::with_segments(
            inner.clone(),
            segments,
            None,
            Arc::new(Mutex::new(())),
        );
        backend.set_len(3000).unwrap();

        // Write data that spans multiple segments
        // Create 200 bytes of data so it definitely spans segments
        let mut data = Vec::new();
        for i in 0u8..200 {
            data.push(i);
        }

        // Write starting at virtual offset 900 (100 bytes before segment boundary)
        backend.write(900, &data).unwrap();

        // Read it back
        let mut read_buf = vec![0u8; data.len()];
        backend.read(900, &mut read_buf).unwrap();
        assert_eq!(&read_buf, &data);

        // Verify it was written to correct physical locations
        // Virtual 900-1000 (100 bytes) -> Physical 1900-2000 (end of segment 1)
        // Virtual 1000-1100 (100 bytes) -> Physical 5000-5100 (start of segment 2)
        let first_segment_bytes = 100; // bytes from virtual 900-1000

        let mut verify1 = vec![0u8; first_segment_bytes];
        inner.read(1900, &mut verify1).unwrap(); // Physical 1000 + 900 = 1900
        assert_eq!(&verify1, &data[..first_segment_bytes]);

        let mut verify2 = vec![0u8; 100]; // Next 100 bytes in segment 2
        inner.read(5000, &mut verify2).unwrap(); // Start of segment 2
        assert_eq!(
            &verify2,
            &data[first_segment_bytes..first_segment_bytes + 100]
        );
    }

    #[test]
    fn test_multi_segment_total_size() {
        let inner = Arc::new(InMemoryBackend::new());

        let segments = vec![
            Segment::new(1000, 1024),
            Segment::new(5000, 2048),
            Segment::new(10000, 512),
        ];

        let backend = PartitionedStorageBackend::with_segments(
            inner,
            segments,
            None,
            Arc::new(Mutex::new(())),
        );
        assert_eq!(backend.total_size(), 1024 + 2048 + 512);
    }

    #[test]
    fn test_virtual_to_physical_mapping() {
        let inner = Arc::new(InMemoryBackend::new());

        let segments = vec![Segment::new(4096, 1000), Segment::new(8192, 500)];

        let backend = PartitionedStorageBackend::with_segments(
            inner,
            segments,
            None,
            Arc::new(Mutex::new(())),
        );

        // Virtual offset 0 -> Physical 4096
        let (phys, rem) = backend.virtual_to_physical(0).unwrap();
        assert_eq!(phys, 4096);
        assert_eq!(rem, 1000);

        // Virtual offset 999 -> Physical 5095 (end of first segment)
        let (phys, rem) = backend.virtual_to_physical(999).unwrap();
        assert_eq!(phys, 5095);
        assert_eq!(rem, 1);

        // Virtual offset 1000 -> Physical 8192 (start of second segment)
        let (phys, rem) = backend.virtual_to_physical(1000).unwrap();
        assert_eq!(phys, 8192);
        assert_eq!(rem, 500);

        // Virtual offset 1499 -> Physical 8691 (end of second segment)
        let (phys, rem) = backend.virtual_to_physical(1499).unwrap();
        assert_eq!(phys, 8691);
        assert_eq!(rem, 1);

        // Virtual offset beyond capacity should fail
        assert!(backend.virtual_to_physical(1500).is_err());
    }
}