use crate::StorageBackend;
use crate::column_family::header::Segment;
use std::fmt::{Debug, Formatter};
use std::io;
use std::sync::{Arc, Mutex, RwLock};

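/// A [`StorageBackend`] view that maps a contiguous virtual address space onto one
/// or more [`Segment`]s of an underlying backend, so several partitions can share a
/// single underlying store without overlapping.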
pub struct PartitionedStorageBackend {
    inner: Arc<dyn StorageBackend>,
    segments: Arc<RwLock<Vec<Segment>>>,
    expansion_callback: Option<Arc<dyn Fn(u64) -> io::Result<Segment> + Send + Sync>>,
    file_growth_lock: Arc<Mutex<()>>,
}

impl PartitionedStorageBackend {
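    /// Creates a partition backed by a single segment of `partition_size` bytes
    /// starting at `partition_offset` in `inner`, with no expansion callback.
    ///
    /// # Panics
    ///
    /// Panics if `partition_offset + partition_size` overflows `u64`.
    ///
    /// Illustrative usage (a sketch only; assumes an `InMemoryBackend` such as the
    /// one used in the tests below):
    ///
    /// ```ignore
    /// let inner = Arc::new(InMemoryBackend::new());
    /// let backend = PartitionedStorageBackend::new(inner, 4096, 65_536);
    /// backend.set_len(1024)?;
    /// ```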
    pub fn new(
        inner: Arc<dyn StorageBackend>,
        partition_offset: u64,
        partition_size: u64,
    ) -> Self {
        partition_offset
            .checked_add(partition_size)
            .expect("partition_offset + partition_size overflows u64");

        Self {
            inner,
            segments: Arc::new(RwLock::new(vec![Segment::new(
                partition_offset,
                partition_size,
            )])),
            expansion_callback: None,
            file_growth_lock: Arc::new(Mutex::new(())),
        }
    }

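    /// Creates a partition from an explicit list of segments, an optional callback
    /// used to allocate additional segments during `set_len`, and a lock shared with
    /// other partitions to serialize growth of the underlying storage.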
    pub fn with_segments(
        inner: Arc<dyn StorageBackend>,
        segments: Vec<Segment>,
        expansion_callback: Option<Arc<dyn Fn(u64) -> io::Result<Segment> + Send + Sync>>,
        file_growth_lock: Arc<Mutex<()>>,
    ) -> Self {
        Self {
            inner,
            segments: Arc::new(RwLock::new(segments)),
            expansion_callback,
            file_growth_lock,
        }
    }

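    /// Total capacity of the partition: the sum of all segment sizes.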
    fn total_size(&self) -> u64 {
        let segments = self.segments.read().unwrap();
        segments.iter().map(|s| s.size).sum()
    }

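    /// Translates a virtual offset into `(physical_offset, bytes_remaining_in_segment)`,
    /// or returns `InvalidInput` if the offset lies beyond the last segment.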
    fn virtual_to_physical(&self, virtual_offset: u64) -> io::Result<(u64, u64)> {
        let segments = self.segments.read().unwrap();
        let mut current_virtual = 0u64;

        for segment in segments.iter() {
            let segment_end = current_virtual + segment.size;

            if virtual_offset < segment_end {
                let offset_in_segment = virtual_offset - current_virtual;
                let physical_offset = segment.offset + offset_in_segment;
                let remaining_in_segment = segment.size - offset_in_segment;
                return Ok((physical_offset, remaining_in_segment));
            }

            current_virtual = segment_end;
        }

        Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            format!("virtual offset {virtual_offset} exceeds total size {current_virtual}"),
        ))
    }

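    /// Requests a new segment of `requested_size` bytes from the expansion callback and
    /// appends it to the segment list; fails with `InvalidInput` if no callback is set.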
    fn try_expand(&self, requested_size: u64) -> io::Result<()> {
        if let Some(callback) = &self.expansion_callback {
            let new_segment = callback(requested_size)?;
            let mut segments = self.segments.write().unwrap();
            segments.push(new_segment);
            Ok(())
        } else {
            Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "cannot expand partition: no expansion callback configured",
            ))
        }
    }
}

impl Debug for PartitionedStorageBackend {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let segments = self.segments.read().unwrap();
        f.debug_struct("PartitionedStorageBackend")
            .field("segment_count", &segments.len())
            .field("total_size", &self.total_size())
            .finish_non_exhaustive()
    }
}

impl StorageBackend for PartitionedStorageBackend {
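    // Reports how many bytes of the virtual partition are currently backed by the
    // underlying storage, walking segments in order and stopping at the first one
    // that is not fully allocated.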
    fn len(&self) -> io::Result<u64> {
        let underlying_len = self.inner.len()?;
        let segments = self.segments.read().unwrap();

        let mut total_allocated = 0u64;

        for segment in segments.iter() {
            if underlying_len <= segment.offset {
                break;
            }

            let segment_allocated = (underlying_len - segment.offset).min(segment.size);
            total_allocated += segment_allocated;

            if segment_allocated < segment.size {
                break;
            }
        }

        Ok(total_allocated)
    }

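    // Reads may span multiple segments: each iteration translates the current virtual
    // offset and reads up to the end of that segment.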
    fn read(&self, offset: u64, out: &mut [u8]) -> io::Result<()> {
        let mut bytes_read = 0;
        let mut current_offset = offset;

        while bytes_read < out.len() {
            let (physical_offset, remaining_in_segment) =
                self.virtual_to_physical(current_offset)?;

            #[allow(clippy::cast_possible_truncation)]
            let bytes_to_read =
                (out.len() - bytes_read).min(remaining_in_segment.min(usize::MAX as u64) as usize);

            self.inner.read(
                physical_offset,
                &mut out[bytes_read..bytes_read + bytes_to_read],
            )?;

            bytes_read += bytes_to_read;
            current_offset += bytes_to_read as u64;
        }

        Ok(())
    }

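    // Ensures the partition can hold `len` bytes (expanding via the callback when
    // necessary) and grows the underlying storage to cover the bytes actually used.
    // The underlying storage is never shrunk here.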
    fn set_len(&self, len: u64) -> io::Result<()> {
        let current_total = self.total_size();

        if len > current_total {
            let needed = len - current_total;
            // Over-allocate by 10% of the shortfall, with a 1 MiB floor, to reduce
            // how often the expansion callback has to be invoked.
            let allocation_size = needed + (needed / 10).max(1024 * 1024);
            self.try_expand(allocation_size)?;
        }

        // Highest physical byte touched by the first `len` virtual bytes.
        let max_physical_end = {
            let segments = self.segments.read().unwrap();
            let mut remaining = len;
            let mut max_physical_end = 0u64;

            for segment in segments.iter() {
                if remaining == 0 {
                    break;
                }

                let used_in_segment = remaining.min(segment.size);
                let physical_end = segment.offset + used_in_segment;
                max_physical_end = max_physical_end.max(physical_end);

                remaining = remaining.saturating_sub(used_in_segment);
            }

            max_physical_end
        };

        // Serialize the check-and-grow of the underlying storage with other partitions.
        let _growth_lock = self.file_growth_lock.lock().unwrap();

        let current_underlying_len = self.inner.len()?;
        if max_physical_end > current_underlying_len {
            self.inner.set_len(max_physical_end)?;
        }
        Ok(())
    }

    fn sync_data(&self) -> io::Result<()> {
        self.inner.sync_data()
    }

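    // Mirrors `read`: splits the buffer at segment boundaries and writes each chunk
    // to its translated physical location.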
    fn write(&self, offset: u64, data: &[u8]) -> io::Result<()> {
        let mut bytes_written = 0;
        let mut current_offset = offset;

        while bytes_written < data.len() {
            let (physical_offset, remaining_in_segment) =
                self.virtual_to_physical(current_offset)?;

            #[allow(clippy::cast_possible_truncation)]
            let bytes_to_write = (data.len() - bytes_written)
                .min(remaining_in_segment.min(usize::MAX as u64) as usize);

            self.inner.write(
                physical_offset,
                &data[bytes_written..bytes_written + bytes_to_write],
            )?;

            bytes_written += bytes_to_write;
            current_offset += bytes_to_write as u64;
        }

        Ok(())
    }

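    // Closing a partition is a no-op: the underlying backend may back other
    // partitions, so it is left open (see `test_close_does_not_close_inner`).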
    fn close(&self) -> io::Result<()> {
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::backends::InMemoryBackend;

    #[test]
    fn test_single_segment_len() {
        let inner = Arc::new(InMemoryBackend::new());
        let backend = PartitionedStorageBackend::new(inner.clone(), 1000, 5000);

        assert_eq!(backend.len().unwrap(), 0);

        backend.set_len(3000).unwrap();
        assert_eq!(backend.len().unwrap(), 3000);

        backend.set_len(5000).unwrap();
        assert_eq!(backend.len().unwrap(), 5000);
    }

    #[test]
    fn test_read_write_with_offset_translation() {
        let inner = Arc::new(InMemoryBackend::new());
        let backend = PartitionedStorageBackend::new(inner.clone(), 1000, 5000);

        backend.set_len(200).unwrap();

        let write_data = b"hello world";
        backend.write(100, write_data).unwrap();

        // The write at virtual offset 100 should land at physical offset 1100.
        let mut verify_buf = vec![0u8; write_data.len()];
        inner.read(1100, &mut verify_buf).unwrap();
        assert_eq!(&verify_buf, write_data);

        let mut read_buf = vec![0u8; write_data.len()];
        backend.read(100, &mut read_buf).unwrap();
        assert_eq!(&read_buf, write_data);
    }

    #[test]
    fn test_read_at_offset_zero() {
        let inner = Arc::new(InMemoryBackend::new());
        let backend = PartitionedStorageBackend::new(inner.clone(), 1000, 5000);

        backend.set_len(100).unwrap();

        let write_data = b"start";
        backend.write(0, write_data).unwrap();

        let mut verify_buf = vec![0u8; write_data.len()];
        inner.read(1000, &mut verify_buf).unwrap();
        assert_eq!(&verify_buf, write_data);
    }

    #[test]
    fn test_read_at_partition_end() {
        let inner = Arc::new(InMemoryBackend::new());
        let backend = PartitionedStorageBackend::new(inner.clone(), 1000, 5000);

        backend.set_len(5000).unwrap();

        let write_data = b"end!_";
        backend.write(4995, write_data).unwrap();

        let mut read_buf = vec![0u8; write_data.len()];
        backend.read(4995, &mut read_buf).unwrap();
        assert_eq!(&read_buf, write_data);
    }

    #[test]
    fn test_write_exceeds_partition_size() {
        let inner = Arc::new(InMemoryBackend::new());
        let backend = PartitionedStorageBackend::new(inner, 1000, 5000);

        let write_data = b"overflow";
        let result = backend.write(4996, write_data);

        assert!(result.is_err());
        assert_eq!(result.unwrap_err().kind(), io::ErrorKind::InvalidInput);
    }

    #[test]
    fn test_read_exceeds_partition_size() {
        let inner = Arc::new(InMemoryBackend::new());
        let backend = PartitionedStorageBackend::new(inner, 1000, 5000);

        let mut buf = vec![0u8; 100];
        let result = backend.read(4950, &mut buf);

        assert!(result.is_err());
        assert_eq!(result.unwrap_err().kind(), io::ErrorKind::InvalidInput);
    }

    #[test]
    fn test_set_len_within_partition() {
        let inner = Arc::new(InMemoryBackend::new());
        let backend = PartitionedStorageBackend::new(inner.clone(), 1000, 5000);

        backend.set_len(3000).unwrap();

        assert_eq!(inner.len().unwrap(), 4000);
    }

    #[test]
    fn test_set_len_without_expansion_callback() {
        let inner = Arc::new(InMemoryBackend::new());
        let backend = PartitionedStorageBackend::new(inner, 1000, 5000);

        let result = backend.set_len(6000);
        assert!(result.is_err());
    }

    #[test]
    fn test_multiple_partitions_same_storage() {
        let inner = Arc::new(InMemoryBackend::new());

        let partition1 = PartitionedStorageBackend::new(inner.clone(), 0, 1000);
        let partition2 = PartitionedStorageBackend::new(inner.clone(), 1000, 1000);

        partition1.set_len(200).unwrap();
        partition2.set_len(200).unwrap();

        partition1.write(100, b"partition1").unwrap();

        partition2.write(100, b"partition2").unwrap();

        let mut buf1 = vec![0u8; 10];
        partition1.read(100, &mut buf1).unwrap();
        assert_eq!(&buf1, b"partition1");

        let mut buf2 = vec![0u8; 10];
        partition2.read(100, &mut buf2).unwrap();
        assert_eq!(&buf2, b"partition2");

        // The same virtual offset maps to different physical offsets per partition.
        let mut verify1 = vec![0u8; 10];
        inner.read(100, &mut verify1).unwrap();
        assert_eq!(&verify1, b"partition1");

        let mut verify2 = vec![0u8; 10];
        inner.read(1100, &mut verify2).unwrap();
        assert_eq!(&verify2, b"partition2");
    }

    #[test]
    fn test_partition_isolation() {
        let inner = Arc::new(InMemoryBackend::new());

        let partition1 = PartitionedStorageBackend::new(inner.clone(), 0, 1000);
        let partition2 = PartitionedStorageBackend::new(inner.clone(), 1000, 1000);

        assert_eq!(partition1.len().unwrap(), 0);
        assert_eq!(partition2.len().unwrap(), 0);

        // Growing one partition must not change the reported length of the other.
        partition1.set_len(800).unwrap();

        assert_eq!(partition1.len().unwrap(), 800);
        assert_eq!(partition2.len().unwrap(), 0);
    }

    #[test]
    fn test_sync_delegates_to_inner() {
        let inner = Arc::new(InMemoryBackend::new());
        let backend = PartitionedStorageBackend::new(inner, 1000, 5000);

        assert!(backend.sync_data().is_ok());
    }

    #[test]
    fn test_close_does_not_close_inner() {
        let inner = Arc::new(InMemoryBackend::new());
        let backend = PartitionedStorageBackend::new(inner.clone(), 1000, 5000);

        backend.close().unwrap();

        inner.set_len(100).unwrap();
        let mut buf = vec![0u8; 10];
        assert!(inner.read(0, &mut buf).is_ok());
    }

    #[test]
    #[should_panic(expected = "partition_offset + partition_size overflows u64")]
    fn test_construction_overflow_panics() {
        let inner = Arc::new(InMemoryBackend::new());
        PartitionedStorageBackend::new(inner, u64::MAX, 1);
    }

    #[test]
    fn test_multi_segment_read_write() {
        let inner = Arc::new(InMemoryBackend::new());

        let segments = vec![
            Segment::new(1000, 1000),
            Segment::new(5000, 1000),
            Segment::new(10000, 1000),
        ];

        let backend = PartitionedStorageBackend::with_segments(
            inner.clone(),
            segments,
            None,
            Arc::new(Mutex::new(())),
        );
        backend.set_len(3000).unwrap();

        let mut data = Vec::new();
        for i in 0u8..200 {
            data.push(i);
        }

        // A 200-byte write at virtual offset 900 spans the first two segments.
        backend.write(900, &data).unwrap();

        let mut read_buf = vec![0u8; data.len()];
        backend.read(900, &mut read_buf).unwrap();
        assert_eq!(&read_buf, &data);

        // First 100 bytes land at the tail of segment one (physical 1900..2000).
        let first_segment_bytes = 100;
        let mut verify1 = vec![0u8; first_segment_bytes];
        inner.read(1900, &mut verify1).unwrap();
        assert_eq!(&verify1, &data[..first_segment_bytes]);

        // Remaining 100 bytes continue at the start of segment two (physical 5000..5100).
        let mut verify2 = vec![0u8; 100];
        inner.read(5000, &mut verify2).unwrap();
        assert_eq!(
            &verify2,
            &data[first_segment_bytes..first_segment_bytes + 100]
        );
    }

    #[test]
    fn test_multi_segment_total_size() {
        let inner = Arc::new(InMemoryBackend::new());

        let segments = vec![
            Segment::new(1000, 1024),
            Segment::new(5000, 2048),
            Segment::new(10000, 512),
        ];

        let backend = PartitionedStorageBackend::with_segments(
            inner,
            segments,
            None,
            Arc::new(Mutex::new(())),
        );
        assert_eq!(backend.total_size(), 1024 + 2048 + 512);
    }

    #[test]
    fn test_virtual_to_physical_mapping() {
        let inner = Arc::new(InMemoryBackend::new());

        let segments = vec![Segment::new(4096, 1000), Segment::new(8192, 500)];

        let backend = PartitionedStorageBackend::with_segments(
            inner,
            segments,
            None,
            Arc::new(Mutex::new(())),
        );

        let (phys, rem) = backend.virtual_to_physical(0).unwrap();
        assert_eq!(phys, 4096);
        assert_eq!(rem, 1000);

        let (phys, rem) = backend.virtual_to_physical(999).unwrap();
        assert_eq!(phys, 5095);
        assert_eq!(rem, 1);

        let (phys, rem) = backend.virtual_to_physical(1000).unwrap();
        assert_eq!(phys, 8192);
        assert_eq!(rem, 500);

        let (phys, rem) = backend.virtual_to_physical(1499).unwrap();
        assert_eq!(phys, 8691);
        assert_eq!(rem, 1);

        assert!(backend.virtual_to_physical(1500).is_err());
    }
}