1use std::sync::Arc;
2
3use array_format::DeltaCache;
4use object_store::{ObjectStore, local::LocalFileSystem, path::Path, prefix::PrefixStore};
5use parking_lot::Mutex;
6use tracing::{debug, info, instrument};
7
8use crate::{
9 Error, Result,
10 config::{Codec, MetaFormat, StoreConfig},
11 dataset::{ArrayCache, DatasetView, open_dataset_view},
12 meta::{StoreMeta, load_meta, save_meta},
13};
14
/// Handle to an atlas store: a set of named datasets whose metadata is kept in memory and
/// persisted to an [`ObjectStore`], with array data reached through a shared [`ArrayCache`].
pub struct Atlas {
    /// Object store holding the atlas (wrapped in a `PrefixStore` when opened or created
    /// with a non-empty prefix).
    store: Arc<dyn ObjectStore>,
    /// In-memory store metadata (datasets and their array schemas). The same `Arc` is handed
    /// to every `DatasetView` this atlas creates; `flush` writes a snapshot of it.
    meta: Arc<Mutex<StoreMeta>>,
    /// Cache of array files, looked up by array name and shared by all datasets.
    cache: Arc<ArrayCache>,
    /// Store-wide codec recorded in the metadata (`StoreMeta::codec`) and passed to each
    /// `DatasetView`.
    codec: Codec,
    /// Serialization format used when `flush` writes the metadata.
    meta_format: MetaFormat,
    /// Compression codec applied when `flush` writes the metadata.
    meta_compression: Codec,
}
33
34impl Atlas {
35 #[instrument(skip(store), fields(prefix = %prefix))]
40 pub async fn open(store: Arc<dyn ObjectStore>, prefix: Path) -> Result<Self> {
41 let store = prefixed(store, prefix);
42 let (meta, meta_format, meta_compression) = load_meta(&store).await?;
43 let codec = meta.codec;
44 info!(
45 datasets = meta.datasets.len(),
46 ?codec,
47 ?meta_format,
48 ?meta_compression,
49 "opened atlas store"
50 );
51 Ok(Self {
52 store,
53 meta: Arc::new(Mutex::new(meta)),
54 cache: default_cache(),
55 codec,
56 meta_format,
57 meta_compression,
58 })
59 }
60
61 #[instrument(skip(store, config), fields(prefix = %prefix, codec = ?config.codec, meta_format = ?config.meta_format, meta_compression = ?config.meta_compression))]
63 pub async fn create(store: Arc<dyn ObjectStore>, prefix: Path, config: StoreConfig) -> Result<Self> {
64 let store = prefixed(store, prefix);
65 let meta = StoreMeta { version: 1, codec: config.codec, ..Default::default() };
66 save_meta(&store, &meta, config.meta_format, config.meta_compression).await?;
67 info!("created atlas store");
68 Ok(Self {
69 store,
70 meta: Arc::new(Mutex::new(meta)),
71 cache: default_cache(),
72 codec: config.codec,
73 meta_format: config.meta_format,
74 meta_compression: config.meta_compression,
75 })
76 }
77
78 pub async fn open_path(path: impl AsRef<std::path::Path>) -> Result<Self> {
101 let store = Arc::new(LocalFileSystem::new_with_prefix(path.as_ref())?);
102 Self::open(store, Path::from("")).await
103 }
104
105 pub async fn create_path(path: impl AsRef<std::path::Path>, config: StoreConfig) -> Result<Self> {
119 let path = path.as_ref();
120 std::fs::create_dir_all(path)?;
121 let store = Arc::new(LocalFileSystem::new_with_prefix(path)?);
122 Self::create(store, Path::from(""), config).await
123 }
124
125 #[instrument(skip(self))]
131 pub async fn create_dataset(&mut self, name: &str) -> Result<DatasetView> {
132 crate::validate_name(name)?;
133 {
134 let mut meta = self.meta.lock();
135 if meta.datasets.contains_key(name) {
136 return Err(Error::DatasetAlreadyExists(name.to_string()));
137 }
138 meta.datasets.insert(name.to_string(), Default::default());
139 }
140 debug!("created dataset");
141 Ok(DatasetView::new(
142 self.store.clone(),
143 self.cache.clone(),
144 name.to_string(),
145 self.meta.clone(),
146 self.codec.clone(),
147 ))
148 }
149
150 #[instrument(skip(self))]
154 pub async fn open_dataset(&self, name: &str) -> Result<DatasetView> {
155 open_dataset_view(
156 self.store.clone(),
157 self.cache.clone(),
158 self.meta.clone(),
159 name,
160 self.codec.clone(),
161 )
162 .await
163 }
164
165 #[instrument(skip(self))]
172 pub async fn delete_dataset(&mut self, name: &str) -> Result<()> {
173 let dataset_meta = {
174 let mut meta = self.meta.lock();
175 meta.datasets
176 .shift_remove(name)
177 .ok_or_else(|| Error::DatasetNotFound(name.to_string()))?
178 };
179 debug!(arrays = dataset_meta.arrays.len(), "deleting dataset");
180 for (array_name, schema) in &dataset_meta.arrays {
181 let handle = self
182 .cache
183 .get_or_insert(&self.store, array_name, &schema.codec);
184 let arc = handle.get().await?;
185 let mut guard = arc.write().await;
186 guard.delete(name)?;
187 }
189 Ok(())
190 }
191
192 pub fn list_datasets(&self) -> Vec<String> {
195 let meta = self.meta.lock();
196 meta.datasets.keys().cloned().collect()
197 }
198
199 pub fn dataset_exists(&self, name: &str) -> bool {
202 let meta = self.meta.lock();
203 meta.datasets.contains_key(name)
204 }
205
206 pub fn list_arrays(&self) -> Vec<String> {
210 let meta = self.meta.lock();
211 let mut arrays: Vec<String> = meta
212 .datasets
213 .values()
214 .flat_map(|d| d.arrays.keys().cloned())
215 .collect::<std::collections::HashSet<_>>()
216 .into_iter()
217 .collect();
218 arrays.sort();
219 arrays
220 }
221
222 pub fn array_dtype(&self, array: &str) -> Option<array_format::DType> {
226 let meta = self.meta.lock();
227 meta.datasets
228 .values()
229 .find_map(|d| d.arrays.get(array))
230 .map(|schema| schema.dtype.clone())
231 }
232
233 #[instrument(skip(self, dataset_names), fields(array = %array, n = dataset_names.len()))]
250 pub async fn read_array_across<T: array_format::ArrayElement + Send + Sync + 'static>(
251 &self,
252 array: &str,
253 dataset_names: &[String],
254 start: Vec<usize>,
255 shape: Vec<usize>,
256 ) -> Result<Vec<Option<ndarray::ArcArray<T, ndarray::IxDyn>>>> {
257 let (codec, present): (Codec, Vec<bool>) = {
260 let meta = self.meta.lock();
261 let mut codec: Option<Codec> = None;
262 let mut present: Vec<bool> = Vec::with_capacity(dataset_names.len());
263 for name in dataset_names {
264 let has = meta
265 .datasets
266 .get(name)
267 .and_then(|d| d.arrays.get(array))
268 .map(|schema| {
269 codec.get_or_insert(schema.codec);
270 true
271 })
272 .unwrap_or(false);
273 present.push(has);
274 }
275 let codec = codec.ok_or_else(|| Error::ArrayNotFound(array.to_string()))?;
276 (codec, present)
277 };
278
279 let handle = self.cache.get_or_insert(&self.store, array, &codec);
280 let arc = handle.get().await?;
281
282 let concurrency = num_cpus::get().max(1);
288 let sem = Arc::new(tokio::sync::Semaphore::new(concurrency));
289 let mut joinset = tokio::task::JoinSet::new();
290 for (idx, (name, &has)) in dataset_names.iter().zip(present.iter()).enumerate() {
291 if !has {
292 continue;
293 }
294 let permit = Arc::clone(&sem)
295 .acquire_owned()
296 .await
297 .expect("semaphore never closed");
298 let arc = Arc::clone(&arc);
299 let name = name.clone();
300 let start = start.clone();
301 let shape = shape.clone();
302 joinset.spawn(async move {
303 let _permit = permit;
304 let guard = arc.read().await;
305 let res = guard.read_array::<T>(&name, start, shape).await;
306 (idx, res)
307 });
308 }
309
310 let mut out: Vec<Option<ndarray::ArcArray<T, ndarray::IxDyn>>> =
311 (0..dataset_names.len()).map(|_| None).collect();
312 while let Some(join_res) = joinset.join_next().await {
313 let (idx, read_res) = join_res
314 .map_err(|e| Error::ArrayFormat(array_format::Error::Storage(e.to_string())))?;
315 out[idx] = Some(read_res?);
316 }
317 Ok(out)
318 }
319
320 #[instrument(skip(self, dataset_names), fields(array = %array, n = dataset_names.len()))]
333 pub async fn read_array_across_stacked<
334 T: array_format::ArrayElement + Send + Sync + Clone + 'static,
335 >(
336 &self,
337 array: &str,
338 dataset_names: &[String],
339 start: Vec<usize>,
340 shape: Vec<usize>,
341 ) -> Result<ndarray::Array<T, ndarray::IxDyn>> {
342 if dataset_names.is_empty() {
343 return Err(Error::ArrayNotFound(array.to_string()));
344 }
345
346 let codec: Codec = {
348 let meta = self.meta.lock();
349 let mut codec: Option<Codec> = None;
350 for name in dataset_names {
351 let schema = meta
352 .datasets
353 .get(name)
354 .and_then(|d| d.arrays.get(array))
355 .ok_or_else(|| {
356 Error::ArrayNotFound(format!("{array} (in dataset {name})"))
357 })?;
358 codec.get_or_insert(schema.codec);
359 }
360 codec.expect("non-empty dataset_names, all schemas present")
361 };
362
363 let handle = self.cache.get_or_insert(&self.store, array, &codec);
364 let arc_file = handle.get().await?;
365
366 let first_arr = {
370 let guard = arc_file.read().await;
371 guard
372 .read_array::<T>(&dataset_names[0], start.clone(), shape.clone())
373 .await?
374 };
375 let per_dataset_shape: Vec<usize> = first_arr.shape().to_vec();
376 let n = dataset_names.len();
377 let mut out_shape = Vec::with_capacity(per_dataset_shape.len() + 1);
378 out_shape.push(n);
379 out_shape.extend(&per_dataset_shape);
380
381 let per_dataset_elements: usize = per_dataset_shape.iter().product();
386 let total_elements = n * per_dataset_elements;
387 let mut buf: Vec<T> = Vec::with_capacity(total_elements);
388 unsafe { buf.set_len(total_elements) };
392
393 fn write_row<T: array_format::ArrayElement + Clone>(
398 buf: &mut [T],
399 idx: usize,
400 per_row: usize,
401 src: &ndarray::ArcArray<T, ndarray::IxDyn>,
402 ) -> Result<()> {
403 let src_slice = src
404 .as_slice()
405 .ok_or_else(|| Error::ArrayFormat(array_format::Error::Storage(
406 "per-dataset read returned non-contiguous array".into(),
407 )))?;
408 let dst = &mut buf[idx * per_row..(idx + 1) * per_row];
409 dst.clone_from_slice(src_slice);
410 Ok(())
411 }
412
413 write_row(&mut buf, 0, per_dataset_elements, &first_arr)?;
414 drop(first_arr);
415
416 let concurrency = num_cpus::get().max(1);
419 let sem = Arc::new(tokio::sync::Semaphore::new(concurrency));
420 let mut joinset = tokio::task::JoinSet::new();
421 for (idx, name) in dataset_names.iter().enumerate().skip(1) {
422 let permit = Arc::clone(&sem)
423 .acquire_owned()
424 .await
425 .expect("semaphore never closed");
426 let arc = Arc::clone(&arc_file);
427 let name = name.clone();
428 let start = start.clone();
429 let shape = shape.clone();
430 joinset.spawn(async move {
431 let _permit = permit;
432 let guard = arc.read().await;
433 let res = guard.read_array::<T>(&name, start, shape).await;
434 (idx, res)
435 });
436 }
437
438 while let Some(join_res) = joinset.join_next().await {
442 let (idx, read_res) = join_res
443 .map_err(|e| Error::ArrayFormat(array_format::Error::Storage(e.to_string())))?;
444 let arr = read_res?;
445 write_row(&mut buf, idx, per_dataset_elements, &arr)?;
446 }
447
448 ndarray::Array::from_shape_vec(ndarray::IxDyn(&out_shape), buf).map_err(|e| {
449 Error::ArrayFormat(array_format::Error::Storage(format!(
450 "stacked output shape mismatch: {e}"
451 )))
452 })
453 }
454
455 #[instrument(skip(self))]
462 pub async fn flush(&mut self) -> Result<()> {
463 let snapshot = self.force_init_all_known_arrays().await?;
464 let files = snapshot.len();
465 debug!(files, "flushing array files");
466 for arc in snapshot {
467 arc.write().await.flush().await?;
468 }
469 let meta_snapshot = self.meta.lock().clone();
470 let datasets = meta_snapshot.datasets.len();
471 save_meta(&self.store, &meta_snapshot, self.meta_format, self.meta_compression).await?;
472 info!(files, datasets, "flushed atlas store");
473 Ok(())
474 }
475
476 #[instrument(skip(self))]
479 pub async fn compact(&mut self) -> Result<()> {
480 let snapshot = self.force_init_all_known_arrays().await?;
481 let files = snapshot.len();
482 debug!(files, "compacting array files");
483 for arc in snapshot {
484 arc.write().await.compact().await?;
485 }
486 info!(files, "compacted atlas store");
487 Ok(())
488 }
489
490 async fn force_init_all_known_arrays(
494 &self,
495 ) -> Result<Vec<Arc<tokio::sync::RwLock<array_format::ArrayFile>>>> {
496 let specs: Vec<(String, Codec)> = {
497 let meta = self.meta.lock();
498 let mut seen = std::collections::HashSet::new();
499 let mut out = Vec::new();
500 for ds in meta.datasets.values() {
501 for (name, schema) in &ds.arrays {
502 if seen.insert(name.clone()) {
503 out.push((name.clone(), schema.codec.clone()));
504 }
505 }
506 }
507 out
508 };
509 let mut result = Vec::with_capacity(specs.len());
510 for (name, codec) in specs {
511 let handle = self.cache.get_or_insert(&self.store, &name, &codec);
512 result.push(handle.get().await?);
513 }
514 Ok(result)
515 }
516}
517
518fn prefixed(store: Arc<dyn ObjectStore>, prefix: Path) -> Arc<dyn ObjectStore> {
519 if prefix.as_ref().is_empty() {
520 store
521 } else {
522 Arc::new(PrefixStore::new(store, prefix))
523 }
524}
525
526fn default_cache() -> Arc<ArrayCache> {
527 let delta = Arc::new(DeltaCache::new(
528 256 * 1024 * 1024,
529 64 * 1024 * 1024,
530 ));
531 Arc::new(ArrayCache::new(delta))
532}
533
#[cfg(test)]
mod tests {
    // Unit tests for `Atlas`: store creation/opening, dataset lifecycle, codec and
    // metadata-format round trips, and lazy initialization of cached array files.
    use super::*;
    use object_store::memory::InMemory;

