1use std::sync::Arc;
2
3use bytes::Bytes;
4use indexmap::IndexMap;
5use object_store::{ObjectStore, ObjectStoreExt};
6
7use crate::{
8 DType, Error, Result,
9 address::ChunkAddress,
10 array::ArrayElement,
11 codec::CompressionCodec,
12 delta::{
13 Delta, DeltaAllocator, DeltaCache, DeltaImmutable, DeltaMutable, write_file_then_bytes,
14 },
15 footer::{FOOTER_VERSION, Footer},
16 layout::{
17 ArrayLayout, ArrayMeta, AttrIndexKind, AttributeValue, Attributes, ChunkEntry, FillValue,
18 StorageLayout,
19 },
20 stats::{ArrayStats, StatsFile, compute_chunk_partial, merge_partial, read_stats_file},
21 storage::{InMemoryStorage, ObjectStoreBackend, Storage},
22};
23
/// Default for [`FileConfig::block_target_size`]: `8 * 1024 * 1024`
/// (8 Mi; presumably bytes).
pub const DEFAULT_BLOCK_TARGET_SIZE: usize = 8 * 1024 * 1024;

/// Default for [`FileConfig::cache_capacity`]: `256 * 1024 * 1024`
/// (256 Mi; presumably bytes).
pub const DEFAULT_CACHE_CAPACITY: usize = 256 * 1024 * 1024;

/// Default for [`FileConfig::io_cache_capacity`]: `64 * 1024 * 1024`
/// (64 Mi; presumably bytes).
pub const DEFAULT_IO_CACHE_CAPACITY: usize = 64 * 1024 * 1024;

