Skip to main content

array_format/
file.rs

1//! The runtime: [`ArrayFile`], the top-level read/write/compact handle.
2//!
3//! [`ArrayFile`] ties the lower layers together — it defines and reads/writes
4//! arrays, manages the stack of delta layers (flushing pending writes into
5//! sidecars and compacting them back down), and is configured through
6//! [`FileConfig`].
7
8use std::sync::Arc;
9
10use bytes::Bytes;
11use indexmap::IndexMap;
12use object_store::{ObjectStore, ObjectStoreExt};
13
14use crate::{
15    DType, Error, Result,
16    address::ChunkAddress,
17    array::ArrayElement,
18    codec::CompressionCodec,
19    delta::{
20        Delta, DeltaAllocator, DeltaCache, DeltaImmutable, DeltaMutable, write_file_then_bytes,
21    },
22    footer::{FOOTER_VERSION, Footer},
23    layout::{
24        ArrayLayout, ArrayMeta, AttrIndexKind, AttributeValue, Attributes, ChunkEntry, FillValue,
25        StorageLayout,
26    },
27    stats::{ArrayStats, StatsFile, compute_chunk_partial, merge_partial, read_stats_file},
28    storage::{ObjectStoreBackend, Storage},
29};
30
31// ── Constants ───────────────────────────────────────────────────────
32
/// Default target size, in bytes, of a data block before a new one is started
/// (8 MiB).
pub const DEFAULT_BLOCK_TARGET_SIZE: usize = 8 << 20;
/// Default byte budget for the decompressed-block cache (256 MiB).
pub const DEFAULT_CACHE_CAPACITY: usize = 256 << 20;
/// Default byte budget for the raw I/O slab cache (64 MiB).
///
/// Mainly useful for object-store workloads.
pub const DEFAULT_IO_CACHE_CAPACITY: usize = 64 << 20;
39
40// ── FileConfig ──────────────────────────────────────────────────────
41
/// Configuration for opening or creating an [`ArrayFile`].
///
/// Construct with [`FileConfig::new`] for the defaults, then override fields as
/// needed:
///
/// ```
/// use array_format::{FileConfig, ZstdCodec};
///
/// let config = FileConfig {
///     block_target_size: 4 * 1024 * 1024,
///     ..FileConfig::new(ZstdCodec { level: 9 })
/// };
/// ```
pub struct FileConfig<C: CompressionCodec> {
    /// Compression codec applied to data blocks on write.
    ///
    /// Moved into the [`ArrayFile`] behind an `Arc<dyn CompressionCodec>` when
    /// the file is created or opened.
    pub codec: C,
    /// Target size of a data block before a new block is started, in bytes.
    ///
    /// [`FileConfig::new`] sets this to [`DEFAULT_BLOCK_TARGET_SIZE`].
    pub block_target_size: usize,
    /// Byte budget for this file's decompressed-block cache.
    ///
    /// [`FileConfig::new`] sets this to [`DEFAULT_CACHE_CAPACITY`].
    /// Ignored when [`cache`](Self::cache) is `Some`.
    pub cache_capacity: usize,
    /// Byte budget for this file's raw I/O slab cache (0 disables it).
    ///
    /// [`FileConfig::new`] sets this to [`DEFAULT_IO_CACHE_CAPACITY`].
    /// Ignored when [`cache`](Self::cache) is `Some`.
    pub io_cache_capacity: usize,
    /// Optional pre-built cache to share across multiple [`ArrayFile`]s.
    ///
    /// When `Some`, [`cache_capacity`](Self::cache_capacity) and
    /// [`io_cache_capacity`](Self::io_cache_capacity) are ignored and every file
    /// sharing this cache is bounded by one combined byte budget. Entries are
    /// keyed by `(file_path, block_id)`, so files do not interfere.
    pub cache: Option<Arc<DeltaCache>>,
}
71    /// [`io_cache_capacity`](Self::io_cache_capacity) are ignored and every file
72    /// sharing this cache is bounded by one combined byte budget. Entries are
73    /// keyed by `(file_path, block_id)`, so files do not interfere.
74    pub cache: Option<Arc<DeltaCache>>,
75}
76
77impl<C: CompressionCodec> FileConfig<C> {
78    /// Creates a config using `codec` and the `DEFAULT_*` capacities, with no
79    /// shared cache.
80    pub fn new(codec: C) -> Self {
81        Self {
82            codec,
83            block_target_size: DEFAULT_BLOCK_TARGET_SIZE,
84            cache_capacity: DEFAULT_CACHE_CAPACITY,
85            io_cache_capacity: DEFAULT_IO_CACHE_CAPACITY,
86            cache: None,
87        }
88    }
89}
90
91// ── MergedArrayMeta ─────────────────────────────────────────────────
92
/// Array metadata visible to the caller after merging all delta layers.
///
/// Returned by [`ArrayFile::list_arrays`], which builds one of these from the
/// newest non-deleted [`ArrayMeta`] for each array name (pending layer last).
#[derive(Debug, Clone)]
pub struct MergedArrayMeta {
    /// Array name (unique within the file).
    pub name: String,
    /// Element type.
    pub dtype: DType,
    /// Full array shape, one entry per dimension.
    pub shape: Vec<u32>,
    /// Chunk shape; equals [`shape`](Self::shape) for single-chunk arrays.
    pub chunk_shape: Vec<u32>,
    /// Name of each dimension.
    pub dimension_names: Vec<String>,
    /// Fill value used for unwritten elements, if one was set at definition.
    pub fill_value: Option<FillValue>,
}
111
112// ── File ────────────────────────────────────────────────────────────
113
/// Object store and base-file path backing a file.
struct StoreDir {
    /// Store holding the base object, its sidecars and the stats object.
    store: Arc<dyn ObjectStore>,
    /// Path of the base `.af` object; sidecar and stats paths are derived from
    /// it via `sidecar_path` / `stats_path`.
    base_path: object_store::path::Path,
}
119
/// Schema information returned by [`ArrayFile::get_chunked_schema`].
pub(crate) struct ChunkedSchema {
    /// Full array shape, one entry per dimension.
    pub full_shape: Vec<u32>,
    /// Shape of one chunk.
    pub chunk_shape: Vec<u32>,
    /// Element type of the array.
    pub dtype: DType,
    /// Coordinates of every chunk stored for the array in any committed layer
    /// or the pending layer, deduplicated, in first-seen order (oldest layer
    /// first, pending last).
    pub all_coords: Vec<Vec<u32>>,
}
127
/// The top-level file handle.
///
/// Layers are stacked oldest → newest in `deltas`. Uncommitted writes
/// accumulate in a disk-backed mutable delta (`pending`) and are flushed
/// by [`flush`](Self::flush). The mutable delta is created lazily on the
/// first mutation after open/flush.
pub struct ArrayFile {
    /// Committed layers, oldest first; index 0 is always the base file.
    deltas: Vec<Delta<DeltaImmutable>>,
    /// Uncommitted changes; `None` until the first mutation after open/flush.
    pending: Option<Delta<DeltaMutable>>,
    /// Codec shared with every pending delta this handle creates.
    codec: Arc<dyn CompressionCodec>,
    /// Block target size handed to pending deltas.
    block_target_size: usize,
    /// Block cache passed to every layer opened or committed by this handle.
    cache: Option<Arc<DeltaCache>>,
    /// Object store and stem backing this file (an in-memory file uses
    /// `object_store`'s in-memory backend).
    store_dir: StoreDir,
    /// Per-array aggregate statistics. `None` after `create`, and after `open`
    /// when the stats object is missing or unreadable, until the next flush.
    stats: Option<StatsFile>,
}
146
147// ── Constructors ────────────────────────────────────────────────────
148
149impl ArrayFile {
150    /// Creates a new empty file at `path` within `store`.
151    ///
152    /// `path` is the base file object and should end in `.af`; sidecars
153    /// (`{stem}.N.af`) and the stats file (`{stem}.stats`) are written alongside
154    /// it in the same prefix. Fails if an object already exists at `path` only
155    /// insofar as the backend allows overwriting — the base is (re)written empty.
156    pub async fn create<C: CompressionCodec + 'static>(
157        store: Arc<dyn ObjectStore>,
158        path: object_store::path::Path,
159        config: FileConfig<C>,
160    ) -> Result<Self> {
161        let cache = resolve_cache(&config);
162        let delta_path = Arc::<str>::from(path.as_ref());
163        let storage =
164            Arc::new(ObjectStoreBackend::new(Arc::clone(&store), path.clone())) as Arc<dyn Storage>;
165        write_empty_base(&*storage).await?;
166        let base_delta = Delta::<DeltaImmutable>::open(storage, delta_path, cache.clone()).await?;
167        Ok(ArrayFile {
168            deltas: vec![base_delta],
169            pending: None,
170            codec: Arc::new(config.codec),
171            block_target_size: config.block_target_size,
172            cache,
173            store_dir: StoreDir {
174                store,
175                base_path: path,
176            },
177            stats: None,
178        })
179    }
180
181    /// Opens an existing file from `store`, discovering the base and all
182    /// sidecar layers under the same stem.
183    ///
184    /// `path` must end in `.af`. Aggregate statistics are loaded from
185    /// `{stem}.stats` if present; a missing or unreadable stats file is not an
186    /// error (see [`array_stats`](Self::array_stats)).
187    pub async fn open<C: CompressionCodec + 'static>(
188        store: Arc<dyn ObjectStore>,
189        path: object_store::path::Path,
190        config: FileConfig<C>,
191    ) -> Result<Self> {
192        let cache = resolve_cache(&config);
193        let delta_path = Arc::<str>::from(path.as_ref());
194        let storage =
195            Arc::new(ObjectStoreBackend::new(Arc::clone(&store), path.clone())) as Arc<dyn Storage>;
196        let base_delta = Delta::<DeltaImmutable>::open(storage, delta_path, cache.clone()).await?;
197        let mut deltas = vec![base_delta];
198
199        let sidecars = discover_sidecars_store(&*store, &path).await?;
200        for (_, scar_path) in sidecars {
201            let scar_delta_path = Arc::<str>::from(scar_path.as_ref());
202            let scar_storage = Arc::new(ObjectStoreBackend::new(Arc::clone(&store), scar_path))
203                as Arc<dyn Storage>;
204            deltas.push(
205                Delta::<DeltaImmutable>::open(scar_storage, scar_delta_path, cache.clone()).await?,
206            );
207        }
208
209        let stats = {
210            let s_storage = ObjectStoreBackend::new(Arc::clone(&store), stats_path(&path));
211            read_stats_file(&s_storage).await.ok()
212        };
213
214        Ok(ArrayFile {
215            deltas,
216            pending: None,
217            codec: Arc::new(config.codec),
218            block_target_size: config.block_target_size,
219            cache,
220            store_dir: StoreDir {
221                store,
222                base_path: path,
223            },
224            stats,
225        })
226    }
227
228    /// Creates a new empty in-memory file.
229    ///
230    /// Backed by `object_store`'s in-memory backend, so it behaves exactly like
231    /// an on-disk file (commit pending writes with [`flush`](Self::flush)) but
232    /// keeps everything in process. Useful for tests and ephemeral pipelines.
233    pub async fn create_memory<C: CompressionCodec + 'static>(
234        config: FileConfig<C>,
235    ) -> Result<Self> {
236        let store = Arc::new(object_store::memory::InMemory::new());
237        Self::create(store, object_store::path::Path::from("memory.af"), config).await
238    }
239}
240
241// ── Schema & attribute access ────────────────────────────────────────
242
243impl ArrayFile {
244    /// Returns a reference to the merged array metadata for `name`,
245    /// searching from the newest layer towards the oldest.
246    pub fn get_array(&self, name: &str) -> Result<&ArrayMeta> {
247        self.resolve_array_meta(name)
248            .ok_or_else(|| Error::ArrayNotFound {
249                name: name.to_string(),
250            })
251    }
252
253    fn resolve_array_meta(&self, name: &str) -> Option<&ArrayMeta> {
254        if let Some(p) = self.pending.as_ref()
255            && let Some(m) = p.inner.array_meta.get(name)
256        {
257            return if m.deleted { None } else { Some(m) };
258        }
259        for delta in self.deltas.iter().rev() {
260            if let Some(&i) = delta.inner.array_index.get(name) {
261                let m = &delta.inner.footer.arrays[i];
262                return if m.deleted { None } else { Some(m) };
263            }
264        }
265        None
266    }
267
268    /// Lazily creates a mutable pending delta and returns a mutable reference.
269    fn pending_mut(&mut self) -> &mut Delta<DeltaMutable> {
270        if self.pending.is_none() {
271            let overlay_index = self.deltas.len() as u32;
272            self.pending = Some(Delta::<DeltaMutable>::new(
273                Arc::clone(&self.codec),
274                self.block_target_size,
275                overlay_index,
276            ));
277        }
278        self.pending.as_mut().unwrap()
279    }
280
281    /// Returns the array schema in the form expected by the ndarray write path.
282    pub(crate) fn get_chunked_schema(&self, name: &str) -> Result<ChunkedSchema> {
283        let meta = self.get_array(name)?;
284        let full_shape = meta.layout.shape.clone();
285        let chunk_shape = meta.layout.storage.chunk_shape.clone();
286        let dtype = meta.dtype.clone();
287        // Collect existing chunk coords from all layers (newest wins, so just union).
288        let mut existing: IndexMap<Vec<u32>, ()> = IndexMap::new();
289        for delta in &self.deltas {
290            if let Some(&i) = delta.inner.array_index.get(name) {
291                for e in &delta.inner.footer.arrays[i].layout.storage.chunks {
292                    existing.entry(e.coord.clone()).or_default();
293                }
294            }
295        }
296        if let Some(p) = self.pending.as_ref()
297            && let Some(m) = p.inner.array_meta.get(name)
298        {
299            for e in &m.layout.storage.chunks {
300                existing.entry(e.coord.clone()).or_default();
301            }
302        }
303        Ok(ChunkedSchema {
304            full_shape,
305            chunk_shape,
306            dtype,
307            all_coords: existing.into_keys().collect(),
308        })
309    }
310
311    /// Returns all non-deleted visible arrays (newest-wins merge).
312    pub fn list_arrays(&self) -> Vec<MergedArrayMeta> {
313        let mut seen: IndexMap<String, MergedArrayMeta> = IndexMap::new();
314
315        // Walk from oldest to newest so later entries overwrite earlier ones.
316        for delta in &self.deltas {
317            for a in &delta.inner.footer.arrays {
318                if a.deleted {
319                    seen.shift_remove(&a.name);
320                } else {
321                    seen.insert(
322                        a.name.clone(),
323                        MergedArrayMeta {
324                            name: a.name.clone(),
325                            dtype: a.dtype.clone(),
326                            shape: a.layout.shape.clone(),
327                            chunk_shape: a.layout.storage.chunk_shape.clone(),
328                            dimension_names: a.layout.dimension_names.clone(),
329                            fill_value: a.fill_value.clone(),
330                        },
331                    );
332                }
333            }
334        }
335        if let Some(p) = self.pending.as_ref() {
336            for (name, a) in &p.inner.array_meta {
337                if a.deleted {
338                    seen.shift_remove(name);
339                } else {
340                    seen.insert(
341                        name.clone(),
342                        MergedArrayMeta {
343                            name: a.name.clone(),
344                            dtype: a.dtype.clone(),
345                            shape: a.layout.shape.clone(),
346                            chunk_shape: a.layout.storage.chunk_shape.clone(),
347                            dimension_names: a.layout.dimension_names.clone(),
348                            fill_value: a.fill_value.clone(),
349                        },
350                    );
351                }
352            }
353        }
354        seen.into_values().collect()
355    }
356
357    /// Returns aggregate statistics for `name`, or `None` if no stats exist yet.
358    pub fn array_stats(&self, name: &str) -> Option<&ArrayStats> {
359        self.stats.as_ref()?.get_array(name)
360    }
361
362    /// Number of committed (immutable) delta layers.
363    pub fn num_layers(&self) -> usize {
364        self.deltas.len()
365    }
366
367    /// Returns the value of attribute `key` on array `name`, or `None` if the
368    /// array has no such attribute. Errors if the array does not exist.
369    pub fn get_attribute(&self, name: &str, key: &str) -> Result<Option<&AttributeValue>> {
370        let meta = self.get_array(name)?;
371        let key_idx = match self
372            .pending
373            .as_ref()
374            .and_then(|p| p.inner.attr_keys.iter().position(|k| k == key))
375            .or_else(|| {
376                // Check global dicts in most-recent delta
377                self.deltas
378                    .iter()
379                    .rev()
380                    .find_map(|d| d.inner.footer.attr_keys.iter().position(|k| k == key))
381            }) {
382            Some(i) => i,
383            None => return Ok(None),
384        };
385        let val_idx = match meta.attributes.get(key_idx) {
386            Some(i) => i,
387            None => return Ok(None),
388        };
389        // Look up in pending first, then deltas
390        if let Some(p) = self.pending.as_ref()
391            && val_idx < p.inner.attr_values.len()
392        {
393            return Ok(Some(&p.inner.attr_values[val_idx]));
394        }
395        for delta in self.deltas.iter().rev() {
396            if val_idx < delta.inner.footer.attr_values.len() {
397                return Ok(Some(&delta.inner.footer.attr_values[val_idx]));
398            }
399        }
400        Ok(None)
401    }
402
403    /// Sets attribute `key` on array `name` to `value`, inserting or replacing
404    /// any existing entry. The change lands in the pending layer and is
405    /// persisted on the next [`flush`](Self::flush). Errors if the array does
406    /// not exist.
407    pub fn set_attribute(&mut self, name: &str, key: &str, value: AttributeValue) -> Result<()> {
408        // Ensure the array exists (in deltas or pending), and snapshot its meta
409        // in case we need to copy it down into the pending mutable delta.
410        // Clear the cloned chunks list so we don't carry stale block addresses
411        // from a lower layer into this delta's footer.
412        let mut existing_meta = self.get_array(name)?.clone();
413        existing_meta.layout.storage.chunks.clear();
414
415        let pending = self.pending_mut();
416        let key_idx = pending.intern_attr_key(key);
417        let val_idx = pending.intern_attr_value(value);
418
419        // Update the array meta in pending (copy from lower layer if absent).
420        if pending.array_meta_mut(name).is_none() {
421            pending.upsert_array_meta(existing_meta);
422        }
423        let meta = pending.array_meta_mut(name).unwrap();
424        meta.attributes.upsert(key_idx, val_idx);
425        Ok(())
426    }
427}
428
429// ── Array definition / deletion ──────────────────────────────────────
430
431impl ArrayFile {
432    /// Defines a new array in the pending layer.
433    ///
434    /// `shape` is the full array shape; `chunk_shape` tiles it into a grid of
435    /// independently stored chunks, or `None` to store the whole array as a
436    /// single chunk. If `dimension_names` does not have one entry per dimension
437    /// it is replaced with `dim0`, `dim1`, … . `fill_value` is returned for
438    /// elements that are never written.
439    ///
440    /// Errors with [`Error::ArrayAlreadyExists`] if an array of this name is
441    /// already visible. The definition is persisted on the next
442    /// [`flush`](Self::flush).
443    pub fn define_array<T: ArrayElement>(
444        &mut self,
445        name: impl Into<String>,
446        dimension_names: Vec<String>,
447        shape: Vec<usize>,
448        chunk_shape: Option<Vec<usize>>,
449        fill_value: Option<FillValue>,
450    ) -> Result<()> {
451        let name = name.into();
452        if self.resolve_array_meta(&name).is_some() {
453            return Err(Error::ArrayAlreadyExists { name });
454        }
455        let shape_u32: Vec<u32> = shape.iter().map(|&s| s as u32).collect();
456        let ndim = shape_u32.len();
457        let chunk_shape_u32: Vec<u32> = chunk_shape
458            .map(|cs| cs.iter().map(|&s| s as u32).collect())
459            .unwrap_or_else(|| shape_u32.clone());
460        let dim_names = if dimension_names.len() == ndim {
461            dimension_names
462        } else {
463            (0..ndim).map(|i| format!("dim{i}")).collect()
464        };
465        let layout = ArrayLayout {
466            shape: shape_u32,
467            dimension_names: dim_names,
468            storage: StorageLayout {
469                chunk_shape: chunk_shape_u32,
470                chunks: vec![],
471            },
472        };
473        self.pending_mut().upsert_array_meta(ArrayMeta {
474            name,
475            dtype: T::DTYPE,
476            layout,
477            fill_value,
478            deleted: false,
479            attributes: Attributes::empty(AttrIndexKind::U16),
480        });
481        Ok(())
482    }
483
484    /// Logically deletes an array by writing a tombstone to the pending layer.
485    ///
486    /// The array is excluded from [`list_arrays`](Self::list_arrays) and all
487    /// reads immediately, but its bytes remain on disk until
488    /// [`compact`](Self::compact) reclaims them.
489    pub fn delete(&mut self, name: &str) -> Result<()> {
490        let meta = self.get_array(name)?.clone();
491        self.pending_mut().mark_deleted(meta);
492        Ok(())
493    }
494}
495
496// ── Chunk-level read/write (pub(crate) for ndarray_ext) ──────────────
497
498impl ArrayFile {
499    pub(crate) async fn read_chunk<T: ArrayElement>(
500        &self,
501        name: &str,
502        coord: &[u32],
503    ) -> Result<Vec<T>> {
504        if let Some(bytes) = self.resolve_raw_chunk(name, coord).await? {
505            return Ok(T::decode_chunk(&bytes));
506        }
507        let meta = self.get_array(name)?;
508        let chunk_elems: usize = meta
509            .layout
510            .storage
511            .chunk_shape
512            .iter()
513            .enumerate()
514            .map(|(i, &cs)| {
515                let axis_len = meta.layout.shape[i] as usize;
516                let start = coord[i] as usize * cs as usize;
517                (cs as usize).min(axis_len.saturating_sub(start))
518            })
519            .product();
520        Ok(vec![T::fill_element(meta.fill_value.as_ref()); chunk_elems])
521    }
522
523    pub(crate) fn write_chunk_raw(
524        &mut self,
525        name: &str,
526        coord: Vec<u32>,
527        bytes: Vec<u8>,
528    ) -> Result<()> {
529        // If the array isn't yet present in pending, copy its meta down so the
530        // mutable delta has an entry to attach the chunk to. Clear the cloned
531        // chunks list — the lower-layer addresses don't apply to this delta's
532        // data file, and only chunks written into this session belong here.
533        let snapshot = if self
534            .pending
535            .as_ref()
536            .and_then(|p| p.inner.array_meta.get(name))
537            .is_none()
538        {
539            let mut m = self.get_array(name)?.clone();
540            m.layout.storage.chunks.clear();
541            Some(m)
542        } else {
543            None
544        };
545        let pending = self.pending_mut();
546        if let Some(meta) = snapshot {
547            pending.upsert_array_meta(meta);
548        }
549        pending.write_raw_chunk(name, coord, &bytes)
550    }
551
552    async fn resolve_raw_chunk(&self, name: &str, coord: &[u32]) -> Result<Option<Bytes>> {
553        if let Some(p) = self.pending.as_ref()
554            && let Some(bytes) = p.read_raw_chunk(name, coord)
555        {
556            return Ok(Some(bytes));
557        }
558        for delta in self.deltas.iter().rev() {
559            if let Some(bytes) = delta.read_raw_chunk(name, coord).await? {
560                return Ok(Some(bytes));
561            }
562        }
563        Ok(None)
564    }
565}
566
567// ── ndarray read/write ───────────────────────────────────────────────
568
569impl ArrayFile {
570    /// Writes `data` into array `name` with its origin at coordinate `start`.
571    ///
572    /// The region may span multiple chunks and need not be chunk-aligned;
573    /// partially covered chunks are read-modify-written automatically. `T` must
574    /// match the array's declared dtype, otherwise [`Error::DTypeMismatch`] is
575    /// returned. Writes accumulate in the pending layer until
576    /// [`flush`](Self::flush).
577    pub async fn write_array<T: ArrayElement>(
578        &mut self,
579        name: &str,
580        start: Vec<usize>,
581        data: ndarray::ArrayView<'_, T, ndarray::IxDyn>,
582    ) -> Result<()> {
583        crate::ndarray_ext::write_nd(self, name, data, &start).await
584    }
585
586    /// Reads the sub-region of array `name` starting at `start` with the given
587    /// `shape`.
588    ///
589    /// Pass `vec![], vec![]` to read the whole array. Chunks that were never
590    /// written are materialized from the array's fill value. `T` must match the
591    /// array's declared dtype, otherwise [`Error::DTypeMismatch`] is returned.
592    pub async fn read_array<T: ArrayElement>(
593        &self,
594        name: &str,
595        start: Vec<usize>,
596        shape: Vec<usize>,
597    ) -> Result<ndarray::ArcArray<T, ndarray::IxDyn>> {
598        use std::ops::Range;
599        let slice: Option<Vec<Range<usize>>> = if start.is_empty() && shape.is_empty() {
600            None
601        } else {
602            let meta = self.get_array(name)?;
603            let ndim = meta.layout.shape.len();
604            let effective_start = if start.len() == ndim {
605                start.clone()
606            } else {
607                vec![0; ndim]
608            };
609            let effective_shape: Vec<usize> = if shape.len() == ndim {
610                shape.clone()
611            } else {
612                meta.layout.shape.iter().map(|&s| s as usize).collect()
613            };
614            Some(
615                effective_start
616                    .iter()
617                    .zip(&effective_shape)
618                    .map(|(&s, &sz)| s..s + sz)
619                    .collect(),
620            )
621        };
622        crate::ndarray_ext::assemble_nd(self, name, slice.as_deref()).await
623    }
624}
625
626// ── Flush ────────────────────────────────────────────────────────────
627
628impl ArrayFile {
629    /// Commits pending writes to a new sidecar layer and refreshes the
630    /// `{stem}.stats` file.
631    ///
632    /// A no-op if there are no pending changes.
633    pub async fn flush(&mut self) -> Result<()> {
634        if self.pending.is_none() {
635            return Ok(());
636        }
637        let store = Arc::clone(&self.store_dir.store);
638        let base_path = self.store_dir.base_path.clone();
639        let overlay_index = self.deltas.len() as u32;
640        let scar_path = sidecar_path(&base_path, overlay_index);
641        let delta_path = Arc::<str>::from(scar_path.as_ref());
642        let storage =
643            Arc::new(ObjectStoreBackend::new(Arc::clone(&store), scar_path)) as Arc<dyn Storage>;
644        let hint = base_path.as_ref().to_string();
645        let dirty_names = self.commit_pending(storage, delta_path, hint).await?;
646
647        let merged = self.compute_stats_for(&dirty_names).await?;
648        let s_storage = ObjectStoreBackend::new(Arc::clone(&store), stats_path(&base_path));
649        s_storage
650            .write(bytes::Bytes::from(merged.serialize()?))
651            .await?;
652        self.stats = Some(merged);
653        Ok(())
654    }
655
656    async fn compute_stats_for(&self, dirty_names: &[String]) -> Result<StatsFile> {
657        let mut merged = self.stats.clone().unwrap_or_default();
658        for name in dirty_names {
659            let schema = match self.get_chunked_schema(name) {
660                Ok(s) => s,
661                Err(_) => continue,
662            };
663            let fill_value = self
664                .resolve_array_meta(name)
665                .and_then(|m| m.fill_value.clone());
666            let shape_product: u64 = schema.full_shape.iter().map(|&s| s as u64).product();
667            let mut stats = ArrayStats::new(name.clone());
668            let mut written_non_null: u64 = 0;
669            for coord in &schema.all_coords {
670                if let Some(bytes) = self.resolve_raw_chunk(name, coord).await? {
671                    let (min, max, nc, rc) =
672                        compute_chunk_partial(&bytes, &schema.dtype, fill_value.as_ref());
673                    written_non_null += rc - nc;
674                    merge_partial(&mut stats, min, max, nc, rc);
675                }
676            }
677            stats.row_count = shape_product;
678            stats.null_count = shape_product - written_non_null;
679            merged.upsert(stats);
680        }
681        Ok(merged)
682    }
683
684    /// Commits the pending mutable delta to `storage`, appends the resulting
685    /// immutable delta to `self.deltas`, and returns the names of arrays that
686    /// had dirty chunks (used to recompute stats).
687    async fn commit_pending(
688        &mut self,
689        storage: Arc<dyn Storage>,
690        delta_path: Arc<str>,
691        base_file_hint: String,
692    ) -> Result<Vec<String>> {
693        let mutable = self
694            .pending
695            .take()
696            .expect("commit_pending: no pending delta");
697        let dirty_names: Vec<String> = mutable
698            .inner
699            .array_meta
700            .iter()
701            .filter(|(_, m)| !m.layout.storage.chunks.is_empty())
702            .map(|(name, _)| name.clone())
703            .collect();
704        let immutable = mutable
705            .commit(storage, delta_path, self.cache.clone(), base_file_hint)
706            .await?;
707        self.deltas.push(immutable);
708        Ok(dirty_names)
709    }
710}
711
712// ── Compact ──────────────────────────────────────────────────────────
713
714impl ArrayFile {
715    /// Merges all committed layers into a single new base file, deleting the
716    /// sidecars and reclaiming space held by overwritten and tombstoned chunks.
717    ///
718    /// After a successful compaction [`num_layers`](Self::num_layers) returns
719    /// `1`. Recomputes and rewrites the `{stem}.stats` file.
720    pub async fn compact(&mut self) -> Result<()> {
721        // Build the merged view.
722        let merged_names: Vec<String> = self.list_arrays().into_iter().map(|m| m.name).collect();
723
724        // Allocate all chunks for merged arrays.
725        let mut allocator = DeltaAllocator::new(Arc::clone(&self.codec), self.block_target_size);
726        let mut arrays: Vec<ArrayMeta> = Vec::new();
727        let mut per_array_stats: Vec<ArrayStats> = Vec::new();
728
729        for name in &merged_names {
730            let meta = self
731                .resolve_array_meta(name)
732                .ok_or_else(|| Error::ArrayNotFound { name: name.clone() })?
733                .clone();
734
735            // Collect all chunk coords across all layers for this array.
736            let mut all_coords: indexmap::IndexSet<Vec<u32>> = indexmap::IndexSet::new();
737            for delta in &self.deltas {
738                if let Some(&i) = delta.inner.array_index.get(name.as_str()) {
739                    for e in &delta.inner.footer.arrays[i].layout.storage.chunks {
740                        all_coords.insert(e.coord.clone());
741                    }
742                }
743            }
744
745            let shape_product: u64 = meta.layout.shape.iter().map(|&s| s as u64).product();
746            let mut new_chunks: Vec<ChunkEntry> = Vec::new();
747            let mut array_stats = ArrayStats::new(name.clone());
748            let mut written_non_null: u64 = 0;
749            for coord in &all_coords {
750                // Read from newest layer that has this chunk.
751                if let Some(raw) = self.resolve_raw_chunk(name, coord).await? {
752                    let (min, max, nc, rc) =
753                        compute_chunk_partial(&raw, &meta.dtype, meta.fill_value.as_ref());
754                    written_non_null += rc - nc;
755                    merge_partial(&mut array_stats, min, max, nc, rc);
756                    let alloc = allocator.allocate(&raw);
757                    new_chunks.push(ChunkEntry {
758                        coord: coord.clone(),
759                        address: ChunkAddress::from(alloc),
760                    });
761                }
762            }
763            array_stats.row_count = shape_product;
764            array_stats.null_count = shape_product - written_non_null;
765            per_array_stats.push(array_stats);
766
767            let mut new_meta = meta;
768            new_meta.layout.storage.chunks = new_chunks;
769            arrays.push(new_meta);
770        }
771
772        let crate::delta::AllocatorOutput {
773            mut file,
774            output_size,
775            blocks,
776        } = allocator.commit().await;
777
778        // Build attr dictionaries from all layers (simple union).
779        let mut attr_keys: Vec<String> = Vec::new();
780        let mut attr_values: Vec<crate::layout::AttributeValue> = Vec::new();
781        for delta in &self.deltas {
782            for k in &delta.inner.footer.attr_keys {
783                if !attr_keys.contains(k) {
784                    attr_keys.push(k.clone());
785                }
786            }
787            for v in &delta.inner.footer.attr_values {
788                if !attr_values.contains(v) {
789                    attr_values.push(v.clone());
790                }
791            }
792        }
793
794        let footer = Footer {
795            version: FOOTER_VERSION,
796            blocks,
797            arrays,
798            attr_keys,
799            attr_values,
800            overlay_index: 0,
801            base_file_hint: String::new(),
802        };
803        let footer_bytes = footer.serialize()?;
804
805        // Write the new base file.
806        let sd = &self.store_dir;
807        // Delete old sidecars first.
808        for i in 1..self.deltas.len() {
809            let _ = sd
810                .store
811                .delete(&sidecar_path(&sd.base_path, i as u32))
812                .await;
813        }
814        let base_storage: Arc<dyn Storage> = Arc::new(ObjectStoreBackend::new(
815            Arc::clone(&sd.store),
816            sd.base_path.clone(),
817        ));
818
819        write_file_then_bytes(&mut file, output_size, &footer_bytes, &*base_storage).await?;
820        let base_delta_path: Arc<str> = Arc::from(sd.base_path.as_ref());
821        let new_base =
822            Delta::<DeltaImmutable>::open(base_storage, base_delta_path, self.cache.clone())
823                .await?;
824        self.deltas = vec![new_base];
825
826        let mut new_stats = StatsFile::default();
827        for s in per_array_stats {
828            new_stats.upsert(s);
829        }
830        let s_storage = ObjectStoreBackend::new(
831            Arc::clone(&self.store_dir.store),
832            stats_path(&self.store_dir.base_path),
833        );
834        s_storage
835            .write(bytes::Bytes::from(new_stats.serialize()?))
836            .await?;
837        self.stats = Some(new_stats);
838        Ok(())
839    }
840}
841
842// ── Helpers ──────────────────────────────────────────────────────────
843
844fn resolve_cache<C: CompressionCodec>(config: &FileConfig<C>) -> Option<Arc<DeltaCache>> {
845    if let Some(c) = &config.cache {
846        Some(Arc::clone(c))
847    } else if config.cache_capacity == 0 && config.io_cache_capacity == 0 {
848        None
849    } else {
850        Some(Arc::new(DeltaCache::new(
851            config.cache_capacity as u64,
852            config.io_cache_capacity as u64,
853        )))
854    }
855}
856
857fn sidecar_path(base: &object_store::path::Path, n: u32) -> object_store::path::Path {
858    let s = base.as_ref();
859    let without_af = s.strip_suffix(".af").unwrap_or(s);
860    object_store::path::Path::from(format!("{without_af}.{n}.af").as_str())
861}
862
863fn stats_path(base: &object_store::path::Path) -> object_store::path::Path {
864    let s = base.as_ref();
865    let without_af = s.strip_suffix(".af").unwrap_or(s);
866    object_store::path::Path::from(format!("{without_af}.stats").as_str())
867}
868
869async fn discover_sidecars_store(
870    store: &dyn ObjectStore,
871    base_path: &object_store::path::Path,
872) -> Result<Vec<(u32, object_store::path::Path)>> {
873    use futures::TryStreamExt;
874    let base_str = base_path.as_ref();
875    let stem_prefix = base_str
876        .strip_suffix(".af")
877        .ok_or_else(|| Error::Storage("path must end with .af".into()))?;
878    let list_prefix = base_str
879        .rfind('/')
880        .map(|pos| object_store::path::Path::from(&base_str[..pos]));
881    let objects: Vec<_> = store
882        .list(list_prefix.as_ref())
883        .try_collect()
884        .await
885        .map_err(|e| Error::Storage(e.to_string()))?;
886    let mut sidecars: Vec<(u32, object_store::path::Path)> = objects
887        .into_iter()
888        .filter_map(|meta| {
889            let s = meta.location.as_ref();
890            let rest = s.strip_prefix(stem_prefix)?.strip_prefix('.')?;
891            let (num_str, ext) = rest.rsplit_once('.')?;
892            if ext != "af" {
893                return None;
894            }
895            let n: u32 = num_str.parse().ok()?;
896            if n == 0 {
897                return None;
898            }
899            Some((n, meta.location))
900        })
901        .collect();
902    sidecars.sort_by_key(|(n, _)| *n);
903    Ok(sidecars)
904}
905
906async fn write_empty_base(storage: &dyn Storage) -> Result<()> {
907    let footer = Footer::new();
908    let bytes = footer.serialize()?;
909    storage.write(Bytes::from(bytes)).await
910}
911
#[cfg(test)]
mod tests {
    use super::*;
    use crate::codec::NoCompression;

    /// Two files configured with the same `Arc<DeltaCache>` must both hold that
    /// exact allocation rather than building caches of their own.
    #[tokio::test]
    async fn shared_cache_is_reused_across_files() {
        let shared = Arc::new(DeltaCache::new(1024 * 1024, 0));

        // Every config produced here points at the one shared cache.
        let config_with_shared_cache = || {
            let mut config = FileConfig::new(NoCompression);
            config.cache = Some(Arc::clone(&shared));
            config
        };

        let first = ArrayFile::create_memory(config_with_shared_cache())
            .await
            .unwrap();
        let second = ArrayFile::create_memory(config_with_shared_cache())
            .await
            .unwrap();

        let first_cache = first.cache.as_ref().expect("file_a has cache");
        let second_cache = second.cache.as_ref().expect("file_b has cache");
        assert!(Arc::ptr_eq(first_cache, &shared));
        assert!(Arc::ptr_eq(second_cache, &shared));
    }
}