Skip to main content

atlas/
store.rs

1use std::sync::Arc;
2
3use array_format::DeltaCache;
4use object_store::{ObjectStore, local::LocalFileSystem, path::Path, prefix::PrefixStore};
5use parking_lot::Mutex;
6use tracing::{debug, info, instrument};
7
8use crate::{
9    Error, Result,
10    config::{Codec, MetaFormat, StoreConfig},
11    dataset::{ArrayCache, DatasetView, open_dataset_view},
12    meta::{StoreMeta, load_meta, save_meta},
13};
14
/// Handle to an opened or newly created atlas store.
///
/// Owns the [`object_store`] backend, the in-memory store metadata, a
/// per-array file cache, and the chosen array / metadata codecs. All
/// mutations (`create_dataset`, `delete_dataset`, and everything that
/// flows through a [`DatasetView`]) update in-memory state only —
/// nothing reaches disk until [`Atlas::flush`]. The one write performed
/// outside `flush` is the initial metadata save done by [`Atlas::create`].
///
/// `Atlas` is `Send + Sync` and safe to share across tasks; each array
/// file is independently guarded by a `tokio::sync::RwLock`.
pub struct Atlas {
    /// Backend all store I/O goes through. Wrapped in a `PrefixStore` when
    /// the store was opened or created with a non-empty prefix.
    store: Arc<dyn ObjectStore>,
    /// In-memory store metadata; the same `Arc` is handed to every
    /// [`DatasetView`] created from this handle.
    meta: Arc<Mutex<StoreMeta>>,
    /// Cache of array-file handles, looked up by array name and codec.
    cache: Arc<ArrayCache>,
    /// Store-level array codec: read from the metadata on open, taken from
    /// the `StoreConfig` on create.
    codec: Codec,
    /// Serialization format used when the metadata is saved.
    meta_format: MetaFormat,
    /// Compression applied to the metadata when it is saved.
    meta_compression: Codec,
}
33
34impl Atlas {
35    /// Open an existing store at `prefix` within `store`.
36    ///
37    /// Reads `atlas.json` exactly once. Subsequent mutations only touch the
38    /// in-memory meta until [`Atlas::flush`] is called.
39    #[instrument(skip(store), fields(prefix = %prefix))]
40    pub async fn open(store: Arc<dyn ObjectStore>, prefix: Path) -> Result<Self> {
41        let store = prefixed(store, prefix);
42        let (meta, meta_format, meta_compression) = load_meta(&store).await?;
43        let codec = meta.codec;
44        info!(
45            datasets = meta.datasets.len(),
46            ?codec,
47            ?meta_format,
48            ?meta_compression,
49            "opened atlas store"
50        );
51        Ok(Self {
52            store,
53            meta: Arc::new(Mutex::new(meta)),
54            cache: default_cache(),
55            codec,
56            meta_format,
57            meta_compression,
58        })
59    }
60
61    /// Create a new store at `prefix` within `store`.
62    #[instrument(skip(store, config), fields(prefix = %prefix, codec = ?config.codec, meta_format = ?config.meta_format, meta_compression = ?config.meta_compression))]
63    pub async fn create(store: Arc<dyn ObjectStore>, prefix: Path, config: StoreConfig) -> Result<Self> {
64        let store = prefixed(store, prefix);
65        let meta = StoreMeta { version: 1, codec: config.codec, ..Default::default() };
66        save_meta(&store, &meta, config.meta_format, config.meta_compression).await?;
67        info!("created atlas store");
68        Ok(Self {
69            store,
70            meta: Arc::new(Mutex::new(meta)),
71            cache: default_cache(),
72            codec: config.codec,
73            meta_format: config.meta_format,
74            meta_compression: config.meta_compression,
75        })
76    }
77
78    /// Open an existing store at the given local filesystem path.
79    ///
80    /// The metadata format (`atlas.json` / `atlas.msgpack` / `…zst` / `…lz4`)
81    /// and array codec are auto-detected from the on-disk files — no
82    /// [`StoreConfig`] needed on reopen.
83    ///
84    /// # Examples
85    ///
86    /// ```
87    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
88    /// use atlas::{Atlas, StoreConfig};
89    /// let tmp = tempfile::tempdir().unwrap();
90    /// // Create + flush a store so there's something to open.
91    /// {
92    ///     let mut s = Atlas::create_path(tmp.path(), StoreConfig::default()).await.unwrap();
93    ///     s.create_dataset("ds1").await.unwrap();
94    ///     s.flush().await.unwrap();
95    /// }
96    /// let s = Atlas::open_path(tmp.path()).await.unwrap();
97    /// assert!(s.dataset_exists("ds1"));
98    /// # });
99    /// ```
100    pub async fn open_path(path: impl AsRef<std::path::Path>) -> Result<Self> {
101        let store = Arc::new(LocalFileSystem::new_with_prefix(path.as_ref())?);
102        Self::open(store, Path::from("")).await
103    }
104
105    /// Create a new store at the given local filesystem path. The directory is created
106    /// (recursively, like `mkdir -p`) if it does not already exist.
107    ///
108    /// # Examples
109    ///
110    /// ```
111    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
112    /// use atlas::{Atlas, StoreConfig};
113    /// let tmp = tempfile::tempdir().unwrap();
114    /// let s = Atlas::create_path(tmp.path(), StoreConfig::default()).await.unwrap();
115    /// assert!(s.list_datasets().is_empty());
116    /// # });
117    /// ```
118    pub async fn create_path(path: impl AsRef<std::path::Path>, config: StoreConfig) -> Result<Self> {
119        let path = path.as_ref();
120        std::fs::create_dir_all(path)?;
121        let store = Arc::new(LocalFileSystem::new_with_prefix(path)?);
122        Self::create(store, Path::from(""), config).await
123    }
124
125    /// Create a new dataset in this store and return a [`DatasetView`]
126    /// for populating it. Errors with [`Error::DatasetAlreadyExists`] if
127    /// a dataset with this name is already registered, or
128    /// [`Error::InvalidName`] if `name` violates the naming rules
129    /// (non-empty, no `/`, no leading `_`, not `.` or `..`).
130    #[instrument(skip(self))]
131    pub async fn create_dataset(&mut self, name: &str) -> Result<DatasetView> {
132        crate::validate_name(name)?;
133        {
134            let mut meta = self.meta.lock();
135            if meta.datasets.contains_key(name) {
136                return Err(Error::DatasetAlreadyExists(name.to_string()));
137            }
138            meta.datasets.insert(name.to_string(), Default::default());
139        }
140        debug!("created dataset");
141        Ok(DatasetView::new(
142            self.store.clone(),
143            self.cache.clone(),
144            name.to_string(),
145            self.meta.clone(),
146            self.codec.clone(),
147        ))
148    }
149
150    /// Return a [`DatasetView`] for an existing dataset. Errors with
151    /// [`Error::DatasetNotFound`] if no dataset with this name exists.
152    /// Cheap — reads the in-memory metadata, never touches disk.
153    #[instrument(skip(self))]
154    pub async fn open_dataset(&self, name: &str) -> Result<DatasetView> {
155        open_dataset_view(
156            self.store.clone(),
157            self.cache.clone(),
158            self.meta.clone(),
159            name,
160            self.codec.clone(),
161        )
162        .await
163    }
164
165    /// Remove a dataset from this store. Tombstones the dataset's entries
166    /// inside every shared array file but does not flush — call
167    /// [`Atlas::flush`] to persist the deletion, and optionally
168    /// [`Atlas::compact`] afterwards to reclaim the storage.
169    /// Errors with [`Error::DatasetNotFound`] if no dataset with this
170    /// name exists.
171    #[instrument(skip(self))]
172    pub async fn delete_dataset(&mut self, name: &str) -> Result<()> {
173        let dataset_meta = {
174            let mut meta = self.meta.lock();
175            meta.datasets
176                .shift_remove(name)
177                .ok_or_else(|| Error::DatasetNotFound(name.to_string()))?
178        };
179        debug!(arrays = dataset_meta.arrays.len(), "deleting dataset");
180        for (array_name, schema) in &dataset_meta.arrays {
181            let handle = self
182                .cache
183                .get_or_insert(&self.store, array_name, &schema.codec);
184            let arc = handle.get().await?;
185            let mut guard = arc.write().await;
186            guard.delete(name)?;
187            // No flush here; persistence happens on Atlas::flush().
188        }
189        Ok(())
190    }
191
192    /// All dataset names currently registered in this store, in insertion order.
193    /// Reads from the in-memory store metadata — no disk I/O.
194    pub fn list_datasets(&self) -> Vec<String> {
195        let meta = self.meta.lock();
196        meta.datasets.keys().cloned().collect()
197    }
198
199    /// `true` if a dataset with this name is registered. O(1) hash lookup in
200    /// the in-memory store metadata.
201    pub fn dataset_exists(&self, name: &str) -> bool {
202        let meta = self.meta.lock();
203        meta.datasets.contains_key(name)
204    }
205
206    /// Distinct array names across all datasets in this store, sorted.
207    /// One entry per physical `.af` file — datasets sharing an array name
208    /// (the common case) collapse to a single entry here.
209    pub fn list_arrays(&self) -> Vec<String> {
210        let meta = self.meta.lock();
211        let mut arrays: Vec<String> = meta
212            .datasets
213            .values()
214            .flat_map(|d| d.arrays.keys().cloned())
215            .collect::<std::collections::HashSet<_>>()
216            .into_iter()
217            .collect();
218        arrays.sort();
219        arrays
220    }
221
222    /// Returns the dtype of `array` if any dataset in this store declares it.
223    /// Used by `read_array_across`'s Python binding to pick the generic
224    /// instantiation without round-tripping through a `DatasetView`.
225    pub fn array_dtype(&self, array: &str) -> Option<array_format::DType> {
226        let meta = self.meta.lock();
227        meta.datasets
228            .values()
229            .find_map(|d| d.arrays.get(array))
230            .map(|schema| schema.dtype.clone())
231    }
232
233    /// Bulk read the same slice of `array` from many datasets that share its
234    /// physical file. Runs at most `num_cpus` reads concurrently — matching
235    /// what a well-tuned dask threadpool would do — to keep
236    /// `tokio::task::spawn_blocking`'s decompression pool from oversubscribing
237    /// the actual CPU cores.
238    ///
239    /// This exists because `open_as_many_xarray_dataset` over N datasets used to incur N
240    /// separate Python → Rust → tokio::block_on transitions plus Python-side
241    /// dask graph overhead. One call here replaces all of that and gets the
242    /// same parallelism dask was providing — but in pure Rust, with no GIL
243    /// involvement until the results return.
244    ///
245    /// `start` and `shape` follow the same conventions as
246    /// [`DatasetView::read_array`]: empty `start` + empty `shape` mean the
247    /// full array. Per-dataset entries that don't declare `array` are
248    /// returned as `None`.
249    #[instrument(skip(self, dataset_names), fields(array = %array, n = dataset_names.len()))]
250    pub async fn read_array_across<T: array_format::ArrayElement + Send + Sync + 'static>(
251        &self,
252        array: &str,
253        dataset_names: &[String],
254        start: Vec<usize>,
255        shape: Vec<usize>,
256    ) -> Result<Vec<Option<ndarray::ArcArray<T, ndarray::IxDyn>>>> {
257        // Discover the codec for `array` from any dataset that defines it,
258        // and pre-flight which dataset names declare it.
259        let (codec, present): (Codec, Vec<bool>) = {
260            let meta = self.meta.lock();
261            let mut codec: Option<Codec> = None;
262            let mut present: Vec<bool> = Vec::with_capacity(dataset_names.len());
263            for name in dataset_names {
264                let has = meta
265                    .datasets
266                    .get(name)
267                    .and_then(|d| d.arrays.get(array))
268                    .map(|schema| {
269                        codec.get_or_insert(schema.codec);
270                        true
271                    })
272                    .unwrap_or(false);
273                present.push(has);
274            }
275            let codec = codec.ok_or_else(|| Error::ArrayNotFound(array.to_string()))?;
276            (codec, present)
277        };
278
279        let handle = self.cache.get_or_insert(&self.store, array, &codec);
280        let arc = handle.get().await?;
281
282        // Spawn each per-dataset read as a top-level tokio task so the
283        // multi-thread runtime distributes them across worker threads.
284        // A semaphore caps in-flight tasks at `concurrency` (≈ num_cpus)
285        // to keep `tokio::task::spawn_blocking`'s decompression pool from
286        // oversubscribing the actual CPU cores.
287        let concurrency = num_cpus::get().max(1);
288        let sem = Arc::new(tokio::sync::Semaphore::new(concurrency));
289        let mut joinset = tokio::task::JoinSet::new();
290        for (idx, (name, &has)) in dataset_names.iter().zip(present.iter()).enumerate() {
291            if !has {
292                continue;
293            }
294            let permit = Arc::clone(&sem)
295                .acquire_owned()
296                .await
297                .expect("semaphore never closed");
298            let arc = Arc::clone(&arc);
299            let name = name.clone();
300            let start = start.clone();
301            let shape = shape.clone();
302            joinset.spawn(async move {
303                let _permit = permit;
304                let guard = arc.read().await;
305                let res = guard.read_array::<T>(&name, start, shape).await;
306                (idx, res)
307            });
308        }
309
310        let mut out: Vec<Option<ndarray::ArcArray<T, ndarray::IxDyn>>> =
311            (0..dataset_names.len()).map(|_| None).collect();
312        while let Some(join_res) = joinset.join_next().await {
313            let (idx, read_res) = join_res
314                .map_err(|e| Error::ArrayFormat(array_format::Error::Storage(e.to_string())))?;
315            out[idx] = Some(read_res?);
316        }
317        Ok(out)
318    }
319
320    /// Like [`Atlas::read_array_across`] but returns one stacked
321    /// `(len(dataset_names), *per_dataset_shape)` `ndarray::Array` instead of
322    /// a `Vec` of per-dataset arrays.
323    ///
324    /// The output buffer is pre-allocated once; each parallel read writes its
325    /// row in as the task completes, overlapping the serial copy with the
326    /// remaining in-flight reads. Saves the ~5.7 GiB of memory copies that
327    /// the Python-side `np.stack` on the per-dataset list would do on a
328    /// 1000-dataset gridded workload.
329    ///
330    /// Errors if any listed dataset doesn't declare `array` — the stacked
331    /// representation has no positional "missing" sentinel.
332    #[instrument(skip(self, dataset_names), fields(array = %array, n = dataset_names.len()))]
333    pub async fn read_array_across_stacked<
334        T: array_format::ArrayElement + Send + Sync + Clone + 'static,
335    >(
336        &self,
337        array: &str,
338        dataset_names: &[String],
339        start: Vec<usize>,
340        shape: Vec<usize>,
341    ) -> Result<ndarray::Array<T, ndarray::IxDyn>> {
342        if dataset_names.is_empty() {
343            return Err(Error::ArrayNotFound(array.to_string()));
344        }
345
346        // Discover the codec and verify ALL listed datasets declare the array.
347        let codec: Codec = {
348            let meta = self.meta.lock();
349            let mut codec: Option<Codec> = None;
350            for name in dataset_names {
351                let schema = meta
352                    .datasets
353                    .get(name)
354                    .and_then(|d| d.arrays.get(array))
355                    .ok_or_else(|| {
356                        Error::ArrayNotFound(format!("{array} (in dataset {name})"))
357                    })?;
358                codec.get_or_insert(schema.codec);
359            }
360            codec.expect("non-empty dataset_names, all schemas present")
361        };
362
363        let handle = self.cache.get_or_insert(&self.store, array, &codec);
364        let arc_file = handle.get().await?;
365
366        // Read the first dataset synchronously to discover the per-dataset
367        // shape (after `start`/`shape` slicing) so we can pre-allocate the
368        // stacked output. Then write its row in.
369        let first_arr = {
370            let guard = arc_file.read().await;
371            guard
372                .read_array::<T>(&dataset_names[0], start.clone(), shape.clone())
373                .await?
374        };
375        let per_dataset_shape: Vec<usize> = first_arr.shape().to_vec();
376        let n = dataset_names.len();
377        let mut out_shape = Vec::with_capacity(per_dataset_shape.len() + 1);
378        out_shape.push(n);
379        out_shape.extend(&per_dataset_shape);
380
381        // Allocate the output as a flat `Vec<T>` of N * per_dataset_elements
382        // entries. We bypass `Array::default` so we don't pay an extra
383        // zero-fill memset of the entire buffer — every slot will be written
384        // by either the first-dataset read above or a spawned task below.
385        let per_dataset_elements: usize = per_dataset_shape.iter().product();
386        let total_elements = n * per_dataset_elements;
387        let mut buf: Vec<T> = Vec::with_capacity(total_elements);
388        // SAFETY: every element is written exactly once before we hand the
389        // Vec to `Array::from_shape_vec`. Until then, the uninitialised
390        // portion is never read.
391        unsafe { buf.set_len(total_elements) };
392
393        // Helper: copy a per-dataset ArcArray into row `idx` of the flat
394        // buffer via `copy_from_slice` (memcpy). Both source and destination
395        // are C-order contiguous (array-format's `assemble_nd` builds via
396        // `Array::from_elem`, our Vec is contiguous by construction).
397        fn write_row<T: array_format::ArrayElement + Clone>(
398            buf: &mut [T],
399            idx: usize,
400            per_row: usize,
401            src: &ndarray::ArcArray<T, ndarray::IxDyn>,
402        ) -> Result<()> {
403            let src_slice = src
404                .as_slice()
405                .ok_or_else(|| Error::ArrayFormat(array_format::Error::Storage(
406                    "per-dataset read returned non-contiguous array".into(),
407                )))?;
408            let dst = &mut buf[idx * per_row..(idx + 1) * per_row];
409            dst.clone_from_slice(src_slice);
410            Ok(())
411        }
412
413        write_row(&mut buf, 0, per_dataset_elements, &first_arr)?;
414        drop(first_arr);
415
416        // Spawn the remaining N-1 reads with the same concurrency-capped
417        // pattern as `read_array_across`.
418        let concurrency = num_cpus::get().max(1);
419        let sem = Arc::new(tokio::sync::Semaphore::new(concurrency));
420        let mut joinset = tokio::task::JoinSet::new();
421        for (idx, name) in dataset_names.iter().enumerate().skip(1) {
422            let permit = Arc::clone(&sem)
423                .acquire_owned()
424                .await
425                .expect("semaphore never closed");
426            let arc = Arc::clone(&arc_file);
427            let name = name.clone();
428            let start = start.clone();
429            let shape = shape.clone();
430            joinset.spawn(async move {
431                let _permit = permit;
432                let guard = arc.read().await;
433                let res = guard.read_array::<T>(&name, start, shape).await;
434                (idx, res)
435            });
436        }
437
438        // As tasks complete, memcpy their row into the pre-allocated buffer.
439        // The serial memcpy overlaps with the remaining in-flight parallel
440        // reads happening on the runtime's other workers.
441        while let Some(join_res) = joinset.join_next().await {
442            let (idx, read_res) = join_res
443                .map_err(|e| Error::ArrayFormat(array_format::Error::Storage(e.to_string())))?;
444            let arr = read_res?;
445            write_row(&mut buf, idx, per_dataset_elements, &arr)?;
446        }
447
448        ndarray::Array::from_shape_vec(ndarray::IxDyn(&out_shape), buf).map_err(|e| {
449            Error::ArrayFormat(array_format::Error::Storage(format!(
450                "stacked output shape mismatch: {e}"
451            )))
452        })
453    }
454
455    /// Flush every known array file's pending writes AND persist the in-memory
456    /// `atlas.json`. This is the single durability boundary for the store.
457    ///
458    /// Force-initializes every array referenced in meta, even ones never
459    /// touched by a `DatasetView` (lazy-init wins are on the read path, not
460    /// on flush).
461    #[instrument(skip(self))]
462    pub async fn flush(&mut self) -> Result<()> {
463        let snapshot = self.force_init_all_known_arrays().await?;
464        let files = snapshot.len();
465        debug!(files, "flushing array files");
466        for arc in snapshot {
467            arc.write().await.flush().await?;
468        }
469        let meta_snapshot = self.meta.lock().clone();
470        let datasets = meta_snapshot.datasets.len();
471        save_meta(&self.store, &meta_snapshot, self.meta_format, self.meta_compression).await?;
472        info!(files, datasets, "flushed atlas store");
473        Ok(())
474    }
475
476    /// Compact every known array file in place (reclaims tombstoned space).
477    /// Force-initializes every array referenced in meta.
478    #[instrument(skip(self))]
479    pub async fn compact(&mut self) -> Result<()> {
480        let snapshot = self.force_init_all_known_arrays().await?;
481        let files = snapshot.len();
482        debug!(files, "compacting array files");
483        for arc in snapshot {
484            arc.write().await.compact().await?;
485        }
486        info!(files, "compacted atlas store");
487        Ok(())
488    }
489
490    /// Ensures every array referenced by any dataset in meta has an
491    /// initialized `ArrayFile` in the cache, and returns the inner locks
492    /// (deduped by array name).
493    async fn force_init_all_known_arrays(
494        &self,
495    ) -> Result<Vec<Arc<tokio::sync::RwLock<array_format::ArrayFile>>>> {
496        let specs: Vec<(String, Codec)> = {
497            let meta = self.meta.lock();
498            let mut seen = std::collections::HashSet::new();
499            let mut out = Vec::new();
500            for ds in meta.datasets.values() {
501                for (name, schema) in &ds.arrays {
502                    if seen.insert(name.clone()) {
503                        out.push((name.clone(), schema.codec.clone()));
504                    }
505                }
506            }
507            out
508        };
509        let mut result = Vec::with_capacity(specs.len());
510        for (name, codec) in specs {
511            let handle = self.cache.get_or_insert(&self.store, &name, &codec);
512            result.push(handle.get().await?);
513        }
514        Ok(result)
515    }
516}
517
518fn prefixed(store: Arc<dyn ObjectStore>, prefix: Path) -> Arc<dyn ObjectStore> {
519    if prefix.as_ref().is_empty() {
520        store
521    } else {
522        Arc::new(PrefixStore::new(store, prefix))
523    }
524}
525
526fn default_cache() -> Arc<ArrayCache> {
527    let delta = Arc::new(DeltaCache::new(
528        256 * 1024 * 1024,
529        64 * 1024 * 1024,
530    ));
531    Arc::new(ArrayCache::new(delta))
532}
533
534#[cfg(test)]
535mod tests {
536    use super::*;
537    use object_store::memory::InMemory;
538
539    fn make_store() -> (Arc<dyn ObjectStore>, Path) {
540        (Arc::new(InMemory::new()), Path::from(""))
541    }
542
543    #[tokio::test]
544    async fn empty_store_lists_nothing() {
545        let (store, prefix) = make_store();
546        let s = Atlas::create(store, prefix, StoreConfig::default()).await.unwrap();
547        assert!(s.list_datasets().is_empty());
548        assert!(s.list_arrays().is_empty());
549    }
550
551    #[tokio::test]
552    async fn dataset_exists_false_on_empty_store() {
553        let (store, prefix) = make_store();
554        let s = Atlas::create(store, prefix, StoreConfig::default()).await.unwrap();
555        assert!(!s.dataset_exists("any"));
556    }
557
558    #[tokio::test]
559    async fn create_dataset_makes_it_visible() {
560        let (store, prefix) = make_store();
561        let mut s = Atlas::create(store, prefix, StoreConfig::default()).await.unwrap();
562        s.create_dataset("ds").await.unwrap();
563        assert!(s.dataset_exists("ds"));
564        assert!(s.list_datasets().contains(&"ds".to_string()));
565    }
566
567    #[tokio::test]
568    async fn duplicate_dataset_name_rejected() {
569        let (store, prefix) = make_store();
570        let mut s = Atlas::create(store, prefix, StoreConfig::default()).await.unwrap();
571        s.create_dataset("ds").await.unwrap();
572        let err = s.create_dataset("ds").await.err().unwrap();
573        assert!(matches!(err, crate::Error::DatasetAlreadyExists(_)));
574    }
575
576    #[tokio::test]
577    async fn open_nonexistent_dataset_errors() {
578        let (store, prefix) = make_store();
579        let s = Atlas::create(store, prefix, StoreConfig::default()).await.unwrap();
580        let err = s.open_dataset("ghost").await.err().unwrap();
581        assert!(matches!(err, crate::Error::DatasetNotFound(_)));
582    }
583
584    #[tokio::test]
585    async fn delete_nonexistent_dataset_errors() {
586        let (store, prefix) = make_store();
587        let mut s = Atlas::create(store, prefix, StoreConfig::default()).await.unwrap();
588        let err = s.delete_dataset("ghost").await.unwrap_err();
589        assert!(matches!(err, crate::Error::DatasetNotFound(_)));
590    }
591
592    #[tokio::test]
593    async fn delete_dataset_removes_it() {
594        let (store, prefix) = make_store();
595        let mut s = Atlas::create(store, prefix, StoreConfig::default()).await.unwrap();
596        s.create_dataset("to_delete").await.unwrap();
597        assert!(s.dataset_exists("to_delete"));
598        s.delete_dataset("to_delete").await.unwrap();
599        assert!(!s.dataset_exists("to_delete"));
600    }
601
602    #[tokio::test]
603    async fn list_datasets_returns_all_created() {
604        let (store, prefix) = make_store();
605        let mut s = Atlas::create(store, prefix, StoreConfig::default()).await.unwrap();
606        s.create_dataset("a").await.unwrap();
607        s.create_dataset("b").await.unwrap();
608        s.create_dataset("c").await.unwrap();
609        let mut names = s.list_datasets();
610        names.sort();
611        assert_eq!(names, vec!["a", "b", "c"]);
612    }
613
614    #[tokio::test]
615    async fn invalid_dataset_name_rejected() {
616        let (store, prefix) = make_store();
617        let mut s = Atlas::create(store, prefix, StoreConfig::default()).await.unwrap();
618        assert!(matches!(s.create_dataset("").await, Err(crate::Error::InvalidName(_))));
619        assert!(matches!(s.create_dataset("a/b").await, Err(crate::Error::InvalidName(_))));
620        assert!(matches!(s.create_dataset("_x").await, Err(crate::Error::InvalidName(_))));
621        assert!(matches!(s.create_dataset("..").await, Err(crate::Error::InvalidName(_))));
622    }
623
624    #[tokio::test]
625    async fn list_arrays_deduplicates_shared_names() {
626        let (store, prefix) = make_store();
627        let mut s = Atlas::create(store.clone(), prefix.clone(), StoreConfig::default()).await.unwrap();
628
629        {
630            let mut ds_a = s.create_dataset("a").await.unwrap();
631            ds_a.define_array::<f32>("shared", vec!["x".into()], vec![2], None, None)
632                .await
633                .unwrap();
634            ds_a.define_array::<f32>("only_a", vec!["x".into()], vec![2], None, None)
635                .await
636                .unwrap();
637        }
638
639        {
640            let mut ds_b = s.create_dataset("b").await.unwrap();
641            ds_b.define_array::<f32>("shared", vec!["x".into()], vec![2], None, None)
642                .await
643                .unwrap();
644        }
645
646        s.flush().await.unwrap();
647
648        let s2 = Atlas::open(store, prefix).await.unwrap();
649        let arrays = s2.list_arrays();
650        assert_eq!(arrays, vec!["only_a", "shared"]);
651    }
652
653    #[tokio::test]
654    async fn lz4_codec_roundtrip() {
655        let (store, prefix) = make_store();
656        let config = StoreConfig { codec: Codec::Lz4, ..Default::default() };
657        let mut s = Atlas::create(store.clone(), prefix.clone(), config).await.unwrap();
658
659        {
660            let mut ds = s.create_dataset("ds").await.unwrap();
661            ds.define_array::<f32>("arr", vec!["x".into()], vec![4], None, None)
662                .await
663                .unwrap();
664            let data = ndarray::arr1(&[1.0_f32, 2.0, 3.0, 4.0]).into_dyn();
665            ds.write_array("arr", vec![0], data.view()).await.unwrap();
666        }
667        s.flush().await.unwrap();
668
669        let s2 = Atlas::open(store, prefix).await.unwrap();
670        let ds2 = s2.open_dataset("ds").await.unwrap();
671        let result = ds2.read_array::<f32>("arr", vec![], vec![]).await.unwrap().unwrap();
672        let expected = ndarray::arr1(&[1.0_f32, 2.0, 3.0, 4.0]).into_dyn();
673        assert_eq!(result, expected.into_shared());
674    }
675
676    #[tokio::test]
677    async fn uncompressed_codec_roundtrip() {
678        let (store, prefix) = make_store();
679        let config = StoreConfig { codec: Codec::Uncompressed, ..Default::default() };
680        let mut s = Atlas::create(store.clone(), prefix.clone(), config).await.unwrap();
681
682        {
683            let mut ds = s.create_dataset("ds").await.unwrap();
684            ds.define_array::<i32>("arr", vec!["x".into()], vec![3], None, None)
685                .await
686                .unwrap();
687            let data = ndarray::arr1(&[10_i32, 20, 30]).into_dyn();
688            ds.write_array("arr", vec![0], data.view()).await.unwrap();
689        }
690        s.flush().await.unwrap();
691
692        let s2 = Atlas::open(store, prefix).await.unwrap();
693        let ds2 = s2.open_dataset("ds").await.unwrap();
694        let result = ds2.read_array::<i32>("arr", vec![], vec![]).await.unwrap().unwrap();
695        let expected = ndarray::arr1(&[10_i32, 20, 30]).into_dyn();
696        assert_eq!(result, expected.into_shared());
697    }
698
699    #[tokio::test]
700    async fn path_api_roundtrip() {
701        let tmp = tempfile::tempdir().unwrap();
702        let data = ndarray::arr1(&[1.0_f32, 2.0, 3.0]).into_dyn();
703
704        {
705            let mut s = Atlas::create_path(tmp.path(), StoreConfig::default()).await.unwrap();
706            {
707                let mut ds = s.create_dataset("ds").await.unwrap();
708                ds.define_array::<f32>("arr", vec!["x".into()], vec![3], None, None).await.unwrap();
709                ds.write_array("arr", vec![0], data.view()).await.unwrap();
710            }
711            s.flush().await.unwrap();
712        }
713
714        let s2 = Atlas::open_path(tmp.path()).await.unwrap();
715        let ds2 = s2.open_dataset("ds").await.unwrap();
716        let result = ds2.read_array::<f32>("arr", vec![], vec![]).await.unwrap().unwrap();
717        assert_eq!(result, data.into_shared());
718    }
719
720    #[tokio::test]
721    async fn msgpack_meta_format_roundtrip() {
722        let tmp = tempfile::tempdir().unwrap();
723        let data = ndarray::arr1(&[1.0_f32, 2.0, 3.0]).into_dyn();
724
725        {
726            let config = StoreConfig {
727                meta_format: MetaFormat::MsgPack,
728                ..Default::default()
729            };
730            let mut s = Atlas::create_path(tmp.path(), config).await.unwrap();
731            {
732                let mut ds = s.create_dataset("ds").await.unwrap();
733                ds.define_array::<f32>("arr", vec!["x".into()], vec![3], None, None).await.unwrap();
734                ds.write_array("arr", vec![0], data.view()).await.unwrap();
735            }
736            s.flush().await.unwrap();
737        }
738
739        // On-disk file is atlas.msgpack, not atlas.json.
740        assert!(tmp.path().join("atlas.msgpack").exists());
741        assert!(!tmp.path().join("atlas.json").exists());
742
743        // Open auto-detects format and reads data back.
744        let s2 = Atlas::open_path(tmp.path()).await.unwrap();
745        let ds2 = s2.open_dataset("ds").await.unwrap();
746        let result = ds2.read_array::<f32>("arr", vec![], vec![]).await.unwrap().unwrap();
747        assert_eq!(result, data.into_shared());
748    }
749
750    #[tokio::test]
751    async fn compressed_meta_roundtrip_through_atlas() {
752        let tmp = tempfile::tempdir().unwrap();
753        let data = ndarray::arr1(&[1.0_f32, 2.0, 3.0]).into_dyn();
754
755        {
756            let config = StoreConfig {
757                meta_format: MetaFormat::MsgPack,
758                meta_compression: Codec::Zstd,
759                ..Default::default()
760            };
761            let mut s = Atlas::create_path(tmp.path(), config).await.unwrap();
762            {
763                let mut ds = s.create_dataset("ds").await.unwrap();
764                ds.define_array::<f32>("arr", vec!["x".into()], vec![3], None, None).await.unwrap();
765                ds.write_array("arr", vec![0], data.view()).await.unwrap();
766            }
767            s.flush().await.unwrap();
768        }
769
770        // On-disk file is the zstd-compressed msgpack variant.
771        assert!(tmp.path().join("atlas.msgpack.zst").exists());
772        assert!(!tmp.path().join("atlas.json").exists());
773        assert!(!tmp.path().join("atlas.msgpack").exists());
774
775        let s2 = Atlas::open_path(tmp.path()).await.unwrap();
776        let ds2 = s2.open_dataset("ds").await.unwrap();
777        let result = ds2.read_array::<f32>("arr", vec![], vec![]).await.unwrap().unwrap();
778        assert_eq!(result, data.into_shared());
779    }
780
781    #[tokio::test]
782    async fn create_path_creates_missing_directory() {
783        let tmp = tempfile::tempdir().unwrap();
784        let nested = tmp.path().join("missing").join("nested");
785        assert!(!nested.exists());
786
787        let _atlas = Atlas::create_path(&nested, StoreConfig::default()).await.unwrap();
788
789        assert!(nested.exists() && nested.is_dir());
790        assert!(nested.join("atlas.json").exists());
791    }
792
793    #[tokio::test]
794    async fn create_path_succeeds_when_directory_exists() {
795        let tmp = tempfile::tempdir().unwrap();
796        let _atlas = Atlas::create_path(tmp.path(), StoreConfig::default()).await.unwrap();
797        assert!(tmp.path().join("atlas.json").exists());
798    }
799
800    /// Reading array `x` from many datasets must not open files for arrays
801    /// `y` and `z` that those datasets also reference. This is the load-bearing
802    /// regression test for lazy initialization.
803    #[tokio::test]
804    async fn reading_one_array_leaves_others_uninitialized() {
805        let (store, prefix) = make_store();
806
807        // Seed: two datasets, each defining arrays x, y, z.
808        let mut s = Atlas::create(store.clone(), prefix.clone(), StoreConfig::default())
809            .await
810            .unwrap();
811        for ds_name in ["ds_a", "ds_b"] {
812            let mut ds = s.create_dataset(ds_name).await.unwrap();
813            for arr in ["x", "y", "z"] {
814                ds.define_array::<f32>(arr, vec!["i".into()], vec![2], None, None)
815                    .await
816                    .unwrap();
817                let data = ndarray::arr1(&[1.0_f32, 2.0]).into_dyn();
818                ds.write_array(arr, vec![0], data.view()).await.unwrap();
819            }
820        }
821        s.flush().await.unwrap();
822        drop(s);
823
824        // Reopen — fresh cache, nothing initialized.
825        let s = Atlas::open(store, prefix).await.unwrap();
826        assert!(
827            s.cache.files.read().is_empty(),
828            "cache should start empty after open"
829        );
830
831        // Read only `x` from both datasets.
832        let ds_a = s.open_dataset("ds_a").await.unwrap();
833        let ds_b = s.open_dataset("ds_b").await.unwrap();
834        let _ = ds_a.read_array::<f32>("x", vec![], vec![]).await.unwrap();
835        let _ = ds_b.read_array::<f32>("x", vec![], vec![]).await.unwrap();
836
837        let files = s.cache.files.read();
838        assert!(
839            files.get("x").is_some_and(|a| a.try_get().is_some()),
840            "array `x` must be initialized after read"
841        );
842        assert!(
843            files.get("y").map_or(true, |a| a.try_get().is_none()),
844            "array `y` must NOT be initialized — was never read"
845        );
846        assert!(
847            files.get("z").map_or(true, |a| a.try_get().is_none()),
848            "array `z` must NOT be initialized — was never read"
849        );
850    }
851}