    // Fresh in-memory object store plus an empty prefix, so each test uses isolated storage.
    fn make_store() -> (Arc<dyn ObjectStore>, Path) {
        (Arc::new(InMemory::new()), Path::from(""))
    }

    // A newly created store has no datasets and no arrays.
    #[tokio::test]
    async fn empty_store_lists_nothing() {
        let (store, prefix) = make_store();
        let s = Atlas::create(store, prefix, StoreConfig::default()).await.unwrap();
        assert!(s.list_datasets().is_empty());
        assert!(s.list_arrays().is_empty());
    }

    // `dataset_exists` is false for any name on a fresh store.
    #[tokio::test]
    async fn dataset_exists_false_on_empty_store() {
        let (store, prefix) = make_store();
        let s = Atlas::create(store, prefix, StoreConfig::default()).await.unwrap();
        assert!(!s.dataset_exists("any"));
    }

    // A created dataset is immediately visible through `dataset_exists` and `list_datasets`.
    #[tokio::test]
    async fn create_dataset_makes_it_visible() {
        let (store, prefix) = make_store();
        let mut s = Atlas::create(store, prefix, StoreConfig::default()).await.unwrap();
        s.create_dataset("ds").await.unwrap();
        assert!(s.dataset_exists("ds"));
        assert!(s.list_datasets().contains(&"ds".to_string()));
    }