/// Options supplied when creating or opening an [`ArrayFile`].
pub struct FileConfig<C: CompressionCodec> {
    /// Codec the file hands to every mutable delta and allocator it creates.
    pub codec: C,
    /// Block target size forwarded to new mutable deltas and to the
    /// allocator used during compaction.
    pub block_target_size: usize,
    /// First capacity argument passed to `DeltaCache::new` when the file
    /// builds its own cache.
    pub cache_capacity: usize,
    /// Second capacity argument passed to `DeltaCache::new` when the file
    /// builds its own cache.
    pub io_cache_capacity: usize,
    /// Pre-built cache to share between files. When `Some`, it is used as-is
    /// and both capacity fields are ignored. When `None` and both capacities
    /// are zero, the file runs without a cache.
    pub cache: Option<Arc<DeltaCache>>,
}
69
70impl<C: CompressionCodec> FileConfig<C> {
71 pub fn new(codec: C) -> Self {
74 Self {
75 codec,
76 block_target_size: DEFAULT_BLOCK_TARGET_SIZE,
77 cache_capacity: DEFAULT_CACHE_CAPACITY,
78 io_cache_capacity: DEFAULT_IO_CACHE_CAPACITY,
79 cache: None,
80 }
81 }
82}
83
/// Flattened, owned description of one live array, as produced by
/// `ArrayFile::list_arrays` after merging all layers.
#[derive(Debug, Clone)]
pub struct MergedArrayMeta {
    /// Array name.
    pub name: String,
    /// Element type of the array.
    pub dtype: DType,
    /// Full array shape, one entry per dimension.
    pub shape: Vec<u32>,
    /// Shape of a single chunk, one entry per dimension.
    pub chunk_shape: Vec<u32>,
    /// Name of each dimension.
    pub dimension_names: Vec<String>,
    /// Fill value recorded for the array, if any.
    pub fill_value: Option<FillValue>,
}
104
/// Location of a store-backed file: the object store plus the path of the
/// base file. Sidecar and stats paths are derived from `base_path`.
struct StoreDir {
    /// Object store holding the base file, its sidecars and the stats file.
    store: Arc<dyn ObjectStore>,
    /// Path of the base file inside `store`.
    base_path: object_store::path::Path,
}
112
/// Shape information for one array plus the coordinates of every chunk
/// known to any layer, as assembled by `ArrayFile::get_chunked_schema`.
pub(crate) struct ChunkedSchema {
    /// Full array shape, one entry per dimension.
    pub full_shape: Vec<u32>,
    /// Shape of a single chunk, one entry per dimension.
    pub chunk_shape: Vec<u32>,
    /// Element type of the array.
    pub dtype: DType,
    /// De-duplicated chunk coordinates in first-seen order (committed
    /// layers oldest first, then the pending overlay).
    pub all_coords: Vec<Vec<u32>>,
}
120
/// A layered array container: a base delta, zero or more committed overlay
/// deltas, and an optional in-memory overlay of uncommitted changes.
pub struct ArrayFile {
    /// Committed layers, base first. Lookups walk this list in reverse so
    /// that later layers win.
    deltas: Vec<Delta<DeltaImmutable>>,
    /// Uncommitted changes; created lazily on the first mutation and taken
    /// when the changes are committed.
    pending: Option<Delta<DeltaMutable>>,
    /// Codec passed to new mutable deltas and to the compaction allocator.
    codec: Arc<dyn CompressionCodec>,
    /// Block target size passed to new mutable deltas and to the compaction
    /// allocator.
    block_target_size: usize,
    /// Cache handed to every delta that is opened or committed, if any.
    cache: Option<Arc<DeltaCache>>,
    /// Backing object store and base path; `None` for in-memory files.
    store_dir: Option<StoreDir>,
    /// Most recently loaded or computed statistics, if any.
    stats: Option<StatsFile>,
}
138
139impl ArrayFile {
142 pub async fn create<C: CompressionCodec + 'static>(
149 store: Arc<dyn ObjectStore>,
150 path: object_store::path::Path,
151 config: FileConfig<C>,
152 ) -> Result<Self> {
153 let cache = resolve_cache(&config);
154 let delta_path = Arc::<str>::from(path.as_ref());
155 let storage =
156 Arc::new(ObjectStoreBackend::new(Arc::clone(&store), path.clone())) as Arc<dyn Storage>;
157 write_empty_base(&*storage).await?;
158 let base_delta = Delta::<DeltaImmutable>::open(storage, delta_path, cache.clone()).await?;
159 Ok(ArrayFile {
160 deltas: vec![base_delta],
161 pending: None,
162 codec: Arc::new(config.codec),
163 block_target_size: config.block_target_size,
164 cache,
165 store_dir: Some(StoreDir {
166 store,
167 base_path: path,
168 }),
169 stats: None,
170 })
171 }
172
173 pub async fn open<C: CompressionCodec + 'static>(
180 store: Arc<dyn ObjectStore>,
181 path: object_store::path::Path,
182 config: FileConfig<C>,
183 ) -> Result<Self> {
184 let cache = resolve_cache(&config);
185 let delta_path = Arc::<str>::from(path.as_ref());
186 let storage =
187 Arc::new(ObjectStoreBackend::new(Arc::clone(&store), path.clone())) as Arc<dyn Storage>;
188 let base_delta = Delta::<DeltaImmutable>::open(storage, delta_path, cache.clone()).await?;
189 let mut deltas = vec![base_delta];
190
191 let sidecars = discover_sidecars_store(&*store, &path).await?;
192 for (_, scar_path) in sidecars {
193 let scar_delta_path = Arc::<str>::from(scar_path.as_ref());
194 let scar_storage = Arc::new(ObjectStoreBackend::new(Arc::clone(&store), scar_path))
195 as Arc<dyn Storage>;
196 deltas.push(
197 Delta::<DeltaImmutable>::open(scar_storage, scar_delta_path, cache.clone()).await?,
198 );
199 }
200
201 let stats = {
202 let s_storage = ObjectStoreBackend::new(Arc::clone(&store), stats_path(&path));
203 read_stats_file(&s_storage).await.ok()
204 };
205
206 Ok(ArrayFile {
207 deltas,
208 pending: None,
209 codec: Arc::new(config.codec),
210 block_target_size: config.block_target_size,
211 cache,
212 store_dir: Some(StoreDir {
213 store,
214 base_path: path,
215 }),
216 stats,
217 })
218 }
219
220 pub async fn create_memory<C: CompressionCodec + 'static>(
226 config: FileConfig<C>,
227 ) -> Result<Self> {
228 let cache = resolve_cache(&config);
229 let storage = Arc::new(InMemoryStorage::new()) as Arc<dyn Storage>;
230 write_empty_base(&*storage).await?;
231 let base_delta =
232 Delta::<DeltaImmutable>::open(storage, Arc::from("__memory_0__"), cache.clone())
233 .await?;
234 Ok(ArrayFile {
235 deltas: vec![base_delta],
236 pending: None,
237 codec: Arc::new(config.codec),
238 block_target_size: config.block_target_size,
239 cache,
240 store_dir: None,
241 stats: None,
242 })
243 }
244}
245
246impl ArrayFile {
249 pub fn get_array(&self, name: &str) -> Result<&ArrayMeta> {
252 self.resolve_array_meta(name)
253 .ok_or_else(|| Error::ArrayNotFound {
254 name: name.to_string(),
255 })
256 }
257
258 fn resolve_array_meta(&self, name: &str) -> Option<&ArrayMeta> {
259 if let Some(p) = self.pending.as_ref()
260 && let Some(m) = p.inner.array_meta.get(name)
261 {
262 return if m.deleted { None } else { Some(m) };
263 }
264 for delta in self.deltas.iter().rev() {
265 if let Some(&i) = delta.inner.array_index.get(name) {
266 let m = &delta.inner.footer.arrays[i];
267 return if m.deleted { None } else { Some(m) };
268 }
269 }
270 None
271 }
272
273 fn pending_mut(&mut self) -> &mut Delta<DeltaMutable> {
275 if self.pending.is_none() {
276 let overlay_index = self.deltas.len() as u32;
277 self.pending = Some(Delta::<DeltaMutable>::new(
278 Arc::clone(&self.codec),
279 self.block_target_size,
280 overlay_index,
281 ));
282 }
283 self.pending.as_mut().unwrap()
284 }
285
286 pub(crate) fn get_chunked_schema(&self, name: &str) -> Result<ChunkedSchema> {
288 let meta = self.get_array(name)?;
289 let full_shape = meta.layout.shape.clone();
290 let chunk_shape = meta.layout.storage.chunk_shape.clone();
291 let dtype = meta.dtype.clone();
292 let mut existing: IndexMap<Vec<u32>, ()> = IndexMap::new();
294 for delta in &self.deltas {
295 if let Some(&i) = delta.inner.array_index.get(name) {
296 for e in &delta.inner.footer.arrays[i].layout.storage.chunks {
297 existing.entry(e.coord.clone()).or_default();
298 }
299 }
300 }
301 if let Some(p) = self.pending.as_ref()
302 && let Some(m) = p.inner.array_meta.get(name)
303 {
304 for e in &m.layout.storage.chunks {
305 existing.entry(e.coord.clone()).or_default();
306 }
307 }
308 Ok(ChunkedSchema {
309 full_shape,
310 chunk_shape,
311 dtype,
312 all_coords: existing.into_keys().collect(),
313 })
314 }
315
316 pub fn list_arrays(&self) -> Vec<MergedArrayMeta> {
318 let mut seen: IndexMap<String, MergedArrayMeta> = IndexMap::new();
319
320 for delta in &self.deltas {
322 for a in &delta.inner.footer.arrays {
323 if a.deleted {
324 seen.shift_remove(&a.name);
325 } else {
326 seen.insert(
327 a.name.clone(),
328 MergedArrayMeta {
329 name: a.name.clone(),
330 dtype: a.dtype.clone(),
331 shape: a.layout.shape.clone(),
332 chunk_shape: a.layout.storage.chunk_shape.clone(),
333 dimension_names: a.layout.dimension_names.clone(),
334 fill_value: a.fill_value.clone(),
335 },
336 );
337 }
338 }
339 }
340 if let Some(p) = self.pending.as_ref() {
341 for (name, a) in &p.inner.array_meta {
342 if a.deleted {
343 seen.shift_remove(name);
344 } else {
345 seen.insert(
346 name.clone(),
347 MergedArrayMeta {
348 name: a.name.clone(),
349 dtype: a.dtype.clone(),
350 shape: a.layout.shape.clone(),
351 chunk_shape: a.layout.storage.chunk_shape.clone(),
352 dimension_names: a.layout.dimension_names.clone(),
353 fill_value: a.fill_value.clone(),
354 },
355 );
356 }
357 }
358 }
359 seen.into_values().collect()
360 }
361
362 pub fn array_stats(&self, name: &str) -> Option<&ArrayStats> {
364 self.stats.as_ref()?.get_array(name)
365 }
366
367 pub fn num_layers(&self) -> usize {
369 self.deltas.len()
370 }
371
372 pub fn get_attribute(&self, name: &str, key: &str) -> Result<Option<&AttributeValue>> {
375 let meta = self.get_array(name)?;
376 let key_idx = match self
377 .pending
378 .as_ref()
379 .and_then(|p| p.inner.attr_keys.iter().position(|k| k == key))
380 .or_else(|| {
381 self.deltas
383 .iter()
384 .rev()
385 .find_map(|d| d.inner.footer.attr_keys.iter().position(|k| k == key))
386 }) {
387 Some(i) => i,
388 None => return Ok(None),
389 };
390 let val_idx = match meta.attributes.get(key_idx) {
391 Some(i) => i,
392 None => return Ok(None),
393 };
394 if let Some(p) = self.pending.as_ref()
396 && val_idx < p.inner.attr_values.len()
397 {
398 return Ok(Some(&p.inner.attr_values[val_idx]));
399 }
400 for delta in self.deltas.iter().rev() {
401 if val_idx < delta.inner.footer.attr_values.len() {
402 return Ok(Some(&delta.inner.footer.attr_values[val_idx]));
403 }
404 }
405 Ok(None)
406 }
407
408 pub fn set_attribute(&mut self, name: &str, key: &str, value: AttributeValue) -> Result<()> {
413 let mut existing_meta = self.get_array(name)?.clone();
418 existing_meta.layout.storage.chunks.clear();
419
420 let pending = self.pending_mut();
421 let key_idx = pending.intern_attr_key(key);
422 let val_idx = pending.intern_attr_value(value);
423
424 if pending.array_meta_mut(name).is_none() {
426 pending.upsert_array_meta(existing_meta);
427 }
428 let meta = pending.array_meta_mut(name).unwrap();
429 meta.attributes.upsert(key_idx, val_idx);
430 Ok(())
431 }
432}
433
434impl ArrayFile {
437 pub fn define_array<T: ArrayElement>(
449 &mut self,
450 name: impl Into<String>,
451 dimension_names: Vec<String>,
452 shape: Vec<usize>,
453 chunk_shape: Option<Vec<usize>>,
454 fill_value: Option<FillValue>,
455 ) -> Result<()> {
456 let name = name.into();
457 if self.resolve_array_meta(&name).is_some() {
458 return Err(Error::ArrayAlreadyExists { name });
459 }
460 let shape_u32: Vec<u32> = shape.iter().map(|&s| s as u32).collect();
461 let ndim = shape_u32.len();
462 let chunk_shape_u32: Vec<u32> = chunk_shape
463 .map(|cs| cs.iter().map(|&s| s as u32).collect())
464 .unwrap_or_else(|| shape_u32.clone());
465 let dim_names = if dimension_names.len() == ndim {
466 dimension_names
467 } else {
468 (0..ndim).map(|i| format!("dim{i}")).collect()
469 };
470 let layout = ArrayLayout {
471 shape: shape_u32,
472 dimension_names: dim_names,
473 storage: StorageLayout {
474 chunk_shape: chunk_shape_u32,
475 chunks: vec![],
476 },
477 };
478 self.pending_mut().upsert_array_meta(ArrayMeta {
479 name,
480 dtype: T::DTYPE,
481 layout,
482 fill_value,
483 deleted: false,
484 attributes: Attributes::empty(AttrIndexKind::U16),
485 });
486 Ok(())
487 }
488
489 pub fn delete(&mut self, name: &str) -> Result<()> {
495 let meta = self.get_array(name)?.clone();
496 self.pending_mut().mark_deleted(meta);
497 Ok(())
498 }
499}
500
501impl ArrayFile {
504 pub(crate) async fn read_chunk<T: ArrayElement>(
505 &self,
506 name: &str,
507 coord: &[u32],
508 ) -> Result<Vec<T>> {
509 if let Some(bytes) = self.resolve_raw_chunk(name, coord).await? {
510 return Ok(T::decode_chunk(&bytes));
511 }
512 let meta = self.get_array(name)?;
513 let chunk_elems: usize = meta
514 .layout
515 .storage
516 .chunk_shape
517 .iter()
518 .enumerate()
519 .map(|(i, &cs)| {
520 let axis_len = meta.layout.shape[i] as usize;
521 let start = coord[i] as usize * cs as usize;
522 (cs as usize).min(axis_len.saturating_sub(start))
523 })
524 .product();
525 Ok(vec![T::fill_element(meta.fill_value.as_ref()); chunk_elems])
526 }
527
528 pub(crate) fn write_chunk_raw(
529 &mut self,
530 name: &str,
531 coord: Vec<u32>,
532 bytes: Vec<u8>,
533 ) -> Result<()> {
534 let snapshot = if self
539 .pending
540 .as_ref()
541 .and_then(|p| p.inner.array_meta.get(name))
542 .is_none()
543 {
544 let mut m = self.get_array(name)?.clone();
545 m.layout.storage.chunks.clear();
546 Some(m)
547 } else {
548 None
549 };
550 let pending = self.pending_mut();
551 if let Some(meta) = snapshot {
552 pending.upsert_array_meta(meta);
553 }
554 pending.write_raw_chunk(name, coord, &bytes)
555 }
556
557 async fn resolve_raw_chunk(&self, name: &str, coord: &[u32]) -> Result<Option<Bytes>> {
558 if let Some(p) = self.pending.as_ref()
559 && let Some(bytes) = p.read_raw_chunk(name, coord)
560 {
561 return Ok(Some(bytes));
562 }
563 for delta in self.deltas.iter().rev() {
564 if let Some(bytes) = delta.read_raw_chunk(name, coord).await? {
565 return Ok(Some(bytes));
566 }
567 }
568 Ok(None)
569 }
570}
571
572impl ArrayFile {
575 pub async fn write_array<T: ArrayElement>(
583 &mut self,
584 name: &str,
585 start: Vec<usize>,
586 data: ndarray::ArrayView<'_, T, ndarray::IxDyn>,
587 ) -> Result<()> {
588 crate::ndarray_ext::write_nd(self, name, data, &start).await
589 }
590
591 pub async fn read_array<T: ArrayElement>(
598 &self,
599 name: &str,
600 start: Vec<usize>,
601 shape: Vec<usize>,
602 ) -> Result<ndarray::ArcArray<T, ndarray::IxDyn>> {
603 use std::ops::Range;
604 let slice: Option<Vec<Range<usize>>> = if start.is_empty() && shape.is_empty() {
605 None
606 } else {
607 let meta = self.get_array(name)?;
608 let ndim = meta.layout.shape.len();
609 let effective_start = if start.len() == ndim {
610 start.clone()
611 } else {
612 vec![0; ndim]
613 };
614 let effective_shape: Vec<usize> = if shape.len() == ndim {
615 shape.clone()
616 } else {
617 meta.layout.shape.iter().map(|&s| s as usize).collect()
618 };
619 Some(
620 effective_start
621 .iter()
622 .zip(&effective_shape)
623 .map(|(&s, &sz)| s..s + sz)
624 .collect(),
625 )
626 };
627 crate::ndarray_ext::assemble_nd(self, name, slice.as_deref()).await
628 }
629}
630
631impl ArrayFile {
634 pub async fn flush(&mut self) -> Result<()> {
640 if self.pending.is_none() {
641 return Ok(());
642 }
643 let (store, base_path) = match &self.store_dir {
644 Some(sd) => (Arc::clone(&sd.store), sd.base_path.clone()),
645 None => {
646 return Err(Error::Storage(
647 "in-memory file: use flush_memory instead".into(),
648 ));
649 }
650 };
651 let overlay_index = self.deltas.len() as u32;
652 let scar_path = sidecar_path(&base_path, overlay_index);
653 let delta_path = Arc::<str>::from(scar_path.as_ref());
654 let storage =
655 Arc::new(ObjectStoreBackend::new(Arc::clone(&store), scar_path)) as Arc<dyn Storage>;
656 let hint = base_path.as_ref().to_string();
657 let dirty_names = self.commit_pending(storage, delta_path, hint).await?;
658
659 let merged = self.compute_stats_for(&dirty_names).await?;
660 let s_storage = ObjectStoreBackend::new(Arc::clone(&store), stats_path(&base_path));
661 s_storage
662 .write(bytes::Bytes::from(merged.serialize()?))
663 .await?;
664 self.stats = Some(merged);
665 Ok(())
666 }
667
668 pub async fn flush_memory(&mut self, storage: &InMemoryStorage) -> Result<()> {
674 if self.pending.is_none() {
675 return Ok(());
676 }
677 let overlay_index = self.deltas.len() as u32;
678 let delta_path = Arc::<str>::from(format!("__memory_{overlay_index}__").as_str());
679 let arc: Arc<dyn Storage> = Arc::new(storage.clone());
680 let dirty_names = self.commit_pending(arc, delta_path, String::new()).await?;
681
682 let merged = self.compute_stats_for(&dirty_names).await?;
683 self.stats = Some(merged);
684 Ok(())
685 }
686
687 async fn compute_stats_for(&self, dirty_names: &[String]) -> Result<StatsFile> {
688 let mut merged = self.stats.clone().unwrap_or_default();
689 for name in dirty_names {
690 let schema = match self.get_chunked_schema(name) {
691 Ok(s) => s,
692 Err(_) => continue,
693 };
694 let fill_value = self
695 .resolve_array_meta(name)
696 .and_then(|m| m.fill_value.clone());
697 let shape_product: u64 = schema.full_shape.iter().map(|&s| s as u64).product();
698 let mut stats = ArrayStats::new(name.clone());
699 let mut written_non_null: u64 = 0;
700 for coord in &schema.all_coords {
701 if let Some(bytes) = self.resolve_raw_chunk(name, coord).await? {
702 let (min, max, nc, rc) =
703 compute_chunk_partial(&bytes, &schema.dtype, fill_value.as_ref());
704 written_non_null += rc - nc;
705 merge_partial(&mut stats, min, max, nc, rc);
706 }
707 }
708 stats.row_count = shape_product;
709 stats.null_count = shape_product - written_non_null;
710 merged.upsert(stats);
711 }
712 Ok(merged)
713 }
714
715 async fn commit_pending(
719 &mut self,
720 storage: Arc<dyn Storage>,
721 delta_path: Arc<str>,
722 base_file_hint: String,
723 ) -> Result<Vec<String>> {
724 let mutable = self
725 .pending
726 .take()
727 .expect("commit_pending: no pending delta");
728 let dirty_names: Vec<String> = mutable
729 .inner
730 .array_meta
731 .iter()
732 .filter(|(_, m)| !m.layout.storage.chunks.is_empty())
733 .map(|(name, _)| name.clone())
734 .collect();
735 let immutable = mutable
736 .commit(storage, delta_path, self.cache.clone(), base_file_hint)
737 .await?;
738 self.deltas.push(immutable);
739 Ok(dirty_names)
740 }
741}
742
743impl ArrayFile {
746 pub async fn compact(&mut self) -> Result<()> {
752 let merged_names: Vec<String> = self.list_arrays().into_iter().map(|m| m.name).collect();
754
755 let mut allocator = DeltaAllocator::new(Arc::clone(&self.codec), self.block_target_size);
757 let mut arrays: Vec<ArrayMeta> = Vec::new();
758 let mut per_array_stats: Vec<ArrayStats> = Vec::new();
759
760 for name in &merged_names {
761 let meta = self
762 .resolve_array_meta(name)
763 .ok_or_else(|| Error::ArrayNotFound { name: name.clone() })?
764 .clone();
765
766 let mut all_coords: indexmap::IndexSet<Vec<u32>> = indexmap::IndexSet::new();
768 for delta in &self.deltas {
769 if let Some(&i) = delta.inner.array_index.get(name.as_str()) {
770 for e in &delta.inner.footer.arrays[i].layout.storage.chunks {
771 all_coords.insert(e.coord.clone());
772 }
773 }
774 }
775
776 let shape_product: u64 = meta.layout.shape.iter().map(|&s| s as u64).product();
777 let mut new_chunks: Vec<ChunkEntry> = Vec::new();
778 let mut array_stats = ArrayStats::new(name.clone());
779 let mut written_non_null: u64 = 0;
780 for coord in &all_coords {
781 if let Some(raw) = self.resolve_raw_chunk(name, coord).await? {
783 let (min, max, nc, rc) =
784 compute_chunk_partial(&raw, &meta.dtype, meta.fill_value.as_ref());
785 written_non_null += rc - nc;
786 merge_partial(&mut array_stats, min, max, nc, rc);
787 let alloc = allocator.allocate(&raw);
788 new_chunks.push(ChunkEntry {
789 coord: coord.clone(),
790 address: ChunkAddress::from(alloc),
791 });
792 }
793 }
794 array_stats.row_count = shape_product;
795 array_stats.null_count = shape_product - written_non_null;
796 per_array_stats.push(array_stats);
797
798 let mut new_meta = meta;
799 new_meta.layout.storage.chunks = new_chunks;
800 arrays.push(new_meta);
801 }
802
803 let crate::delta::AllocatorOutput {
804 mut file,
805 output_size,
806 blocks,
807 } = allocator.commit().await;
808
809 let mut attr_keys: Vec<String> = Vec::new();
811 let mut attr_values: Vec<crate::layout::AttributeValue> = Vec::new();
812 for delta in &self.deltas {
813 for k in &delta.inner.footer.attr_keys {
814 if !attr_keys.contains(k) {
815 attr_keys.push(k.clone());
816 }
817 }
818 for v in &delta.inner.footer.attr_values {
819 if !attr_values.contains(v) {
820 attr_values.push(v.clone());
821 }
822 }
823 }
824
825 let footer = Footer {
826 version: FOOTER_VERSION,
827 blocks,
828 arrays,
829 attr_keys,
830 attr_values,
831 overlay_index: 0,
832 base_file_hint: String::new(),
833 };
834 let footer_bytes = footer.serialize()?;
835
836 let base_storage: Arc<dyn Storage> = if let Some(sd) = &self.store_dir {
838 for i in 1..self.deltas.len() {
840 let _ = sd
841 .store
842 .delete(&sidecar_path(&sd.base_path, i as u32))
843 .await;
844 }
845 Arc::new(ObjectStoreBackend::new(
847 Arc::clone(&sd.store),
848 sd.base_path.clone(),
849 ))
850 } else {
851 Arc::clone(&self.deltas[0].inner.storage)
853 };
854
855 write_file_then_bytes(&mut file, output_size, &footer_bytes, &*base_storage).await?;
856 let base_delta_path: Arc<str> = if let Some(sd) = &self.store_dir {
857 Arc::from(sd.base_path.as_ref())
858 } else {
859 Arc::from("__memory_0__")
860 };
861 let new_base =
862 Delta::<DeltaImmutable>::open(base_storage, base_delta_path, self.cache.clone())
863 .await?;
864 self.deltas = vec![new_base];
865
866 let mut new_stats = StatsFile::default();
867 for s in per_array_stats {
868 new_stats.upsert(s);
869 }
870 if let Some(sd) = &self.store_dir {
871 let s_storage =
872 ObjectStoreBackend::new(Arc::clone(&sd.store), stats_path(&sd.base_path));
873 s_storage
874 .write(bytes::Bytes::from(new_stats.serialize()?))
875 .await?;
876 }
877 self.stats = Some(new_stats);
878 Ok(())
879 }
880}
881
882fn resolve_cache<C: CompressionCodec>(config: &FileConfig<C>) -> Option<Arc<DeltaCache>> {
885 if let Some(c) = &config.cache {
886 Some(Arc::clone(c))
887 } else if config.cache_capacity == 0 && config.io_cache_capacity == 0 {
888 None
889 } else {
890 Some(Arc::new(DeltaCache::new(
891 config.cache_capacity as u64,
892 config.io_cache_capacity as u64,
893 )))
894 }
895}
896
897fn sidecar_path(base: &object_store::path::Path, n: u32) -> object_store::path::Path {
898 let s = base.as_ref();
899 let without_af = s.strip_suffix(".af").unwrap_or(s);
900 object_store::path::Path::from(format!("{without_af}.{n}.af").as_str())
901}
902
903fn stats_path(base: &object_store::path::Path) -> object_store::path::Path {
904 let s = base.as_ref();
905 let without_af = s.strip_suffix(".af").unwrap_or(s);
906 object_store::path::Path::from(format!("{without_af}.stats").as_str())
907}
908
909async fn discover_sidecars_store(
910 store: &dyn ObjectStore,
911 base_path: &object_store::path::Path,
912) -> Result<Vec<(u32, object_store::path::Path)>> {
913 use futures::TryStreamExt;
914 let base_str = base_path.as_ref();
915 let stem_prefix = base_str
916 .strip_suffix(".af")
917 .ok_or_else(|| Error::Storage("path must end with .af".into()))?;
918 let list_prefix = base_str
919 .rfind('/')
920 .map(|pos| object_store::path::Path::from(&base_str[..pos]));
921 let objects: Vec<_> = store
922 .list(list_prefix.as_ref())
923 .try_collect()
924 .await
925 .map_err(|e| Error::Storage(e.to_string()))?;
926 let mut sidecars: Vec<(u32, object_store::path::Path)> = objects
927 .into_iter()
928 .filter_map(|meta| {
929 let s = meta.location.as_ref();
930 let rest = s.strip_prefix(stem_prefix)?.strip_prefix('.')?;
931 let (num_str, ext) = rest.rsplit_once('.')?;
932 if ext != "af" {
933 return None;
934 }
935 let n: u32 = num_str.parse().ok()?;
936 if n == 0 {
937 return None;
938 }
939 Some((n, meta.location))
940 })
941 .collect();
942 sidecars.sort_by_key(|(n, _)| *n);
943 Ok(sidecars)
944}
945
946async fn write_empty_base(storage: &dyn Storage) -> Result<()> {
947 let footer = Footer::new();
948 let bytes = footer.serialize()?;
949 storage.write(Bytes::from(bytes)).await
950}
951
#[cfg(test)]
mod tests {
    use super::*;
    use crate::codec::NoCompression;

    /// Two files configured with the same `DeltaCache` must both hold that
    /// exact instance rather than building their own.
    #[tokio::test]
    async fn shared_cache_is_reused_across_files() {
        let shared = Arc::new(DeltaCache::new(1024 * 1024, 0));

        // Builds a config that points at the shared cache.
        let config_with_shared = || {
            let mut cfg = FileConfig::new(NoCompression);
            cfg.cache = Some(Arc::clone(&shared));
            cfg
        };

        let file_a = ArrayFile::create_memory(config_with_shared())
            .await
            .unwrap();
        let file_b = ArrayFile::create_memory(config_with_shared())
            .await
            .unwrap();

        let a = file_a.cache.as_ref().expect("file_a has cache");
        let b = file_b.cache.as_ref().expect("file_b has cache");
        assert!(Arc::ptr_eq(a, &shared));
        assert!(Arc::ptr_eq(b, &shared));
    }
}