1use std::sync::Arc;
9
10use bytes::Bytes;
11use indexmap::IndexMap;
12use object_store::{ObjectStore, ObjectStoreExt};
13
14use crate::{
15 DType, Error, Result,
16 address::ChunkAddress,
17 array::ArrayElement,
18 codec::CompressionCodec,
19 delta::{
20 Delta, DeltaAllocator, DeltaCache, DeltaImmutable, DeltaMutable, write_file_then_bytes,
21 },
22 footer::{FOOTER_VERSION, Footer},
23 layout::{
24 ArrayLayout, ArrayMeta, AttrIndexKind, AttributeValue, Attributes, ChunkEntry, FillValue,
25 StorageLayout,
26 },
27 stats::{ArrayStats, StatsFile, compute_chunk_partial, merge_partial, read_stats_file},
28 storage::{ObjectStoreBackend, Storage},
29};
30
31pub const DEFAULT_BLOCK_TARGET_SIZE: usize = 8 * 1024 * 1024; pub const DEFAULT_CACHE_CAPACITY: usize = 256 * 1024 * 1024; pub const DEFAULT_IO_CACHE_CAPACITY: usize = 64 * 1024 * 1024; pub struct FileConfig<C: CompressionCodec> {
56 pub codec: C,
58 pub block_target_size: usize,
60 pub cache_capacity: usize,
64 pub io_cache_capacity: usize,
68 pub cache: Option<Arc<DeltaCache>>,
75}
76
77impl<C: CompressionCodec> FileConfig<C> {
78 pub fn new(codec: C) -> Self {
81 Self {
82 codec,
83 block_target_size: DEFAULT_BLOCK_TARGET_SIZE,
84 cache_capacity: DEFAULT_CACHE_CAPACITY,
85 io_cache_capacity: DEFAULT_IO_CACHE_CAPACITY,
86 cache: None,
87 }
88 }
89}
90
91#[derive(Debug, Clone)]
97pub struct MergedArrayMeta {
98 pub name: String,
100 pub dtype: DType,
102 pub shape: Vec<u32>,
104 pub chunk_shape: Vec<u32>,
106 pub dimension_names: Vec<String>,
108 pub fill_value: Option<FillValue>,
110}
111
112struct StoreDir {
116 store: Arc<dyn ObjectStore>,
117 base_path: object_store::path::Path,
118}
119
120pub(crate) struct ChunkedSchema {
122 pub full_shape: Vec<u32>,
123 pub chunk_shape: Vec<u32>,
124 pub dtype: DType,
125 pub all_coords: Vec<Vec<u32>>,
126}
127
128pub struct ArrayFile {
135 deltas: Vec<Delta<DeltaImmutable>>,
136 pending: Option<Delta<DeltaMutable>>,
137 codec: Arc<dyn CompressionCodec>,
138 block_target_size: usize,
139 cache: Option<Arc<DeltaCache>>,
140 store_dir: StoreDir,
143 stats: Option<StatsFile>,
145}
146
147impl ArrayFile {
150 pub async fn create<C: CompressionCodec + 'static>(
157 store: Arc<dyn ObjectStore>,
158 path: object_store::path::Path,
159 config: FileConfig<C>,
160 ) -> Result<Self> {
161 let cache = resolve_cache(&config);
162 let delta_path = Arc::<str>::from(path.as_ref());
163 let storage =
164 Arc::new(ObjectStoreBackend::new(Arc::clone(&store), path.clone())) as Arc<dyn Storage>;
165 write_empty_base(&*storage).await?;
166 let base_delta = Delta::<DeltaImmutable>::open(storage, delta_path, cache.clone()).await?;
167 Ok(ArrayFile {
168 deltas: vec![base_delta],
169 pending: None,
170 codec: Arc::new(config.codec),
171 block_target_size: config.block_target_size,
172 cache,
173 store_dir: StoreDir {
174 store,
175 base_path: path,
176 },
177 stats: None,
178 })
179 }
180
181 pub async fn open<C: CompressionCodec + 'static>(
188 store: Arc<dyn ObjectStore>,
189 path: object_store::path::Path,
190 config: FileConfig<C>,
191 ) -> Result<Self> {
192 let cache = resolve_cache(&config);
193 let delta_path = Arc::<str>::from(path.as_ref());
194 let storage =
195 Arc::new(ObjectStoreBackend::new(Arc::clone(&store), path.clone())) as Arc<dyn Storage>;
196 let base_delta = Delta::<DeltaImmutable>::open(storage, delta_path, cache.clone()).await?;
197 let mut deltas = vec![base_delta];
198
199 let sidecars = discover_sidecars_store(&*store, &path).await?;
200 for (_, scar_path) in sidecars {
201 let scar_delta_path = Arc::<str>::from(scar_path.as_ref());
202 let scar_storage = Arc::new(ObjectStoreBackend::new(Arc::clone(&store), scar_path))
203 as Arc<dyn Storage>;
204 deltas.push(
205 Delta::<DeltaImmutable>::open(scar_storage, scar_delta_path, cache.clone()).await?,
206 );
207 }
208
209 let stats = {
210 let s_storage = ObjectStoreBackend::new(Arc::clone(&store), stats_path(&path));
211 read_stats_file(&s_storage).await.ok()
212 };
213
214 Ok(ArrayFile {
215 deltas,
216 pending: None,
217 codec: Arc::new(config.codec),
218 block_target_size: config.block_target_size,
219 cache,
220 store_dir: StoreDir {
221 store,
222 base_path: path,
223 },
224 stats,
225 })
226 }
227
228 pub async fn create_memory<C: CompressionCodec + 'static>(
234 config: FileConfig<C>,
235 ) -> Result<Self> {
236 let store = Arc::new(object_store::memory::InMemory::new());
237 Self::create(store, object_store::path::Path::from("memory.af"), config).await
238 }
239}
240
241impl ArrayFile {
244 pub fn get_array(&self, name: &str) -> Result<&ArrayMeta> {
247 self.resolve_array_meta(name)
248 .ok_or_else(|| Error::ArrayNotFound {
249 name: name.to_string(),
250 })
251 }
252
253 fn resolve_array_meta(&self, name: &str) -> Option<&ArrayMeta> {
254 if let Some(p) = self.pending.as_ref()
255 && let Some(m) = p.inner.array_meta.get(name)
256 {
257 return if m.deleted { None } else { Some(m) };
258 }
259 for delta in self.deltas.iter().rev() {
260 if let Some(&i) = delta.inner.array_index.get(name) {
261 let m = &delta.inner.footer.arrays[i];
262 return if m.deleted { None } else { Some(m) };
263 }
264 }
265 None
266 }
267
268 fn pending_mut(&mut self) -> &mut Delta<DeltaMutable> {
270 if self.pending.is_none() {
271 let overlay_index = self.deltas.len() as u32;
272 self.pending = Some(Delta::<DeltaMutable>::new(
273 Arc::clone(&self.codec),
274 self.block_target_size,
275 overlay_index,
276 ));
277 }
278 self.pending.as_mut().unwrap()
279 }
280
281 pub(crate) fn get_chunked_schema(&self, name: &str) -> Result<ChunkedSchema> {
283 let meta = self.get_array(name)?;
284 let full_shape = meta.layout.shape.clone();
285 let chunk_shape = meta.layout.storage.chunk_shape.clone();
286 let dtype = meta.dtype.clone();
287 let mut existing: IndexMap<Vec<u32>, ()> = IndexMap::new();
289 for delta in &self.deltas {
290 if let Some(&i) = delta.inner.array_index.get(name) {
291 for e in &delta.inner.footer.arrays[i].layout.storage.chunks {
292 existing.entry(e.coord.clone()).or_default();
293 }
294 }
295 }
296 if let Some(p) = self.pending.as_ref()
297 && let Some(m) = p.inner.array_meta.get(name)
298 {
299 for e in &m.layout.storage.chunks {
300 existing.entry(e.coord.clone()).or_default();
301 }
302 }
303 Ok(ChunkedSchema {
304 full_shape,
305 chunk_shape,
306 dtype,
307 all_coords: existing.into_keys().collect(),
308 })
309 }
310
311 pub fn list_arrays(&self) -> Vec<MergedArrayMeta> {
313 let mut seen: IndexMap<String, MergedArrayMeta> = IndexMap::new();
314
315 for delta in &self.deltas {
317 for a in &delta.inner.footer.arrays {
318 if a.deleted {
319 seen.shift_remove(&a.name);
320 } else {
321 seen.insert(
322 a.name.clone(),
323 MergedArrayMeta {
324 name: a.name.clone(),
325 dtype: a.dtype.clone(),
326 shape: a.layout.shape.clone(),
327 chunk_shape: a.layout.storage.chunk_shape.clone(),
328 dimension_names: a.layout.dimension_names.clone(),
329 fill_value: a.fill_value.clone(),
330 },
331 );
332 }
333 }
334 }
335 if let Some(p) = self.pending.as_ref() {
336 for (name, a) in &p.inner.array_meta {
337 if a.deleted {
338 seen.shift_remove(name);
339 } else {
340 seen.insert(
341 name.clone(),
342 MergedArrayMeta {
343 name: a.name.clone(),
344 dtype: a.dtype.clone(),
345 shape: a.layout.shape.clone(),
346 chunk_shape: a.layout.storage.chunk_shape.clone(),
347 dimension_names: a.layout.dimension_names.clone(),
348 fill_value: a.fill_value.clone(),
349 },
350 );
351 }
352 }
353 }
354 seen.into_values().collect()
355 }
356
357 pub fn array_stats(&self, name: &str) -> Option<&ArrayStats> {
359 self.stats.as_ref()?.get_array(name)
360 }
361
362 pub fn num_layers(&self) -> usize {
364 self.deltas.len()
365 }
366
367 pub fn get_attribute(&self, name: &str, key: &str) -> Result<Option<&AttributeValue>> {
370 let meta = self.get_array(name)?;
371 let key_idx = match self
372 .pending
373 .as_ref()
374 .and_then(|p| p.inner.attr_keys.iter().position(|k| k == key))
375 .or_else(|| {
376 self.deltas
378 .iter()
379 .rev()
380 .find_map(|d| d.inner.footer.attr_keys.iter().position(|k| k == key))
381 }) {
382 Some(i) => i,
383 None => return Ok(None),
384 };
385 let val_idx = match meta.attributes.get(key_idx) {
386 Some(i) => i,
387 None => return Ok(None),
388 };
389 if let Some(p) = self.pending.as_ref()
391 && val_idx < p.inner.attr_values.len()
392 {
393 return Ok(Some(&p.inner.attr_values[val_idx]));
394 }
395 for delta in self.deltas.iter().rev() {
396 if val_idx < delta.inner.footer.attr_values.len() {
397 return Ok(Some(&delta.inner.footer.attr_values[val_idx]));
398 }
399 }
400 Ok(None)
401 }
402
403 pub fn set_attribute(&mut self, name: &str, key: &str, value: AttributeValue) -> Result<()> {
408 let mut existing_meta = self.get_array(name)?.clone();
413 existing_meta.layout.storage.chunks.clear();
414
415 let pending = self.pending_mut();
416 let key_idx = pending.intern_attr_key(key);
417 let val_idx = pending.intern_attr_value(value);
418
419 if pending.array_meta_mut(name).is_none() {
421 pending.upsert_array_meta(existing_meta);
422 }
423 let meta = pending.array_meta_mut(name).unwrap();
424 meta.attributes.upsert(key_idx, val_idx);
425 Ok(())
426 }
427}
428
429impl ArrayFile {
432 pub fn define_array<T: ArrayElement>(
444 &mut self,
445 name: impl Into<String>,
446 dimension_names: Vec<String>,
447 shape: Vec<usize>,
448 chunk_shape: Option<Vec<usize>>,
449 fill_value: Option<FillValue>,
450 ) -> Result<()> {
451 let name = name.into();
452 if self.resolve_array_meta(&name).is_some() {
453 return Err(Error::ArrayAlreadyExists { name });
454 }
455 let shape_u32: Vec<u32> = shape.iter().map(|&s| s as u32).collect();
456 let ndim = shape_u32.len();
457 let chunk_shape_u32: Vec<u32> = chunk_shape
458 .map(|cs| cs.iter().map(|&s| s as u32).collect())
459 .unwrap_or_else(|| shape_u32.clone());
460 let dim_names = if dimension_names.len() == ndim {
461 dimension_names
462 } else {
463 (0..ndim).map(|i| format!("dim{i}")).collect()
464 };
465 let layout = ArrayLayout {
466 shape: shape_u32,
467 dimension_names: dim_names,
468 storage: StorageLayout {
469 chunk_shape: chunk_shape_u32,
470 chunks: vec![],
471 },
472 };
473 self.pending_mut().upsert_array_meta(ArrayMeta {
474 name,
475 dtype: T::DTYPE,
476 layout,
477 fill_value,
478 deleted: false,
479 attributes: Attributes::empty(AttrIndexKind::U16),
480 });
481 Ok(())
482 }
483
484 pub fn delete(&mut self, name: &str) -> Result<()> {
490 let meta = self.get_array(name)?.clone();
491 self.pending_mut().mark_deleted(meta);
492 Ok(())
493 }
494}
495
496impl ArrayFile {
499 pub(crate) async fn read_chunk<T: ArrayElement>(
500 &self,
501 name: &str,
502 coord: &[u32],
503 ) -> Result<Vec<T>> {
504 if let Some(bytes) = self.resolve_raw_chunk(name, coord).await? {
505 return Ok(T::decode_chunk(&bytes));
506 }
507 let meta = self.get_array(name)?;
508 let chunk_elems: usize = meta
509 .layout
510 .storage
511 .chunk_shape
512 .iter()
513 .enumerate()
514 .map(|(i, &cs)| {
515 let axis_len = meta.layout.shape[i] as usize;
516 let start = coord[i] as usize * cs as usize;
517 (cs as usize).min(axis_len.saturating_sub(start))
518 })
519 .product();
520 Ok(vec![T::fill_element(meta.fill_value.as_ref()); chunk_elems])
521 }
522
523 pub(crate) fn write_chunk_raw(
524 &mut self,
525 name: &str,
526 coord: Vec<u32>,
527 bytes: Vec<u8>,
528 ) -> Result<()> {
529 let snapshot = if self
534 .pending
535 .as_ref()
536 .and_then(|p| p.inner.array_meta.get(name))
537 .is_none()
538 {
539 let mut m = self.get_array(name)?.clone();
540 m.layout.storage.chunks.clear();
541 Some(m)
542 } else {
543 None
544 };
545 let pending = self.pending_mut();
546 if let Some(meta) = snapshot {
547 pending.upsert_array_meta(meta);
548 }
549 pending.write_raw_chunk(name, coord, &bytes)
550 }
551
552 async fn resolve_raw_chunk(&self, name: &str, coord: &[u32]) -> Result<Option<Bytes>> {
553 if let Some(p) = self.pending.as_ref()
554 && let Some(bytes) = p.read_raw_chunk(name, coord)
555 {
556 return Ok(Some(bytes));
557 }
558 for delta in self.deltas.iter().rev() {
559 if let Some(bytes) = delta.read_raw_chunk(name, coord).await? {
560 return Ok(Some(bytes));
561 }
562 }
563 Ok(None)
564 }
565}
566
567impl ArrayFile {
570 pub async fn write_array<T: ArrayElement>(
578 &mut self,
579 name: &str,
580 start: Vec<usize>,
581 data: ndarray::ArrayView<'_, T, ndarray::IxDyn>,
582 ) -> Result<()> {
583 crate::ndarray_ext::write_nd(self, name, data, &start).await
584 }
585
586 pub async fn read_array<T: ArrayElement>(
593 &self,
594 name: &str,
595 start: Vec<usize>,
596 shape: Vec<usize>,
597 ) -> Result<ndarray::ArcArray<T, ndarray::IxDyn>> {
598 use std::ops::Range;
599 let slice: Option<Vec<Range<usize>>> = if start.is_empty() && shape.is_empty() {
600 None
601 } else {
602 let meta = self.get_array(name)?;
603 let ndim = meta.layout.shape.len();
604 let effective_start = if start.len() == ndim {
605 start.clone()
606 } else {
607 vec![0; ndim]
608 };
609 let effective_shape: Vec<usize> = if shape.len() == ndim {
610 shape.clone()
611 } else {
612 meta.layout.shape.iter().map(|&s| s as usize).collect()
613 };
614 Some(
615 effective_start
616 .iter()
617 .zip(&effective_shape)
618 .map(|(&s, &sz)| s..s + sz)
619 .collect(),
620 )
621 };
622 crate::ndarray_ext::assemble_nd(self, name, slice.as_deref()).await
623 }
624}
625
626impl ArrayFile {
629 pub async fn flush(&mut self) -> Result<()> {
634 if self.pending.is_none() {
635 return Ok(());
636 }
637 let store = Arc::clone(&self.store_dir.store);
638 let base_path = self.store_dir.base_path.clone();
639 let overlay_index = self.deltas.len() as u32;
640 let scar_path = sidecar_path(&base_path, overlay_index);
641 let delta_path = Arc::<str>::from(scar_path.as_ref());
642 let storage =
643 Arc::new(ObjectStoreBackend::new(Arc::clone(&store), scar_path)) as Arc<dyn Storage>;
644 let hint = base_path.as_ref().to_string();
645 let dirty_names = self.commit_pending(storage, delta_path, hint).await?;
646
647 let merged = self.compute_stats_for(&dirty_names).await?;
648 let s_storage = ObjectStoreBackend::new(Arc::clone(&store), stats_path(&base_path));
649 s_storage
650 .write(bytes::Bytes::from(merged.serialize()?))
651 .await?;
652 self.stats = Some(merged);
653 Ok(())
654 }
655
656 async fn compute_stats_for(&self, dirty_names: &[String]) -> Result<StatsFile> {
657 let mut merged = self.stats.clone().unwrap_or_default();
658 for name in dirty_names {
659 let schema = match self.get_chunked_schema(name) {
660 Ok(s) => s,
661 Err(_) => continue,
662 };
663 let fill_value = self
664 .resolve_array_meta(name)
665 .and_then(|m| m.fill_value.clone());
666 let shape_product: u64 = schema.full_shape.iter().map(|&s| s as u64).product();
667 let mut stats = ArrayStats::new(name.clone());
668 let mut written_non_null: u64 = 0;
669 for coord in &schema.all_coords {
670 if let Some(bytes) = self.resolve_raw_chunk(name, coord).await? {
671 let (min, max, nc, rc) =
672 compute_chunk_partial(&bytes, &schema.dtype, fill_value.as_ref());
673 written_non_null += rc - nc;
674 merge_partial(&mut stats, min, max, nc, rc);
675 }
676 }
677 stats.row_count = shape_product;
678 stats.null_count = shape_product - written_non_null;
679 merged.upsert(stats);
680 }
681 Ok(merged)
682 }
683
684 async fn commit_pending(
688 &mut self,
689 storage: Arc<dyn Storage>,
690 delta_path: Arc<str>,
691 base_file_hint: String,
692 ) -> Result<Vec<String>> {
693 let mutable = self
694 .pending
695 .take()
696 .expect("commit_pending: no pending delta");
697 let dirty_names: Vec<String> = mutable
698 .inner
699 .array_meta
700 .iter()
701 .filter(|(_, m)| !m.layout.storage.chunks.is_empty())
702 .map(|(name, _)| name.clone())
703 .collect();
704 let immutable = mutable
705 .commit(storage, delta_path, self.cache.clone(), base_file_hint)
706 .await?;
707 self.deltas.push(immutable);
708 Ok(dirty_names)
709 }
710}
711
712impl ArrayFile {
715 pub async fn compact(&mut self) -> Result<()> {
721 let merged_names: Vec<String> = self.list_arrays().into_iter().map(|m| m.name).collect();
723
724 let mut allocator = DeltaAllocator::new(Arc::clone(&self.codec), self.block_target_size);
726 let mut arrays: Vec<ArrayMeta> = Vec::new();
727 let mut per_array_stats: Vec<ArrayStats> = Vec::new();
728
729 for name in &merged_names {
730 let meta = self
731 .resolve_array_meta(name)
732 .ok_or_else(|| Error::ArrayNotFound { name: name.clone() })?
733 .clone();
734
735 let mut all_coords: indexmap::IndexSet<Vec<u32>> = indexmap::IndexSet::new();
737 for delta in &self.deltas {
738 if let Some(&i) = delta.inner.array_index.get(name.as_str()) {
739 for e in &delta.inner.footer.arrays[i].layout.storage.chunks {
740 all_coords.insert(e.coord.clone());
741 }
742 }
743 }
744
745 let shape_product: u64 = meta.layout.shape.iter().map(|&s| s as u64).product();
746 let mut new_chunks: Vec<ChunkEntry> = Vec::new();
747 let mut array_stats = ArrayStats::new(name.clone());
748 let mut written_non_null: u64 = 0;
749 for coord in &all_coords {
750 if let Some(raw) = self.resolve_raw_chunk(name, coord).await? {
752 let (min, max, nc, rc) =
753 compute_chunk_partial(&raw, &meta.dtype, meta.fill_value.as_ref());
754 written_non_null += rc - nc;
755 merge_partial(&mut array_stats, min, max, nc, rc);
756 let alloc = allocator.allocate(&raw);
757 new_chunks.push(ChunkEntry {
758 coord: coord.clone(),
759 address: ChunkAddress::from(alloc),
760 });
761 }
762 }
763 array_stats.row_count = shape_product;
764 array_stats.null_count = shape_product - written_non_null;
765 per_array_stats.push(array_stats);
766
767 let mut new_meta = meta;
768 new_meta.layout.storage.chunks = new_chunks;
769 arrays.push(new_meta);
770 }
771
772 let crate::delta::AllocatorOutput {
773 mut file,
774 output_size,
775 blocks,
776 } = allocator.commit().await;
777
778 let mut attr_keys: Vec<String> = Vec::new();
780 let mut attr_values: Vec<crate::layout::AttributeValue> = Vec::new();
781 for delta in &self.deltas {
782 for k in &delta.inner.footer.attr_keys {
783 if !attr_keys.contains(k) {
784 attr_keys.push(k.clone());
785 }
786 }
787 for v in &delta.inner.footer.attr_values {
788 if !attr_values.contains(v) {
789 attr_values.push(v.clone());
790 }
791 }
792 }
793
794 let footer = Footer {
795 version: FOOTER_VERSION,
796 blocks,
797 arrays,
798 attr_keys,
799 attr_values,
800 overlay_index: 0,
801 base_file_hint: String::new(),
802 };
803 let footer_bytes = footer.serialize()?;
804
805 let sd = &self.store_dir;
807 for i in 1..self.deltas.len() {
809 let _ = sd
810 .store
811 .delete(&sidecar_path(&sd.base_path, i as u32))
812 .await;
813 }
814 let base_storage: Arc<dyn Storage> = Arc::new(ObjectStoreBackend::new(
815 Arc::clone(&sd.store),
816 sd.base_path.clone(),
817 ));
818
819 write_file_then_bytes(&mut file, output_size, &footer_bytes, &*base_storage).await?;
820 let base_delta_path: Arc<str> = Arc::from(sd.base_path.as_ref());
821 let new_base =
822 Delta::<DeltaImmutable>::open(base_storage, base_delta_path, self.cache.clone())
823 .await?;
824 self.deltas = vec![new_base];
825
826 let mut new_stats = StatsFile::default();
827 for s in per_array_stats {
828 new_stats.upsert(s);
829 }
830 let s_storage = ObjectStoreBackend::new(
831 Arc::clone(&self.store_dir.store),
832 stats_path(&self.store_dir.base_path),
833 );
834 s_storage
835 .write(bytes::Bytes::from(new_stats.serialize()?))
836 .await?;
837 self.stats = Some(new_stats);
838 Ok(())
839 }
840}
841
842fn resolve_cache<C: CompressionCodec>(config: &FileConfig<C>) -> Option<Arc<DeltaCache>> {
845 if let Some(c) = &config.cache {
846 Some(Arc::clone(c))
847 } else if config.cache_capacity == 0 && config.io_cache_capacity == 0 {
848 None
849 } else {
850 Some(Arc::new(DeltaCache::new(
851 config.cache_capacity as u64,
852 config.io_cache_capacity as u64,
853 )))
854 }
855}
856
857fn sidecar_path(base: &object_store::path::Path, n: u32) -> object_store::path::Path {
858 let s = base.as_ref();
859 let without_af = s.strip_suffix(".af").unwrap_or(s);
860 object_store::path::Path::from(format!("{without_af}.{n}.af").as_str())
861}
862
863fn stats_path(base: &object_store::path::Path) -> object_store::path::Path {
864 let s = base.as_ref();
865 let without_af = s.strip_suffix(".af").unwrap_or(s);
866 object_store::path::Path::from(format!("{without_af}.stats").as_str())
867}
868
869async fn discover_sidecars_store(
870 store: &dyn ObjectStore,
871 base_path: &object_store::path::Path,
872) -> Result<Vec<(u32, object_store::path::Path)>> {
873 use futures::TryStreamExt;
874 let base_str = base_path.as_ref();
875 let stem_prefix = base_str
876 .strip_suffix(".af")
877 .ok_or_else(|| Error::Storage("path must end with .af".into()))?;
878 let list_prefix = base_str
879 .rfind('/')
880 .map(|pos| object_store::path::Path::from(&base_str[..pos]));
881 let objects: Vec<_> = store
882 .list(list_prefix.as_ref())
883 .try_collect()
884 .await
885 .map_err(|e| Error::Storage(e.to_string()))?;
886 let mut sidecars: Vec<(u32, object_store::path::Path)> = objects
887 .into_iter()
888 .filter_map(|meta| {
889 let s = meta.location.as_ref();
890 let rest = s.strip_prefix(stem_prefix)?.strip_prefix('.')?;
891 let (num_str, ext) = rest.rsplit_once('.')?;
892 if ext != "af" {
893 return None;
894 }
895 let n: u32 = num_str.parse().ok()?;
896 if n == 0 {
897 return None;
898 }
899 Some((n, meta.location))
900 })
901 .collect();
902 sidecars.sort_by_key(|(n, _)| *n);
903 Ok(sidecars)
904}
905
906async fn write_empty_base(storage: &dyn Storage) -> Result<()> {
907 let footer = Footer::new();
908 let bytes = footer.serialize()?;
909 storage.write(Bytes::from(bytes)).await
910}
911
912#[cfg(test)]
913mod tests {
914 use super::*;
915 use crate::codec::NoCompression;
916
917 #[tokio::test]
918 async fn shared_cache_is_reused_across_files() {
919 let shared = Arc::new(DeltaCache::new(1024 * 1024, 0));
920
921 let mut cfg_a = FileConfig::new(NoCompression);
922 cfg_a.cache = Some(Arc::clone(&shared));
923 let file_a = ArrayFile::create_memory(cfg_a).await.unwrap();
924
925 let mut cfg_b = FileConfig::new(NoCompression);
926 cfg_b.cache = Some(Arc::clone(&shared));
927 let file_b = ArrayFile::create_memory(cfg_b).await.unwrap();
928
929 let a = file_a.cache.as_ref().expect("file_a has cache");
930 let b = file_b.cache.as_ref().expect("file_b has cache");
931 assert!(Arc::ptr_eq(a, &shared));
932 assert!(Arc::ptr_eq(b, &shared));
933 }
934}