    // Re-using a dataset name fails with `DatasetAlreadyExists`.
    #[tokio::test]
    async fn duplicate_dataset_name_rejected() {
        let (store, prefix) = make_store();
        let mut s = Atlas::create(store, prefix, StoreConfig::default()).await.unwrap();
        s.create_dataset("ds").await.unwrap();
        let err = s.create_dataset("ds").await.err().unwrap();
        assert!(matches!(err, crate::Error::DatasetAlreadyExists(_)));
    }

    // Opening an unknown dataset fails with `DatasetNotFound`.
    #[tokio::test]
    async fn open_nonexistent_dataset_errors() {
        let (store, prefix) = make_store();
        let s = Atlas::create(store, prefix, StoreConfig::default()).await.unwrap();
        let err = s.open_dataset("ghost").await.err().unwrap();
        assert!(matches!(err, crate::Error::DatasetNotFound(_)));
    }

    // Deleting an unknown dataset fails with `DatasetNotFound`.
    #[tokio::test]
    async fn delete_nonexistent_dataset_errors() {
        let (store, prefix) = make_store();
        let mut s = Atlas::create(store, prefix, StoreConfig::default()).await.unwrap();
        let err = s.delete_dataset("ghost").await.unwrap_err();
        assert!(matches!(err, crate::Error::DatasetNotFound(_)));
    }

