// array_format/file.rs

1use std::sync::Arc;
2
3use bytes::Bytes;
4use indexmap::IndexMap;
5use object_store::{ObjectStore, ObjectStoreExt};
6
7use crate::{
8    DType, Error, Result,
9    address::ChunkAddress,
10    array::ArrayElement,
11    codec::CompressionCodec,
12    delta::{
13        Delta, DeltaAllocator, DeltaCache, DeltaImmutable, DeltaMutable, write_file_then_bytes,
14    },
15    footer::{FOOTER_VERSION, Footer},
16    layout::{
17        ArrayLayout, ArrayMeta, AttrIndexKind, AttributeValue, Attributes, ChunkEntry, FillValue,
18        StorageLayout,
19    },
20    stats::{ArrayStats, StatsFile, compute_chunk_partial, merge_partial, read_stats_file},
21    storage::{InMemoryStorage, ObjectStoreBackend, Storage},
22};
23
24// ── Constants ───────────────────────────────────────────────────────
25
/// Default size a data block may reach before a new block is started: 8 MiB.
pub const DEFAULT_BLOCK_TARGET_SIZE: usize = 8 << 20;
/// Default byte budget of the decompressed-block cache: 256 MiB.
pub const DEFAULT_CACHE_CAPACITY: usize = 256 << 20;
/// Default byte budget of the raw I/O slab cache: 64 MiB. Intended for
/// object-store workloads.
pub const DEFAULT_IO_CACHE_CAPACITY: usize = 64 << 20;
32
33// ── FileConfig ──────────────────────────────────────────────────────
34
35/// Configuration for opening or creating an [`ArrayFile`].
36///
37/// Construct with [`FileConfig::new`] for the defaults, then override fields as
38/// needed:
39///
40/// ```
41/// use array_format::{FileConfig, ZstdCodec};
42///
43/// let config = FileConfig {
44///     block_target_size: 4 * 1024 * 1024,
45///     ..FileConfig::new(ZstdCodec { level: 9 })
46/// };
47/// ```
48pub struct FileConfig<C: CompressionCodec> {
49    /// Compression codec applied to data blocks on write.
50    pub codec: C,
51    /// Target size of a data block before a new block is started, in bytes.
52    pub block_target_size: usize,
53    /// Byte budget for this file's decompressed-block cache.
54    ///
55    /// Ignored when [`cache`](Self::cache) is `Some`.
56    pub cache_capacity: usize,
57    /// Byte budget for this file's raw I/O slab cache (0 disables it).
58    ///
59    /// Ignored when [`cache`](Self::cache) is `Some`.
60    pub io_cache_capacity: usize,
61    /// Optional pre-built cache to share across multiple [`ArrayFile`]s.
62    ///
63    /// When `Some`, [`cache_capacity`](Self::cache_capacity) and
64    /// [`io_cache_capacity`](Self::io_cache_capacity) are ignored and every file
65    /// sharing this cache is bounded by one combined byte budget. Entries are
66    /// keyed by `(file_path, block_id)`, so files do not interfere.
67    pub cache: Option<Arc<DeltaCache>>,
68}
69
70impl<C: CompressionCodec> FileConfig<C> {
71    /// Creates a config using `codec` and the `DEFAULT_*` capacities, with no
72    /// shared cache.
73    pub fn new(codec: C) -> Self {
74        Self {
75            codec,
76            block_target_size: DEFAULT_BLOCK_TARGET_SIZE,
77            cache_capacity: DEFAULT_CACHE_CAPACITY,
78            io_cache_capacity: DEFAULT_IO_CACHE_CAPACITY,
79            cache: None,
80        }
81    }
82}
83
84// ── MergedArrayMeta ─────────────────────────────────────────────────
85
86/// Array metadata visible to the caller after merging all delta layers.
87///
88/// Returned by [`ArrayFile::list_arrays`].
89#[derive(Debug, Clone)]
90pub struct MergedArrayMeta {
91    /// Array name (unique within the file).
92    pub name: String,
93    /// Element type.
94    pub dtype: DType,
95    /// Full array shape, one entry per dimension.
96    pub shape: Vec<u32>,
97    /// Chunk shape; equals [`shape`](Self::shape) for single-chunk arrays.
98    pub chunk_shape: Vec<u32>,
99    /// Name of each dimension.
100    pub dimension_names: Vec<String>,
101    /// Fill value used for unwritten elements, if one was set at definition.
102    pub fill_value: Option<FillValue>,
103}
104
105// ── File ────────────────────────────────────────────────────────────
106
107/// Object store and base-file path for an on-disk file.
108struct StoreDir {
109    store: Arc<dyn ObjectStore>,
110    base_path: object_store::path::Path,
111}
112
113/// Schema information returned by [`File::get_chunked_schema`].
114pub(crate) struct ChunkedSchema {
115    pub full_shape: Vec<u32>,
116    pub chunk_shape: Vec<u32>,
117    pub dtype: DType,
118    pub all_coords: Vec<Vec<u32>>,
119}
120
121/// The top-level file handle.
122///
123/// Layers are stacked oldest → newest in `deltas`. Uncommitted writes
124/// accumulate in a disk-backed mutable delta (`pending`) and are flushed
125/// by [`flush`](Self::flush). The mutable delta is created lazily on the
126/// first mutation after open/flush.
127pub struct ArrayFile {
128    deltas: Vec<Delta<DeltaImmutable>>,
129    pending: Option<Delta<DeltaMutable>>,
130    codec: Arc<dyn CompressionCodec>,
131    block_target_size: usize,
132    cache: Option<Arc<DeltaCache>>,
133    /// Object store and stem for on-disk files; `None` for in-memory files.
134    store_dir: Option<StoreDir>,
135    /// Per-array aggregate statistics; `None` until first flush or open.
136    stats: Option<StatsFile>,
137}
138
139// ── Constructors ────────────────────────────────────────────────────
140
141impl ArrayFile {
142    /// Creates a new empty file at `path` within `store`.
143    ///
144    /// `path` is the base file object and should end in `.af`; sidecars
145    /// (`{stem}.N.af`) and the stats file (`{stem}.stats`) are written alongside
146    /// it in the same prefix. Fails if an object already exists at `path` only
147    /// insofar as the backend allows overwriting — the base is (re)written empty.
148    pub async fn create<C: CompressionCodec + 'static>(
149        store: Arc<dyn ObjectStore>,
150        path: object_store::path::Path,
151        config: FileConfig<C>,
152    ) -> Result<Self> {
153        let cache = resolve_cache(&config);
154        let delta_path = Arc::<str>::from(path.as_ref());
155        let storage =
156            Arc::new(ObjectStoreBackend::new(Arc::clone(&store), path.clone())) as Arc<dyn Storage>;
157        write_empty_base(&*storage).await?;
158        let base_delta = Delta::<DeltaImmutable>::open(storage, delta_path, cache.clone()).await?;
159        Ok(ArrayFile {
160            deltas: vec![base_delta],
161            pending: None,
162            codec: Arc::new(config.codec),
163            block_target_size: config.block_target_size,
164            cache,
165            store_dir: Some(StoreDir {
166                store,
167                base_path: path,
168            }),
169            stats: None,
170        })
171    }
172
173    /// Opens an existing file from `store`, discovering the base and all
174    /// sidecar layers under the same stem.
175    ///
176    /// `path` must end in `.af`. Aggregate statistics are loaded from
177    /// `{stem}.stats` if present; a missing or unreadable stats file is not an
178    /// error (see [`array_stats`](Self::array_stats)).
179    pub async fn open<C: CompressionCodec + 'static>(
180        store: Arc<dyn ObjectStore>,
181        path: object_store::path::Path,
182        config: FileConfig<C>,
183    ) -> Result<Self> {
184        let cache = resolve_cache(&config);
185        let delta_path = Arc::<str>::from(path.as_ref());
186        let storage =
187            Arc::new(ObjectStoreBackend::new(Arc::clone(&store), path.clone())) as Arc<dyn Storage>;
188        let base_delta = Delta::<DeltaImmutable>::open(storage, delta_path, cache.clone()).await?;
189        let mut deltas = vec![base_delta];
190
191        let sidecars = discover_sidecars_store(&*store, &path).await?;
192        for (_, scar_path) in sidecars {
193            let scar_delta_path = Arc::<str>::from(scar_path.as_ref());
194            let scar_storage = Arc::new(ObjectStoreBackend::new(Arc::clone(&store), scar_path))
195                as Arc<dyn Storage>;
196            deltas.push(
197                Delta::<DeltaImmutable>::open(scar_storage, scar_delta_path, cache.clone()).await?,
198            );
199        }
200
201        let stats = {
202            let s_storage = ObjectStoreBackend::new(Arc::clone(&store), stats_path(&path));
203            read_stats_file(&s_storage).await.ok()
204        };
205
206        Ok(ArrayFile {
207            deltas,
208            pending: None,
209            codec: Arc::new(config.codec),
210            block_target_size: config.block_target_size,
211            cache,
212            store_dir: Some(StoreDir {
213                store,
214                base_path: path,
215            }),
216            stats,
217        })
218    }
219
220    /// Creates a new empty in-memory file.
221    ///
222    /// Has no backing object store; commit pending writes with
223    /// [`flush_memory`](Self::flush_memory) rather than [`flush`](Self::flush).
224    /// Useful for tests and ephemeral pipelines.
225    pub async fn create_memory<C: CompressionCodec + 'static>(
226        config: FileConfig<C>,
227    ) -> Result<Self> {
228        let cache = resolve_cache(&config);
229        let storage = Arc::new(InMemoryStorage::new()) as Arc<dyn Storage>;
230        write_empty_base(&*storage).await?;
231        let base_delta =
232            Delta::<DeltaImmutable>::open(storage, Arc::from("__memory_0__"), cache.clone())
233                .await?;
234        Ok(ArrayFile {
235            deltas: vec![base_delta],
236            pending: None,
237            codec: Arc::new(config.codec),
238            block_target_size: config.block_target_size,
239            cache,
240            store_dir: None,
241            stats: None,
242        })
243    }
244}
245
246// ── Schema & attribute access ────────────────────────────────────────
247
248impl ArrayFile {
249    /// Returns a reference to the merged array metadata for `name`,
250    /// searching from the newest layer towards the oldest.
251    pub fn get_array(&self, name: &str) -> Result<&ArrayMeta> {
252        self.resolve_array_meta(name)
253            .ok_or_else(|| Error::ArrayNotFound {
254                name: name.to_string(),
255            })
256    }
257
258    fn resolve_array_meta(&self, name: &str) -> Option<&ArrayMeta> {
259        if let Some(p) = self.pending.as_ref()
260            && let Some(m) = p.inner.array_meta.get(name)
261        {
262            return if m.deleted { None } else { Some(m) };
263        }
264        for delta in self.deltas.iter().rev() {
265            if let Some(&i) = delta.inner.array_index.get(name) {
266                let m = &delta.inner.footer.arrays[i];
267                return if m.deleted { None } else { Some(m) };
268            }
269        }
270        None
271    }
272
273    /// Lazily creates a mutable pending delta and returns a mutable reference.
274    fn pending_mut(&mut self) -> &mut Delta<DeltaMutable> {
275        if self.pending.is_none() {
276            let overlay_index = self.deltas.len() as u32;
277            self.pending = Some(Delta::<DeltaMutable>::new(
278                Arc::clone(&self.codec),
279                self.block_target_size,
280                overlay_index,
281            ));
282        }
283        self.pending.as_mut().unwrap()
284    }
285
286    /// Returns the array schema in the form expected by the ndarray write path.
287    pub(crate) fn get_chunked_schema(&self, name: &str) -> Result<ChunkedSchema> {
288        let meta = self.get_array(name)?;
289        let full_shape = meta.layout.shape.clone();
290        let chunk_shape = meta.layout.storage.chunk_shape.clone();
291        let dtype = meta.dtype.clone();
292        // Collect existing chunk coords from all layers (newest wins, so just union).
293        let mut existing: IndexMap<Vec<u32>, ()> = IndexMap::new();
294        for delta in &self.deltas {
295            if let Some(&i) = delta.inner.array_index.get(name) {
296                for e in &delta.inner.footer.arrays[i].layout.storage.chunks {
297                    existing.entry(e.coord.clone()).or_default();
298                }
299            }
300        }
301        if let Some(p) = self.pending.as_ref()
302            && let Some(m) = p.inner.array_meta.get(name)
303        {
304            for e in &m.layout.storage.chunks {
305                existing.entry(e.coord.clone()).or_default();
306            }
307        }
308        Ok(ChunkedSchema {
309            full_shape,
310            chunk_shape,
311            dtype,
312            all_coords: existing.into_keys().collect(),
313        })
314    }
315
316    /// Returns all non-deleted visible arrays (newest-wins merge).
317    pub fn list_arrays(&self) -> Vec<MergedArrayMeta> {
318        let mut seen: IndexMap<String, MergedArrayMeta> = IndexMap::new();
319
320        // Walk from oldest to newest so later entries overwrite earlier ones.
321        for delta in &self.deltas {
322            for a in &delta.inner.footer.arrays {
323                if a.deleted {
324                    seen.shift_remove(&a.name);
325                } else {
326                    seen.insert(
327                        a.name.clone(),
328                        MergedArrayMeta {
329                            name: a.name.clone(),
330                            dtype: a.dtype.clone(),
331                            shape: a.layout.shape.clone(),
332                            chunk_shape: a.layout.storage.chunk_shape.clone(),
333                            dimension_names: a.layout.dimension_names.clone(),
334                            fill_value: a.fill_value.clone(),
335                        },
336                    );
337                }
338            }
339        }
340        if let Some(p) = self.pending.as_ref() {
341            for (name, a) in &p.inner.array_meta {
342                if a.deleted {
343                    seen.shift_remove(name);
344                } else {
345                    seen.insert(
346                        name.clone(),
347                        MergedArrayMeta {
348                            name: a.name.clone(),
349                            dtype: a.dtype.clone(),
350                            shape: a.layout.shape.clone(),
351                            chunk_shape: a.layout.storage.chunk_shape.clone(),
352                            dimension_names: a.layout.dimension_names.clone(),
353                            fill_value: a.fill_value.clone(),
354                        },
355                    );
356                }
357            }
358        }
359        seen.into_values().collect()
360    }
361
362    /// Returns aggregate statistics for `name`, or `None` if no stats exist yet.
363    pub fn array_stats(&self, name: &str) -> Option<&ArrayStats> {
364        self.stats.as_ref()?.get_array(name)
365    }
366
367    /// Number of committed (immutable) delta layers.
368    pub fn num_layers(&self) -> usize {
369        self.deltas.len()
370    }
371
372    /// Returns the value of attribute `key` on array `name`, or `None` if the
373    /// array has no such attribute. Errors if the array does not exist.
374    pub fn get_attribute(&self, name: &str, key: &str) -> Result<Option<&AttributeValue>> {
375        let meta = self.get_array(name)?;
376        let key_idx = match self
377            .pending
378            .as_ref()
379            .and_then(|p| p.inner.attr_keys.iter().position(|k| k == key))
380            .or_else(|| {
381                // Check global dicts in most-recent delta
382                self.deltas
383                    .iter()
384                    .rev()
385                    .find_map(|d| d.inner.footer.attr_keys.iter().position(|k| k == key))
386            }) {
387            Some(i) => i,
388            None => return Ok(None),
389        };
390        let val_idx = match meta.attributes.get(key_idx) {
391            Some(i) => i,
392            None => return Ok(None),
393        };
394        // Look up in pending first, then deltas
395        if let Some(p) = self.pending.as_ref()
396            && val_idx < p.inner.attr_values.len()
397        {
398            return Ok(Some(&p.inner.attr_values[val_idx]));
399        }
400        for delta in self.deltas.iter().rev() {
401            if val_idx < delta.inner.footer.attr_values.len() {
402                return Ok(Some(&delta.inner.footer.attr_values[val_idx]));
403            }
404        }
405        Ok(None)
406    }
407
408    /// Sets attribute `key` on array `name` to `value`, inserting or replacing
409    /// any existing entry. The change lands in the pending layer and is
410    /// persisted on the next [`flush`](Self::flush). Errors if the array does
411    /// not exist.
412    pub fn set_attribute(&mut self, name: &str, key: &str, value: AttributeValue) -> Result<()> {
413        // Ensure the array exists (in deltas or pending), and snapshot its meta
414        // in case we need to copy it down into the pending mutable delta.
415        // Clear the cloned chunks list so we don't carry stale block addresses
416        // from a lower layer into this delta's footer.
417        let mut existing_meta = self.get_array(name)?.clone();
418        existing_meta.layout.storage.chunks.clear();
419
420        let pending = self.pending_mut();
421        let key_idx = pending.intern_attr_key(key);
422        let val_idx = pending.intern_attr_value(value);
423
424        // Update the array meta in pending (copy from lower layer if absent).
425        if pending.array_meta_mut(name).is_none() {
426            pending.upsert_array_meta(existing_meta);
427        }
428        let meta = pending.array_meta_mut(name).unwrap();
429        meta.attributes.upsert(key_idx, val_idx);
430        Ok(())
431    }
432}
433
434// ── Array definition / deletion ──────────────────────────────────────
435
436impl ArrayFile {
437    /// Defines a new array in the pending layer.
438    ///
439    /// `shape` is the full array shape; `chunk_shape` tiles it into a grid of
440    /// independently stored chunks, or `None` to store the whole array as a
441    /// single chunk. If `dimension_names` does not have one entry per dimension
442    /// it is replaced with `dim0`, `dim1`, … . `fill_value` is returned for
443    /// elements that are never written.
444    ///
445    /// Errors with [`Error::ArrayAlreadyExists`] if an array of this name is
446    /// already visible. The definition is persisted on the next
447    /// [`flush`](Self::flush).
448    pub fn define_array<T: ArrayElement>(
449        &mut self,
450        name: impl Into<String>,
451        dimension_names: Vec<String>,
452        shape: Vec<usize>,
453        chunk_shape: Option<Vec<usize>>,
454        fill_value: Option<FillValue>,
455    ) -> Result<()> {
456        let name = name.into();
457        if self.resolve_array_meta(&name).is_some() {
458            return Err(Error::ArrayAlreadyExists { name });
459        }
460        let shape_u32: Vec<u32> = shape.iter().map(|&s| s as u32).collect();
461        let ndim = shape_u32.len();
462        let chunk_shape_u32: Vec<u32> = chunk_shape
463            .map(|cs| cs.iter().map(|&s| s as u32).collect())
464            .unwrap_or_else(|| shape_u32.clone());
465        let dim_names = if dimension_names.len() == ndim {
466            dimension_names
467        } else {
468            (0..ndim).map(|i| format!("dim{i}")).collect()
469        };
470        let layout = ArrayLayout {
471            shape: shape_u32,
472            dimension_names: dim_names,
473            storage: StorageLayout {
474                chunk_shape: chunk_shape_u32,
475                chunks: vec![],
476            },
477        };
478        self.pending_mut().upsert_array_meta(ArrayMeta {
479            name,
480            dtype: T::DTYPE,
481            layout,
482            fill_value,
483            deleted: false,
484            attributes: Attributes::empty(AttrIndexKind::U16),
485        });
486        Ok(())
487    }
488
489    /// Logically deletes an array by writing a tombstone to the pending layer.
490    ///
491    /// The array is excluded from [`list_arrays`](Self::list_arrays) and all
492    /// reads immediately, but its bytes remain on disk until
493    /// [`compact`](Self::compact) reclaims them.
494    pub fn delete(&mut self, name: &str) -> Result<()> {
495        let meta = self.get_array(name)?.clone();
496        self.pending_mut().mark_deleted(meta);
497        Ok(())
498    }
499}
500
501// ── Chunk-level read/write (pub(crate) for ndarray_ext) ──────────────
502
503impl ArrayFile {
504    pub(crate) async fn read_chunk<T: ArrayElement>(
505        &self,
506        name: &str,
507        coord: &[u32],
508    ) -> Result<Vec<T>> {
509        if let Some(bytes) = self.resolve_raw_chunk(name, coord).await? {
510            return Ok(T::decode_chunk(&bytes));
511        }
512        let meta = self.get_array(name)?;
513        let chunk_elems: usize = meta
514            .layout
515            .storage
516            .chunk_shape
517            .iter()
518            .enumerate()
519            .map(|(i, &cs)| {
520                let axis_len = meta.layout.shape[i] as usize;
521                let start = coord[i] as usize * cs as usize;
522                (cs as usize).min(axis_len.saturating_sub(start))
523            })
524            .product();
525        Ok(vec![T::fill_element(meta.fill_value.as_ref()); chunk_elems])
526    }
527
528    pub(crate) fn write_chunk_raw(
529        &mut self,
530        name: &str,
531        coord: Vec<u32>,
532        bytes: Vec<u8>,
533    ) -> Result<()> {
534        // If the array isn't yet present in pending, copy its meta down so the
535        // mutable delta has an entry to attach the chunk to. Clear the cloned
536        // chunks list — the lower-layer addresses don't apply to this delta's
537        // data file, and only chunks written into this session belong here.
538        let snapshot = if self
539            .pending
540            .as_ref()
541            .and_then(|p| p.inner.array_meta.get(name))
542            .is_none()
543        {
544            let mut m = self.get_array(name)?.clone();
545            m.layout.storage.chunks.clear();
546            Some(m)
547        } else {
548            None
549        };
550        let pending = self.pending_mut();
551        if let Some(meta) = snapshot {
552            pending.upsert_array_meta(meta);
553        }
554        pending.write_raw_chunk(name, coord, &bytes)
555    }
556
557    async fn resolve_raw_chunk(&self, name: &str, coord: &[u32]) -> Result<Option<Bytes>> {
558        if let Some(p) = self.pending.as_ref()
559            && let Some(bytes) = p.read_raw_chunk(name, coord)
560        {
561            return Ok(Some(bytes));
562        }
563        for delta in self.deltas.iter().rev() {
564            if let Some(bytes) = delta.read_raw_chunk(name, coord).await? {
565                return Ok(Some(bytes));
566            }
567        }
568        Ok(None)
569    }
570}
571
572// ── ndarray read/write ───────────────────────────────────────────────
573
574impl ArrayFile {
575    /// Writes `data` into array `name` with its origin at coordinate `start`.
576    ///
577    /// The region may span multiple chunks and need not be chunk-aligned;
578    /// partially covered chunks are read-modify-written automatically. `T` must
579    /// match the array's declared dtype, otherwise [`Error::DTypeMismatch`] is
580    /// returned. Writes accumulate in the pending layer until
581    /// [`flush`](Self::flush).
582    pub async fn write_array<T: ArrayElement>(
583        &mut self,
584        name: &str,
585        start: Vec<usize>,
586        data: ndarray::ArrayView<'_, T, ndarray::IxDyn>,
587    ) -> Result<()> {
588        crate::ndarray_ext::write_nd(self, name, data, &start).await
589    }
590
591    /// Reads the sub-region of array `name` starting at `start` with the given
592    /// `shape`.
593    ///
594    /// Pass `vec![], vec![]` to read the whole array. Chunks that were never
595    /// written are materialized from the array's fill value. `T` must match the
596    /// array's declared dtype, otherwise [`Error::DTypeMismatch`] is returned.
597    pub async fn read_array<T: ArrayElement>(
598        &self,
599        name: &str,
600        start: Vec<usize>,
601        shape: Vec<usize>,
602    ) -> Result<ndarray::ArcArray<T, ndarray::IxDyn>> {
603        use std::ops::Range;
604        let slice: Option<Vec<Range<usize>>> = if start.is_empty() && shape.is_empty() {
605            None
606        } else {
607            let meta = self.get_array(name)?;
608            let ndim = meta.layout.shape.len();
609            let effective_start = if start.len() == ndim {
610                start.clone()
611            } else {
612                vec![0; ndim]
613            };
614            let effective_shape: Vec<usize> = if shape.len() == ndim {
615                shape.clone()
616            } else {
617                meta.layout.shape.iter().map(|&s| s as usize).collect()
618            };
619            Some(
620                effective_start
621                    .iter()
622                    .zip(&effective_shape)
623                    .map(|(&s, &sz)| s..s + sz)
624                    .collect(),
625            )
626        };
627        crate::ndarray_ext::assemble_nd(self, name, slice.as_deref()).await
628    }
629}
630
631// ── Flush ────────────────────────────────────────────────────────────
632
633impl ArrayFile {
634    /// Commits pending writes to a new on-disk sidecar layer and refreshes the
635    /// `{stem}.stats` file.
636    ///
637    /// A no-op if there are no pending changes. Errors for in-memory files —
638    /// use [`flush_memory`](Self::flush_memory) instead.
639    pub async fn flush(&mut self) -> Result<()> {
640        if self.pending.is_none() {
641            return Ok(());
642        }
643        let (store, base_path) = match &self.store_dir {
644            Some(sd) => (Arc::clone(&sd.store), sd.base_path.clone()),
645            None => {
646                return Err(Error::Storage(
647                    "in-memory file: use flush_memory instead".into(),
648                ));
649            }
650        };
651        let overlay_index = self.deltas.len() as u32;
652        let scar_path = sidecar_path(&base_path, overlay_index);
653        let delta_path = Arc::<str>::from(scar_path.as_ref());
654        let storage =
655            Arc::new(ObjectStoreBackend::new(Arc::clone(&store), scar_path)) as Arc<dyn Storage>;
656        let hint = base_path.as_ref().to_string();
657        let dirty_names = self.commit_pending(storage, delta_path, hint).await?;
658
659        let merged = self.compute_stats_for(&dirty_names).await?;
660        let s_storage = ObjectStoreBackend::new(Arc::clone(&store), stats_path(&base_path));
661        s_storage
662            .write(bytes::Bytes::from(merged.serialize()?))
663            .await?;
664        self.stats = Some(merged);
665        Ok(())
666    }
667
668    /// Commits pending writes as a new in-memory layer, writing the serialized
669    /// sidecar into `storage`.
670    ///
671    /// The in-memory counterpart to [`flush`](Self::flush); a no-op if there
672    /// are no pending changes.
673    pub async fn flush_memory(&mut self, storage: &InMemoryStorage) -> Result<()> {
674        if self.pending.is_none() {
675            return Ok(());
676        }
677        let overlay_index = self.deltas.len() as u32;
678        let delta_path = Arc::<str>::from(format!("__memory_{overlay_index}__").as_str());
679        let arc: Arc<dyn Storage> = Arc::new(storage.clone());
680        let dirty_names = self.commit_pending(arc, delta_path, String::new()).await?;
681
682        let merged = self.compute_stats_for(&dirty_names).await?;
683        self.stats = Some(merged);
684        Ok(())
685    }
686
687    async fn compute_stats_for(&self, dirty_names: &[String]) -> Result<StatsFile> {
688        let mut merged = self.stats.clone().unwrap_or_default();
689        for name in dirty_names {
690            let schema = match self.get_chunked_schema(name) {
691                Ok(s) => s,
692                Err(_) => continue,
693            };
694            let fill_value = self
695                .resolve_array_meta(name)
696                .and_then(|m| m.fill_value.clone());
697            let shape_product: u64 = schema.full_shape.iter().map(|&s| s as u64).product();
698            let mut stats = ArrayStats::new(name.clone());
699            let mut written_non_null: u64 = 0;
700            for coord in &schema.all_coords {
701                if let Some(bytes) = self.resolve_raw_chunk(name, coord).await? {
702                    let (min, max, nc, rc) =
703                        compute_chunk_partial(&bytes, &schema.dtype, fill_value.as_ref());
704                    written_non_null += rc - nc;
705                    merge_partial(&mut stats, min, max, nc, rc);
706                }
707            }
708            stats.row_count = shape_product;
709            stats.null_count = shape_product - written_non_null;
710            merged.upsert(stats);
711        }
712        Ok(merged)
713    }
714
715    /// Commits the pending mutable delta to `storage`, appends the resulting
716    /// immutable delta to `self.deltas`, and returns the names of arrays that
717    /// had dirty chunks (used to recompute stats).
718    async fn commit_pending(
719        &mut self,
720        storage: Arc<dyn Storage>,
721        delta_path: Arc<str>,
722        base_file_hint: String,
723    ) -> Result<Vec<String>> {
724        let mutable = self
725            .pending
726            .take()
727            .expect("commit_pending: no pending delta");
728        let dirty_names: Vec<String> = mutable
729            .inner
730            .array_meta
731            .iter()
732            .filter(|(_, m)| !m.layout.storage.chunks.is_empty())
733            .map(|(name, _)| name.clone())
734            .collect();
735        let immutable = mutable
736            .commit(storage, delta_path, self.cache.clone(), base_file_hint)
737            .await?;
738        self.deltas.push(immutable);
739        Ok(dirty_names)
740    }
741}
742
743// ── Compact ──────────────────────────────────────────────────────────
744
745impl ArrayFile {
746    /// Merges all committed layers into a single new base file, deleting the
747    /// sidecars and reclaiming space held by overwritten and tombstoned chunks.
748    ///
749    /// After a successful compaction [`num_layers`](Self::num_layers) returns
750    /// `1`. Recomputes and rewrites the `{stem}.stats` file.
751    pub async fn compact(&mut self) -> Result<()> {
752        // Build the merged view.
753        let merged_names: Vec<String> = self.list_arrays().into_iter().map(|m| m.name).collect();
754
755        // Allocate all chunks for merged arrays.
756        let mut allocator = DeltaAllocator::new(Arc::clone(&self.codec), self.block_target_size);
757        let mut arrays: Vec<ArrayMeta> = Vec::new();
758        let mut per_array_stats: Vec<ArrayStats> = Vec::new();
759
760        for name in &merged_names {
761            let meta = self
762                .resolve_array_meta(name)
763                .ok_or_else(|| Error::ArrayNotFound { name: name.clone() })?
764                .clone();
765
766            // Collect all chunk coords across all layers for this array.
767            let mut all_coords: indexmap::IndexSet<Vec<u32>> = indexmap::IndexSet::new();
768            for delta in &self.deltas {
769                if let Some(&i) = delta.inner.array_index.get(name.as_str()) {
770                    for e in &delta.inner.footer.arrays[i].layout.storage.chunks {
771                        all_coords.insert(e.coord.clone());
772                    }
773                }
774            }
775
776            let shape_product: u64 = meta.layout.shape.iter().map(|&s| s as u64).product();
777            let mut new_chunks: Vec<ChunkEntry> = Vec::new();
778            let mut array_stats = ArrayStats::new(name.clone());
779            let mut written_non_null: u64 = 0;
780            for coord in &all_coords {
781                // Read from newest layer that has this chunk.
782                if let Some(raw) = self.resolve_raw_chunk(name, coord).await? {
783                    let (min, max, nc, rc) =
784                        compute_chunk_partial(&raw, &meta.dtype, meta.fill_value.as_ref());
785                    written_non_null += rc - nc;
786                    merge_partial(&mut array_stats, min, max, nc, rc);
787                    let alloc = allocator.allocate(&raw);
788                    new_chunks.push(ChunkEntry {
789                        coord: coord.clone(),
790                        address: ChunkAddress::from(alloc),
791                    });
792                }
793            }
794            array_stats.row_count = shape_product;
795            array_stats.null_count = shape_product - written_non_null;
796            per_array_stats.push(array_stats);
797
798            let mut new_meta = meta;
799            new_meta.layout.storage.chunks = new_chunks;
800            arrays.push(new_meta);
801        }
802
803        let crate::delta::AllocatorOutput {
804            mut file,
805            output_size,
806            blocks,
807        } = allocator.commit().await;
808
809        // Build attr dictionaries from all layers (simple union).
810        let mut attr_keys: Vec<String> = Vec::new();
811        let mut attr_values: Vec<crate::layout::AttributeValue> = Vec::new();
812        for delta in &self.deltas {
813            for k in &delta.inner.footer.attr_keys {
814                if !attr_keys.contains(k) {
815                    attr_keys.push(k.clone());
816                }
817            }
818            for v in &delta.inner.footer.attr_values {
819                if !attr_values.contains(v) {
820                    attr_values.push(v.clone());
821                }
822            }
823        }
824
825        let footer = Footer {
826            version: FOOTER_VERSION,
827            blocks,
828            arrays,
829            attr_keys,
830            attr_values,
831            overlay_index: 0,
832            base_file_hint: String::new(),
833        };
834        let footer_bytes = footer.serialize()?;
835
836        // Write the new base file.
837        let base_storage: Arc<dyn Storage> = if let Some(sd) = &self.store_dir {
838            // Delete old sidecars first.
839            for i in 1..self.deltas.len() {
840                let _ = sd
841                    .store
842                    .delete(&sidecar_path(&sd.base_path, i as u32))
843                    .await;
844            }
845            // Write new base.
846            Arc::new(ObjectStoreBackend::new(
847                Arc::clone(&sd.store),
848                sd.base_path.clone(),
849            ))
850        } else {
851            // In-memory: reuse the first layer's storage.
852            Arc::clone(&self.deltas[0].inner.storage)
853        };
854
855        write_file_then_bytes(&mut file, output_size, &footer_bytes, &*base_storage).await?;
856        let base_delta_path: Arc<str> = if let Some(sd) = &self.store_dir {
857            Arc::from(sd.base_path.as_ref())
858        } else {
859            Arc::from("__memory_0__")
860        };
861        let new_base =
862            Delta::<DeltaImmutable>::open(base_storage, base_delta_path, self.cache.clone())
863                .await?;
864        self.deltas = vec![new_base];
865
866        let mut new_stats = StatsFile::default();
867        for s in per_array_stats {
868            new_stats.upsert(s);
869        }
870        if let Some(sd) = &self.store_dir {
871            let s_storage =
872                ObjectStoreBackend::new(Arc::clone(&sd.store), stats_path(&sd.base_path));
873            s_storage
874                .write(bytes::Bytes::from(new_stats.serialize()?))
875                .await?;
876        }
877        self.stats = Some(new_stats);
878        Ok(())
879    }
880}
881
882// ── Helpers ──────────────────────────────────────────────────────────
883
884fn resolve_cache<C: CompressionCodec>(config: &FileConfig<C>) -> Option<Arc<DeltaCache>> {
885    if let Some(c) = &config.cache {
886        Some(Arc::clone(c))
887    } else if config.cache_capacity == 0 && config.io_cache_capacity == 0 {
888        None
889    } else {
890        Some(Arc::new(DeltaCache::new(
891            config.cache_capacity as u64,
892            config.io_cache_capacity as u64,
893        )))
894    }
895}
896
897fn sidecar_path(base: &object_store::path::Path, n: u32) -> object_store::path::Path {
898    let s = base.as_ref();
899    let without_af = s.strip_suffix(".af").unwrap_or(s);
900    object_store::path::Path::from(format!("{without_af}.{n}.af").as_str())
901}
902
903fn stats_path(base: &object_store::path::Path) -> object_store::path::Path {
904    let s = base.as_ref();
905    let without_af = s.strip_suffix(".af").unwrap_or(s);
906    object_store::path::Path::from(format!("{without_af}.stats").as_str())
907}
908
909async fn discover_sidecars_store(
910    store: &dyn ObjectStore,
911    base_path: &object_store::path::Path,
912) -> Result<Vec<(u32, object_store::path::Path)>> {
913    use futures::TryStreamExt;
914    let base_str = base_path.as_ref();
915    let stem_prefix = base_str
916        .strip_suffix(".af")
917        .ok_or_else(|| Error::Storage("path must end with .af".into()))?;
918    let list_prefix = base_str
919        .rfind('/')
920        .map(|pos| object_store::path::Path::from(&base_str[..pos]));
921    let objects: Vec<_> = store
922        .list(list_prefix.as_ref())
923        .try_collect()
924        .await
925        .map_err(|e| Error::Storage(e.to_string()))?;
926    let mut sidecars: Vec<(u32, object_store::path::Path)> = objects
927        .into_iter()
928        .filter_map(|meta| {
929            let s = meta.location.as_ref();
930            let rest = s.strip_prefix(stem_prefix)?.strip_prefix('.')?;
931            let (num_str, ext) = rest.rsplit_once('.')?;
932            if ext != "af" {
933                return None;
934            }
935            let n: u32 = num_str.parse().ok()?;
936            if n == 0 {
937                return None;
938            }
939            Some((n, meta.location))
940        })
941        .collect();
942    sidecars.sort_by_key(|(n, _)| *n);
943    Ok(sidecars)
944}
945
946async fn write_empty_base(storage: &dyn Storage) -> Result<()> {
947    let footer = Footer::new();
948    let bytes = footer.serialize()?;
949    storage.write(Bytes::from(bytes)).await
950}
951
#[cfg(test)]
mod tests {
    use super::*;
    use crate::codec::NoCompression;

    /// Two in-memory files configured with the same [`DeltaCache`] handle must
    /// both hold that exact allocation rather than a private cache.
    #[tokio::test]
    async fn shared_cache_is_reused_across_files() {
        let shared = Arc::new(DeltaCache::new(1024 * 1024, 0));

        let file_a = ArrayFile::create_memory(FileConfig {
            cache: Some(Arc::clone(&shared)),
            ..FileConfig::new(NoCompression)
        })
        .await
        .unwrap();

        let file_b = ArrayFile::create_memory(FileConfig {
            cache: Some(Arc::clone(&shared)),
            ..FileConfig::new(NoCompression)
        })
        .await
        .unwrap();

        let held_a = file_a.cache.as_ref().expect("file_a has cache");
        let held_b = file_b.cache.as_ref().expect("file_b has cache");
        for held in [held_a, held_b] {
            assert!(Arc::ptr_eq(held, &shared));
        }
    }
}