    // A deleted dataset no longer exists.
    #[tokio::test]
    async fn delete_dataset_removes_it() {
        let (store, prefix) = make_store();
        let mut s = Atlas::create(store, prefix, StoreConfig::default()).await.unwrap();
        s.create_dataset("to_delete").await.unwrap();
        assert!(s.dataset_exists("to_delete"));
        s.delete_dataset("to_delete").await.unwrap();
        assert!(!s.dataset_exists("to_delete"));
    }

    // `list_datasets` returns every created dataset. Order is not asserted, so the test
    // sorts before comparing.
    #[tokio::test]
    async fn list_datasets_returns_all_created() {
        let (store, prefix) = make_store();
        let mut s = Atlas::create(store, prefix, StoreConfig::default()).await.unwrap();
        s.create_dataset("a").await.unwrap();
        s.create_dataset("b").await.unwrap();
        s.create_dataset("c").await.unwrap();
        let mut names = s.list_datasets();
        names.sort();
        assert_eq!(names, vec!["a", "b", "c"]);
    }

    // These names are rejected with `InvalidName`: empty, containing `/`, a leading `_`,
    // and `..`.
    #[tokio::test]
    async fn invalid_dataset_name_rejected() {
        let (store, prefix) = make_store();
        let mut s = Atlas::create(store, prefix, StoreConfig::default()).await.unwrap();
        assert!(matches!(s.create_dataset("").await, Err(crate::Error::InvalidName(_))));
        assert!(matches!(s.create_dataset("a/b").await, Err(crate::Error::InvalidName(_))));
        assert!(matches!(s.create_dataset("_x").await, Err(crate::Error::InvalidName(_))));
        assert!(matches!(s.create_dataset("..").await, Err(crate::Error::InvalidName(_))));
    }

    // An array name defined in two datasets is listed once. Names come back sorted and
    // survive a flush and reopen.
    #[tokio::test]
    async fn list_arrays_deduplicates_shared_names() {
        let (store, prefix) = make_store();
        let mut s = Atlas::create(store.clone(), prefix.clone(), StoreConfig::default()).await.unwrap();

        {
            let mut ds_a = s.create_dataset("a").await.unwrap();
            ds_a.define_array::<f32>("shared", vec!["x".into()], vec![2], None, None)
                .await
                .unwrap();
            ds_a.define_array::<f32>("only_a", vec!["x".into()], vec![2], None, None)
                .await
                .unwrap();
        }

        {
            let mut ds_b = s.create_dataset("b").await.unwrap();
            ds_b.define_array::<f32>("shared", vec!["x".into()], vec![2], None, None)
                .await
                .unwrap();
        }

        s.flush().await.unwrap();

        // Reopen from storage so the assertion checks persisted metadata.
        let s2 = Atlas::open(store, prefix).await.unwrap();
        let arrays = s2.list_arrays();
        assert_eq!(arrays, vec!["only_a", "shared"]);
    }

    // Data written with the LZ4 codec reads back unchanged after flush and reopen. Empty
    // `start`/`shape` read the whole array.
    #[tokio::test]
    async fn lz4_codec_roundtrip() {
        let (store, prefix) = make_store();
        let config = StoreConfig { codec: Codec::Lz4, ..Default::default() };
        let mut s = Atlas::create(store.clone(), prefix.clone(), config).await.unwrap();

        {
            let mut ds = s.create_dataset("ds").await.unwrap();
            ds.define_array::<f32>("arr", vec!["x".into()], vec![4], None, None)
                .await
                .unwrap();
            let data = ndarray::arr1(&[1.0_f32, 2.0, 3.0, 4.0]).into_dyn();
            ds.write_array("arr", vec![0], data.view()).await.unwrap();
        }
        s.flush().await.unwrap();

        let s2 = Atlas::open(store, prefix).await.unwrap();
        let ds2 = s2.open_dataset("ds").await.unwrap();
        let result = ds2.read_array::<f32>("arr", vec![], vec![]).await.unwrap().unwrap();
        let expected = ndarray::arr1(&[1.0_f32, 2.0, 3.0, 4.0]).into_dyn();
        assert_eq!(result, expected.into_shared());
    }

    // Same round trip with the uncompressed codec and an integer element type.
    #[tokio::test]
    async fn uncompressed_codec_roundtrip() {
        let (store, prefix) = make_store();
        let config = StoreConfig { codec: Codec::Uncompressed, ..Default::default() };
        let mut s = Atlas::create(store.clone(), prefix.clone(), config).await.unwrap();

        {
            let mut ds = s.create_dataset("ds").await.unwrap();
            ds.define_array::<i32>("arr", vec!["x".into()], vec![3], None, None)
                .await
                .unwrap();
            let data = ndarray::arr1(&[10_i32, 20, 30]).into_dyn();
            ds.write_array("arr", vec![0], data.view()).await.unwrap();
        }
        s.flush().await.unwrap();

        let s2 = Atlas::open(store, prefix).await.unwrap();
        let ds2 = s2.open_dataset("ds").await.unwrap();
        let result = ds2.read_array::<i32>("arr", vec![], vec![]).await.unwrap().unwrap();
        let expected = ndarray::arr1(&[10_i32, 20, 30]).into_dyn();
        assert_eq!(result, expected.into_shared());
    }

    // `create_path` / `open_path` round trip through a real temporary directory. The writer
    // is dropped (end of the outer scope) before reopening.
    #[tokio::test]
    async fn path_api_roundtrip() {
        let tmp = tempfile::tempdir().unwrap();
        let data = ndarray::arr1(&[1.0_f32, 2.0, 3.0]).into_dyn();

        {
            let mut s = Atlas::create_path(tmp.path(), StoreConfig::default()).await.unwrap();
            {
                let mut ds = s.create_dataset("ds").await.unwrap();
                ds.define_array::<f32>("arr", vec!["x".into()], vec![3], None, None).await.unwrap();
                ds.write_array("arr", vec![0], data.view()).await.unwrap();
            }
            s.flush().await.unwrap();
        }

        let s2 = Atlas::open_path(tmp.path()).await.unwrap();
        let ds2 = s2.open_dataset("ds").await.unwrap();
        let result = ds2.read_array::<f32>("arr", vec![], vec![]).await.unwrap().unwrap();
        assert_eq!(result, data.into_shared());
    }

    // With `MetaFormat::MsgPack`, metadata is written to `atlas.msgpack` (no `atlas.json`).
    // Reopening without a config finds it.
    #[tokio::test]
    async fn msgpack_meta_format_roundtrip() {
        let tmp = tempfile::tempdir().unwrap();
        let data = ndarray::arr1(&[1.0_f32, 2.0, 3.0]).into_dyn();

        {
            let config = StoreConfig {
                meta_format: MetaFormat::MsgPack,
                ..Default::default()
            };
            let mut s = Atlas::create_path(tmp.path(), config).await.unwrap();
            {
                let mut ds = s.create_dataset("ds").await.unwrap();
                ds.define_array::<f32>("arr", vec!["x".into()], vec![3], None, None).await.unwrap();
                ds.write_array("arr", vec![0], data.view()).await.unwrap();
            }
            s.flush().await.unwrap();
        }

        assert!(tmp.path().join("atlas.msgpack").exists());
        assert!(!tmp.path().join("atlas.json").exists());

        let s2 = Atlas::open_path(tmp.path()).await.unwrap();
        let ds2 = s2.open_dataset("ds").await.unwrap();
        let result = ds2.read_array::<f32>("arr", vec![], vec![]).await.unwrap().unwrap();
        assert_eq!(result, data.into_shared());
    }

    // MsgPack metadata with Zstd compression is written to `atlas.msgpack.zst` only and
    // reopens transparently.
    #[tokio::test]
    async fn compressed_meta_roundtrip_through_atlas() {
        let tmp = tempfile::tempdir().unwrap();
        let data = ndarray::arr1(&[1.0_f32, 2.0, 3.0]).into_dyn();

        {
            let config = StoreConfig {
                meta_format: MetaFormat::MsgPack,
                meta_compression: Codec::Zstd,
                ..Default::default()
            };
            let mut s = Atlas::create_path(tmp.path(), config).await.unwrap();
            {
                let mut ds = s.create_dataset("ds").await.unwrap();
                ds.define_array::<f32>("arr", vec!["x".into()], vec![3], None, None).await.unwrap();
                ds.write_array("arr", vec![0], data.view()).await.unwrap();
            }
            s.flush().await.unwrap();
        }

        assert!(tmp.path().join("atlas.msgpack.zst").exists());
        assert!(!tmp.path().join("atlas.json").exists());
        assert!(!tmp.path().join("atlas.msgpack").exists());

        let s2 = Atlas::open_path(tmp.path()).await.unwrap();
        let ds2 = s2.open_dataset("ds").await.unwrap();
        let result = ds2.read_array::<f32>("arr", vec![], vec![]).await.unwrap().unwrap();
        assert_eq!(result, data.into_shared());
    }

    // `create_path` creates missing nested directories and writes the default `atlas.json`.
    #[tokio::test]
    async fn create_path_creates_missing_directory() {
        let tmp = tempfile::tempdir().unwrap();
        let nested = tmp.path().join("missing").join("nested");
        assert!(!nested.exists());

        let _atlas = Atlas::create_path(&nested, StoreConfig::default()).await.unwrap();

        assert!(nested.exists() && nested.is_dir());
        assert!(nested.join("atlas.json").exists());
    }

    // `create_path` also works when the target directory already exists.
    #[tokio::test]
    async fn create_path_succeeds_when_directory_exists() {
        let tmp = tempfile::tempdir().unwrap();
        let _atlas = Atlas::create_path(tmp.path(), StoreConfig::default()).await.unwrap();
        assert!(tmp.path().join("atlas.json").exists());
    }

    // After reopening, the array cache starts empty. Reading array `x` from two datasets
    // initializes only `x`, even though `y` and `z` are known from the metadata.
    #[tokio::test]
    async fn reading_one_array_leaves_others_uninitialized() {
        let (store, prefix) = make_store();

        let mut s = Atlas::create(store.clone(), prefix.clone(), StoreConfig::default())
            .await
            .unwrap();
        for ds_name in ["ds_a", "ds_b"] {
            let mut ds = s.create_dataset(ds_name).await.unwrap();
            for arr in ["x", "y", "z"] {
                ds.define_array::<f32>(arr, vec!["i".into()], vec![2], None, None)
                    .await
                    .unwrap();
                let data = ndarray::arr1(&[1.0_f32, 2.0]).into_dyn();
                ds.write_array(arr, vec![0], data.view()).await.unwrap();
            }
        }
        s.flush().await.unwrap();
        // Drop the writer so the reopened atlas starts with a fresh, empty cache.
        drop(s);

        let s = Atlas::open(store, prefix).await.unwrap();
        assert!(
            s.cache.files.read().is_empty(),
            "cache should start empty after open"
        );

        let ds_a = s.open_dataset("ds_a").await.unwrap();
        let ds_b = s.open_dataset("ds_b").await.unwrap();
        let _ = ds_a.read_array::<f32>("x", vec![], vec![]).await.unwrap();
        let _ = ds_b.read_array::<f32>("x", vec![], vec![]).await.unwrap();

        // Inspect cache internals: `try_get()` is `Some` only for initialized array files.
        let files = s.cache.files.read();
        assert!(
            files.get("x").is_some_and(|a| a.try_get().is_some()),
            "array `x` must be initialized after read"
        );
        assert!(
            files.get("y").map_or(true, |a| a.try_get().is_none()),
            "array `y` must NOT be initialized — was never read"
        );
        assert!(
            files.get("z").map_or(true, |a| a.try_get().is_none()),
            "array `z` must NOT be initialized — was never read"
        );
    }
}