Skip to main content

atlas/
dataset.rs

1use std::{collections::HashMap, sync::Arc};
2
3use array_format::{ArrayElement, ArrayStats, DeltaCache, FillValue};
4use ndarray::{ArcArray, ArrayView, IxDyn};
5use object_store::ObjectStore;
6use parking_lot::{Mutex, RwLock};
7use tracing::{debug, instrument, trace};
8
9use crate::{
10    Error, Result,
11    array::AtlasArray,
12    config::Codec,
13    meta::{DatasetMeta, StoreMeta},
14    schema::{ArraySchema, Attr},
15};
16
17/// Shared lazy-handle map: array name → `Arc<AtlasArray>`. Cloned by reference
18/// from `Atlas` into every `DatasetView`, so all views observe the same
19/// initialization state. The map lock (`parking_lot::RwLock`) is never held
20/// across an `await` point; `AtlasArray` defers its actual I/O via
21/// `tokio::sync::OnceCell` so each underlying file opens at most once.
22pub(crate) struct ArrayCache {
23    pub(crate) files: RwLock<HashMap<String, Arc<AtlasArray>>>,
24    pub(crate) delta: Arc<DeltaCache>,
25}
26
27impl ArrayCache {
28    pub(crate) fn new(delta: Arc<DeltaCache>) -> Self {
29        Self {
30            files: RwLock::new(HashMap::new()),
31            delta,
32        }
33    }
34
35    /// Returns the lazy handle for `array_name`, registering a new one if
36    /// absent. Does **not** open or create the underlying file — that happens
37    /// on the first `AtlasArray::get().await`.
38    pub(crate) fn get_or_insert(
39        &self,
40        store: &Arc<dyn ObjectStore>,
41        array_name: &str,
42        codec: &Codec,
43    ) -> Arc<AtlasArray> {
44        if let Some(arc) = self.files.read().get(array_name) {
45            return arc.clone();
46        }
47        let mut guard = self.files.write();
48        guard
49            .entry(array_name.to_string())
50            .or_insert_with(|| {
51                Arc::new(AtlasArray::new(
52                    store.clone(),
53                    codec.clone(),
54                    array_name.to_string(),
55                    self.delta.clone(),
56                ))
57            })
58            .clone()
59    }
60}
61
62/// A borrowed handle to one dataset within an [`Atlas`](crate::Atlas).
63///
64/// Carries no independent state — every mutation (`define_array`,
65/// `write_array`, `set_attribute`, `delete_array`) updates the parent
66/// atlas's shared in-memory metadata. Persistence happens when the
67/// parent [`Atlas::flush`](crate::Atlas::flush) is called; `DatasetView`
68/// has no `flush` of its own.
69///
70/// `DatasetView` is `Send` and can be moved across tasks, but holding
71/// many concurrent views to the same dataset and mutating from each is
72/// not protected — the underlying lock is on the shared metadata, not
73/// per-view.
74pub struct DatasetView {
75    store: Arc<dyn ObjectStore>,
76    pub(crate) cache: Arc<ArrayCache>,
77    name: String,
78    /// Shared handle to the parent `Atlas`'s in-memory `StoreMeta`. All
79    /// mutations on this view go through here; persistence happens on
80    /// `Atlas::flush()`.
81    atlas_meta: Arc<Mutex<StoreMeta>>,
82    codec: Codec,
83}
84
85impl DatasetView {
86    pub(crate) fn new(
87        store: Arc<dyn ObjectStore>,
88        cache: Arc<ArrayCache>,
89        name: String,
90        atlas_meta: Arc<Mutex<StoreMeta>>,
91        codec: Codec,
92    ) -> Self {
93        Self {
94            store,
95            cache,
96            name,
97            atlas_meta,
98            codec,
99        }
100    }
101
102    /// Returns a clone of the metadata for this dataset.
103    pub fn meta(&self) -> DatasetMeta {
104        self.atlas_meta
105            .lock()
106            .datasets
107            .get(&self.name)
108            .cloned()
109            .unwrap_or_default()
110    }
111
112    /// The dataset name this view points to.
113    pub fn name(&self) -> &str {
114        &self.name
115    }
116
117    /// All array names declared in this dataset, in insertion order.
118    /// Reads from the shared in-memory meta — no disk I/O.
119    pub fn list_arrays(&self) -> Vec<String> {
120        self.atlas_meta
121            .lock()
122            .datasets
123            .get(&self.name)
124            .map(|d| d.arrays.keys().cloned().collect())
125            .unwrap_or_default()
126    }
127
128    /// Set or overwrite a typed attribute on this dataset. Buffered in the
129    /// in-memory meta until the parent [`Atlas::flush`](crate::Atlas::flush).
130    pub fn set_attribute(&mut self, key: &str, value: Attr) {
131        let mut meta = self.atlas_meta.lock();
132        meta.datasets
133            .entry(self.name.clone())
134            .or_default()
135            .attributes
136            .insert(key.to_string(), value);
137    }
138
139    /// Look up an attribute by key. `None` if the key isn't present (or the
140    /// dataset has no attributes at all yet).
141    pub fn get_attribute(&self, key: &str) -> Option<Attr> {
142        self.atlas_meta
143            .lock()
144            .datasets
145            .get(&self.name)
146            .and_then(|d| d.attributes.get(key).cloned())
147    }
148
149    /// Returns the cached schema for `array`, or `None` if no array with that
150    /// name exists in this dataset.
151    pub fn array_meta(&self, array: &str) -> Option<ArraySchema> {
152        self.atlas_meta
153            .lock()
154            .datasets
155            .get(&self.name)
156            .and_then(|d| d.arrays.get(array).cloned())
157    }
158
159    /// Returns aggregate statistics for `array` in this dataset, or `None`
160    /// if no such array exists or stats haven't been computed yet (stats are
161    /// computed on flush).
162    pub async fn array_stats(&self, array: &str) -> Option<ArrayStats> {
163        let codec = self.array_codec(array)?;
164        let handle = self.cache.get_or_insert(&self.store, array, &codec);
165        let arc = handle.get().await.ok()?;
166        let guard = arc.read().await;
167        guard.array_stats(&self.name).cloned()
168    }
169
170    /// Declare a new array in this dataset.
171    ///
172    /// `dims` are named dimensions (one per axis); `shape` is the logical
173    /// size per axis. `chunk_shape = None` means one chunk per axis (a
174    /// single block per dataset entry — fastest write for small arrays;
175    /// pessimal for slice reads on large arrays). `fill_value` is the
176    /// scalar returned for unwritten cells; cells equal to it are tallied
177    /// as nulls in `array_stats` after [`Atlas::flush`](crate::Atlas::flush).
178    ///
179    /// Errors with [`Error::ArrayAlreadyExists`] if this dataset already
180    /// declares an array with that name, or [`Error::InvalidName`] if
181    /// `array` violates the naming rules.
182    #[instrument(skip(self, fill_value), fields(dataset = %self.name, dtype = ?T::DTYPE))]
183    pub async fn define_array<T: ArrayElement>(
184        &mut self,
185        array: &str,
186        dims: Vec<String>,
187        shape: Vec<usize>,
188        chunk_shape: Option<Vec<usize>>,
189        fill_value: Option<FillValue>,
190    ) -> Result<()> {
191        crate::validate_name(array)?;
192        {
193            let meta = self.atlas_meta.lock();
194            if let Some(ds) = meta.datasets.get(&self.name) {
195                if ds.arrays.contains_key(array) {
196                    return Err(Error::ArrayAlreadyExists(array.to_string()));
197                }
198            }
199        }
200
201        let handle = self.cache.get_or_insert(&self.store, array, &self.codec);
202        let arc = handle.get().await?;
203        arc.write().await.define_array::<T>(
204            &self.name,
205            dims.clone(),
206            shape.clone(),
207            chunk_shape.clone(),
208            fill_value,
209        )?;
210
211        let actual_chunk = chunk_shape.unwrap_or_else(|| shape.clone());
212        debug!(?shape, chunk_shape = ?actual_chunk, "defined array");
213        let schema = ArraySchema {
214            dtype: T::DTYPE.clone(),
215            shape,
216            chunk_shape: actual_chunk,
217            dimension_names: dims,
218            codec: self.codec.clone(),
219        };
220        let mut meta = self.atlas_meta.lock();
221        meta.datasets
222            .entry(self.name.clone())
223            .or_default()
224            .arrays
225            .insert(array.to_string(), schema);
226        Ok(())
227    }
228
229    /// Write a slab of values into an array previously declared via
230    /// [`define_array`](Self::define_array).
231    ///
232    /// `start` is the per-axis offset to begin writing at; `data`'s shape
233    /// determines the extent. Out-of-bounds writes truncate at the array's
234    /// declared shape. The bytes are buffered in the per-array in-memory
235    /// layer; nothing reaches disk until [`Atlas::flush`](crate::Atlas::flush).
236    ///
237    /// Errors with [`Error::ArrayNotFound`] if no array with this name has
238    /// been declared.
239    #[instrument(skip(self, data), fields(dataset = %self.name, elems = data.len()))]
240    pub async fn write_array<T: ArrayElement>(
241        &mut self,
242        array: &str,
243        start: Vec<usize>,
244        data: ArrayView<'_, T, IxDyn>,
245    ) -> Result<()> {
246        let codec = self
247            .array_codec(array)
248            .ok_or_else(|| Error::ArrayNotFound(array.to_string()))?;
249        let handle = self.cache.get_or_insert(&self.store, array, &codec);
250        let arc = handle.get().await?;
251        let mut guard = arc.write().await;
252        let shape: Vec<usize> = data.shape().to_vec();
253        let bytes = data.len() * std::mem::size_of::<T>();
254        let start_log = start.clone();
255        let t0 = std::time::Instant::now();
256        guard.write_array::<T>(&self.name, start, data).await?;
257        debug!(
258            array,
259            start = ?start_log,
260            ?shape,
261            bytes,
262            elapsed_us = t0.elapsed().as_micros() as u64,
263            "wrote chunk"
264        );
265        Ok(())
266    }
267
268    /// Read a full or partial array from this dataset.
269    ///
270    /// Empty `start` + empty `shape` reads the full array. Otherwise both
271    /// must have one entry per dimension; only chunks overlapping the
272    /// requested region are decompressed.
273    ///
274    /// Returns `Ok(None)` if this dataset doesn't declare an array with
275    /// that name.
276    ///
277    /// # Examples
278    ///
279    /// ```
280    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
281    /// use atlas::{Atlas, StoreConfig};
282    /// use ndarray::Array2;
283    /// let tmp = tempfile::tempdir().unwrap();
284    /// let mut s = Atlas::create_path(tmp.path(), StoreConfig::default()).await.unwrap();
285    /// {
286    ///     let mut ds = s.create_dataset("ds").await.unwrap();
287    ///     ds.define_array::<f32>("temp", vec!["x".into(), "y".into()],
288    ///                            vec![4, 8], None, None).await.unwrap();
289    ///     let data = Array2::<f32>::from_elem([4, 8], 9.0).into_dyn();
290    ///     ds.write_array("temp", vec![0, 0], data.view()).await.unwrap();
291    ///
292    ///     // Full read.
293    ///     let full = ds.read_array::<f32>("temp", vec![], vec![]).await.unwrap().unwrap();
294    ///     assert_eq!(full.shape(), &[4, 8]);
295    ///
296    ///     // Partial read — a 2×4 sub-region.
297    ///     let part = ds.read_array::<f32>("temp", vec![1, 2], vec![2, 4]).await.unwrap().unwrap();
298    ///     assert_eq!(part.shape(), &[2, 4]);
299    /// }
300    /// s.flush().await.unwrap();
301    /// # });
302    /// ```
303    #[instrument(skip(self), fields(dataset = %self.name))]
304    pub async fn read_array<T: ArrayElement>(
305        &self,
306        array: &str,
307        start: Vec<usize>,
308        shape: Vec<usize>,
309    ) -> Result<Option<ArcArray<T, IxDyn>>> {
310        let codec = match self.array_codec(array) {
311            Some(c) => c,
312            None => {
313                debug!("array not present in dataset");
314                return Ok(None);
315            }
316        };
317        trace!(?start, ?shape, "reading array");
318        let handle = self.cache.get_or_insert(&self.store, array, &codec);
319        let arc = handle.get().await?;
320        let guard = arc.read().await;
321        Ok(Some(guard.read_array::<T>(&self.name, start, shape).await?))
322    }
323
324    /// Returns the fill value passed to `define_array` for `array`, or `None`
325    /// if the array isn't present in this dataset or was defined without one.
326    pub async fn array_fill_value(&self, array: &str) -> Result<Option<FillValue>> {
327        let codec = match self.array_codec(array) {
328            Some(c) => c,
329            None => return Ok(None),
330        };
331        let handle = self.cache.get_or_insert(&self.store, array, &codec);
332        let arc = handle.get().await?;
333        let guard = arc.read().await;
334        Ok(guard.get_array(&self.name)?.fill_value.clone())
335    }
336
337    /// Remove an array from this dataset. Tombstones the dataset's entry
338    /// inside the shared array file; persistence happens on the next
339    /// [`Atlas::flush`](crate::Atlas::flush). Errors with
340    /// [`Error::ArrayNotFound`] if no array with that name is declared here.
341    #[instrument(skip(self), fields(dataset = %self.name))]
342    pub async fn delete_array(&mut self, array: &str) -> Result<()> {
343        let codec = self
344            .array_codec(array)
345            .ok_or_else(|| Error::ArrayNotFound(array.to_string()))?;
346        let handle = self.cache.get_or_insert(&self.store, array, &codec);
347        let arc = handle.get().await?;
348        arc.write().await.delete(&self.name)?;
349        let mut meta = self.atlas_meta.lock();
350        if let Some(ds_meta) = meta.datasets.get_mut(&self.name) {
351            ds_meta.arrays.shift_remove(array);
352        }
353        debug!("deleted array");
354        Ok(())
355    }
356
357    /// Looks up the per-array codec from `atlas_meta`. Returns `None` if the
358    /// array isn't defined in this dataset.
359    fn array_codec(&self, array: &str) -> Option<Codec> {
360        self.atlas_meta
361            .lock()
362            .datasets
363            .get(&self.name)
364            .and_then(|d| d.arrays.get(array).map(|s| s.codec.clone()))
365    }
366}
367
368pub(crate) async fn open_dataset_view(
369    store: Arc<dyn ObjectStore>,
370    cache: Arc<ArrayCache>,
371    atlas_meta: Arc<Mutex<StoreMeta>>,
372    name: &str,
373    codec: Codec,
374) -> Result<DatasetView> {
375    {
376        let meta = atlas_meta.lock();
377        if !meta.datasets.contains_key(name) {
378            return Err(Error::DatasetNotFound(name.to_string()));
379        }
380    }
381    Ok(DatasetView::new(
382        store,
383        cache,
384        name.to_string(),
385        atlas_meta,
386        codec,
387    ))
388}
389
#[cfg(test)]
mod tests {
    use super::*;
    use object_store::memory::InMemory;

    fn make_store() -> Arc<dyn ObjectStore> {
        Arc::new(InMemory::new())
    }

    /// Shared `StoreMeta` containing exactly one empty dataset called `name`.
    fn shared_meta_with(name: &str) -> Arc<Mutex<StoreMeta>> {
        let mut meta = StoreMeta::default();
        meta.datasets
            .insert(name.to_string(), DatasetMeta::default());
        Arc::new(Mutex::new(meta))
    }

    /// Fresh array cache backed by a generously sized delta cache.
    fn test_cache() -> Arc<ArrayCache> {
        Arc::new(ArrayCache::new(Arc::new(DeltaCache::new(
            256 * 1024 * 1024,
            64 * 1024 * 1024,
        ))))
    }

    /// View over an otherwise-empty dataset `name`, with its own cache and meta.
    fn empty_view(store: Arc<dyn ObjectStore>, name: &str) -> DatasetView {
        DatasetView::new(
            store,
            test_cache(),
            name.to_string(),
            shared_meta_with(name),
            Codec::default(),
        )
    }

    // --- attribute tests (synchronous, no I/O) ---

    #[test]
    fn get_attribute_missing_returns_none() {
        let view = empty_view(make_store(), "ds");
        assert!(view.get_attribute("x").is_none());
    }

    #[test]
    fn set_and_get_attribute_roundtrip() {
        let mut view = empty_view(make_store(), "ds");
        view.set_attribute("k", Attr::Int64(42));
        assert_eq!(view.get_attribute("k"), Some(Attr::Int64(42)));
    }

    #[test]
    fn set_attribute_overwrites_previous() {
        let mut view = empty_view(make_store(), "ds");
        view.set_attribute("k", Attr::Int64(1));
        view.set_attribute("k", Attr::Int64(2));
        assert_eq!(view.get_attribute("k"), Some(Attr::Int64(2)));
    }

    #[test]
    fn views_sharing_meta_observe_each_others_attributes() {
        let store = make_store();
        let meta = shared_meta_with("ds");
        let mut writer = DatasetView::new(
            store.clone(),
            test_cache(),
            "ds".to_string(),
            meta.clone(),
            Codec::default(),
        );
        let reader = DatasetView::new(
            store,
            test_cache(),
            "ds".to_string(),
            meta,
            Codec::default(),
        );
        writer.set_attribute("k", Attr::Int64(7));
        assert_eq!(reader.get_attribute("k"), Some(Attr::Int64(7)));
    }

    #[test]
    fn name_returns_dataset_name() {
        let view = empty_view(make_store(), "my_dataset");
        assert_eq!(view.name(), "my_dataset");
    }

    #[test]
    fn list_arrays_empty_when_no_arrays_defined() {
        let view = empty_view(make_store(), "ds");
        assert!(view.list_arrays().is_empty());
    }

    // --- array lookup without I/O ---

    #[tokio::test]
    async fn read_array_returns_none_for_unknown_array() {
        let view = empty_view(make_store(), "ds");
        let result = view
            .read_array::<f32>("missing", vec![], vec![])
            .await
            .unwrap();
        assert!(result.is_none());
    }

    // Purely synchronous lookup; no runtime needed.
    #[test]
    fn array_meta_returns_none_for_unknown_array() {
        let view = empty_view(make_store(), "ds");
        assert!(view.array_meta("missing").is_none());
    }

    #[tokio::test]
    async fn array_fill_value_none_for_unknown_array() {
        let view = empty_view(make_store(), "ds");
        assert!(view.array_fill_value("missing").await.unwrap().is_none());
    }

    #[tokio::test]
    async fn write_unknown_array_errors() {
        let mut view = empty_view(make_store(), "ds");
        let data = ndarray::ArrayD::<f32>::from_elem(vec![2], 1.0_f32);
        let err = view
            .write_array("ghost", vec![0], data.view())
            .await
            .unwrap_err();
        assert!(matches!(err, crate::Error::ArrayNotFound(_)));
    }

    // --- define_array behaviour ---

    #[tokio::test]
    async fn define_array_appears_in_list() {
        let mut view = empty_view(make_store(), "ds");
        view.define_array::<f32>("arr", vec!["x".into()], vec![4], None, None)
            .await
            .unwrap();
        assert_eq!(view.list_arrays(), vec!["arr"]);
    }

    #[tokio::test]
    async fn define_duplicate_array_rejected() {
        let mut view = empty_view(make_store(), "ds");
        view.define_array::<f32>("arr", vec!["x".into()], vec![4], None, None)
            .await
            .unwrap();
        let err = view
            .define_array::<f32>("arr", vec!["x".into()], vec![4], None, None)
            .await
            .unwrap_err();
        assert!(matches!(err, crate::Error::ArrayAlreadyExists(_)));
    }

    #[tokio::test]
    async fn define_array_invalid_name_rejected() {
        let mut view = empty_view(make_store(), "ds");
        let err = view
            .define_array::<f32>("a/b", vec!["x".into()], vec![4], None, None)
            .await
            .unwrap_err();
        assert!(matches!(err, crate::Error::InvalidName(_)));
    }

    #[tokio::test]
    async fn array_fill_value_reflects_definition() {
        use array_format::FillValue;
        let mut view = empty_view(make_store(), "ds");
        view.define_array::<i32>(
            "with_fill",
            vec!["x".into()],
            vec![4],
            None,
            Some(FillValue::Int(-1)),
        )
        .await
        .unwrap();
        view.define_array::<i32>("no_fill", vec!["x".into()], vec![4], None, None)
            .await
            .unwrap();

        let declared = view.array_fill_value("with_fill").await.unwrap();
        assert!(matches!(declared, Some(FillValue::Int(-1))));
        assert!(view.array_fill_value("no_fill").await.unwrap().is_none());
    }

    // --- write / read roundtrip ---

    #[tokio::test]
    async fn write_then_read_returns_data() {
        use ndarray::ArrayD;
        let mut view = empty_view(make_store(), "ds");
        view.define_array::<f32>("arr", vec!["x".into()], vec![4], None, None)
            .await
            .unwrap();
        let data = ArrayD::<f32>::from_elem(vec![4], 7.0_f32);
        view.write_array("arr", vec![0], data.view()).await.unwrap();
        let result = view
            .read_array::<f32>("arr", vec![], vec![])
            .await
            .unwrap()
            .unwrap();
        assert_eq!(result, data.into_shared());
    }

    #[tokio::test]
    async fn partial_read_returns_sub_region() {
        let mut view = empty_view(make_store(), "ds");
        view.define_array::<f32>("arr", vec!["x".into()], vec![4], None, None)
            .await
            .unwrap();
        let data = ndarray::arr1(&[1.0_f32, 2.0, 3.0, 4.0]).into_dyn();
        view.write_array("arr", vec![0], data.view()).await.unwrap();
        let part = view
            .read_array::<f32>("arr", vec![1], vec![2])
            .await
            .unwrap()
            .unwrap();
        assert_eq!(part, ndarray::arr1(&[2.0_f32, 3.0]).into_dyn().into_shared());
    }

    // --- delete_array ---

    #[tokio::test]
    async fn delete_array_removes_from_list() {
        let mut view = empty_view(make_store(), "ds");
        view.define_array::<f32>("arr", vec!["x".into()], vec![4], None, None)
            .await
            .unwrap();
        view.delete_array("arr").await.unwrap();
        assert!(view.list_arrays().is_empty());
    }

    #[tokio::test]
    async fn delete_nonexistent_array_errors() {
        let mut view = empty_view(make_store(), "ds");
        let err = view.delete_array("ghost").await.unwrap_err();
        assert!(matches!(err, crate::Error::ArrayNotFound(_)));
    }

    // --- meta ---

    #[tokio::test]
    async fn define_array_records_meta() {
        use array_format::DType;
        let mut view = empty_view(make_store(), "ds");
        view.define_array::<f32>(
            "arr",
            vec!["x".into(), "y".into()],
            vec![4, 8],
            Some(vec![2, 2]),
            None,
        )
        .await
        .unwrap();

        let meta = view.meta();
        let arr_schema = meta.arrays.get("arr").expect("meta entry missing");
        assert_eq!(arr_schema.dtype, DType::Float32);
        assert_eq!(arr_schema.shape, vec![4, 8]);
        assert_eq!(arr_schema.chunk_shape, vec![2, 2]);
        assert_eq!(arr_schema.dimension_names, vec!["x", "y"]);
        assert!(meta.attributes.is_empty());
    }

    #[tokio::test]
    async fn define_array_default_chunk_equals_shape() {
        use array_format::DType;
        let mut view = empty_view(make_store(), "ds");
        view.define_array::<i32>("arr", vec!["t".into()], vec![10], None, None)
            .await
            .unwrap();

        let meta = view.meta();
        let arr_schema = meta.arrays.get("arr").unwrap();
        assert_eq!(arr_schema.dtype, DType::Int32);
        assert_eq!(arr_schema.chunk_shape, vec![10]);
    }

    #[test]
    fn set_attribute_records_value_in_meta() {
        let mut view = empty_view(make_store(), "ds");
        view.set_attribute("count", Attr::Int64(5));
        view.set_attribute("label", Attr::String("x".into()));

        let meta = view.meta();
        assert_eq!(meta.attributes.get("count"), Some(&Attr::Int64(5)));
        assert_eq!(
            meta.attributes.get("label"),
            Some(&Attr::String("x".into()))
        );
    }

    #[tokio::test]
    async fn delete_array_removes_meta_entry() {
        let mut view = empty_view(make_store(), "ds");
        view.define_array::<f64>("arr", vec!["x".into()], vec![4], None, None)
            .await
            .unwrap();
        assert!(view.meta().arrays.contains_key("arr"));
        view.delete_array("arr").await.unwrap();
        assert!(!view.meta().arrays.contains_key("arr"));
    }

    #[tokio::test]
    async fn array_meta_returns_schema_after_define() {
        use array_format::DType;
        let mut view = empty_view(make_store(), "ds");
        view.define_array::<f64>("arr", vec!["t".into()], vec![5], None, None)
            .await
            .unwrap();
        let meta = view.array_meta("arr").unwrap();
        assert_eq!(meta.dtype, DType::Float64);
        assert_eq!(meta.shape, vec![5]);
    }

    // --- array_stats ---

    #[tokio::test]
    async fn array_stats_returns_none_for_unknown_array() {
        let view = empty_view(make_store(), "ds");
        assert!(view.array_stats("ghost").await.is_none());
    }

    #[tokio::test]
    async fn array_stats_none_before_flush() {
        let mut view = empty_view(make_store(), "ds");
        view.define_array::<f32>("arr", vec!["x".into()], vec![4], None, None)
            .await
            .unwrap();
        assert!(view.array_stats("arr").await.is_none());
    }

    /// Flush every initialized array file in the shared cache. Used by tests
    /// that need to persist stats without going through `Atlas::flush`.
    async fn flush_initialized(cache: &Arc<ArrayCache>) {
        // Snapshot under the sync map lock, then release it before awaiting.
        let snapshot: Vec<_> = {
            let guard = cache.files.read();
            guard
                .values()
                .filter_map(|a| a.try_get().map(|arc| (a.clone(), arc)))
                .collect()
        };
        for (_handle, arc) in snapshot {
            arc.write().await.flush().await.unwrap();
        }
    }

    #[tokio::test]
    async fn array_stats_populated_after_flush() {
        use array_format::StatValue;
        let mut view = empty_view(make_store(), "ds");
        view.define_array::<f32>("arr", vec!["x".into()], vec![4], None, None)
            .await
            .unwrap();
        let data = ndarray::arr1(&[1.0_f32, 3.0, 2.0, 4.0]).into_dyn();
        view.write_array("arr", vec![0], data.view()).await.unwrap();
        flush_initialized(&view.cache).await;

        let stats = view.array_stats("arr").await.unwrap();
        assert_eq!(stats.row_count, 4);
        assert_eq!(stats.null_count, 0);
        assert_eq!(stats.min, Some(StatValue::Float(1.0)));
        assert_eq!(stats.max, Some(StatValue::Float(4.0)));
    }

    #[tokio::test]
    async fn array_stats_count_fill_value_as_null() {
        use array_format::{FillValue, StatValue};
        let mut view = empty_view(make_store(), "ds");
        view.define_array::<i32>(
            "arr",
            vec!["x".into()],
            vec![6],
            None,
            Some(FillValue::Int(-1)),
        )
        .await
        .unwrap();
        // Two cells equal the fill (-1); four are real data.
        let data = ndarray::arr1(&[5_i32, -1, 7, -1, 2, 9]).into_dyn();
        view.write_array("arr", vec![0], data.view()).await.unwrap();
        flush_initialized(&view.cache).await;

        let stats = view.array_stats("arr").await.unwrap();
        assert_eq!(stats.row_count, 6);
        assert_eq!(
            stats.null_count, 2,
            "two fill-equal cells must count as null"
        );
        // min/max exclude fill-valued cells.
        assert_eq!(stats.min, Some(StatValue::Int(2)));
        assert_eq!(stats.max, Some(StatValue::Int(9)));
    }

    #[tokio::test]
    async fn array_stats_without_fill_value_treats_sentinel_as_data() {
        use array_format::StatValue;
        // Baseline: same `-1` values but no fill_value declared — they must
        // not count as null, and must be included in min/max.
        let mut view = empty_view(make_store(), "ds");
        view.define_array::<i32>("arr", vec!["x".into()], vec![4], None, None)
            .await
            .unwrap();
        let data = ndarray::arr1(&[5_i32, -1, 7, 9]).into_dyn();
        view.write_array("arr", vec![0], data.view()).await.unwrap();
        flush_initialized(&view.cache).await;

        let stats = view.array_stats("arr").await.unwrap();
        assert_eq!(stats.row_count, 4);
        assert_eq!(stats.null_count, 0);
        assert_eq!(stats.min, Some(StatValue::Int(-1)));
        assert_eq!(stats.max, Some(StatValue::Int(9)));
    }

    #[tokio::test]
    async fn array_stats_nan_fill_value_for_float() {
        use array_format::{FillValue, StatValue};
        let mut view = empty_view(make_store(), "ds");
        view.define_array::<f64>(
            "arr",
            vec!["x".into()],
            vec![4],
            None,
            Some(FillValue::Float(f64::NAN)),
        )
        .await
        .unwrap();
        // NaN cells are matched to the NaN fill (bit-pattern compare in array_format).
        let data = ndarray::arr1(&[1.0_f64, f64::NAN, 3.0, f64::NAN]).into_dyn();
        view.write_array("arr", vec![0], data.view()).await.unwrap();
        flush_initialized(&view.cache).await;

        let stats = view.array_stats("arr").await.unwrap();
        assert_eq!(stats.row_count, 4);
        assert_eq!(stats.null_count, 2);
        assert_eq!(stats.min, Some(StatValue::Float(1.0)));
        assert_eq!(stats.max, Some(StatValue::Float(3.0)));
    }

    // --- cache sharing ---

    #[tokio::test]
    async fn two_views_share_cached_array_file() {
        let store = make_store();
        let cache = test_cache();
        let shared = Arc::new(Mutex::new({
            let mut m = StoreMeta::default();
            m.datasets.insert("ds_a".into(), DatasetMeta::default());
            m.datasets.insert("ds_b".into(), DatasetMeta::default());
            m
        }));

        let mut view_a = DatasetView::new(
            store.clone(),
            cache.clone(),
            "ds_a".to_string(),
            shared.clone(),
            Codec::default(),
        );
        view_a
            .define_array::<f32>("arr", vec!["x".into()], vec![2], None, None)
            .await
            .unwrap();

        let mut view_b = DatasetView::new(
            store.clone(),
            cache.clone(),
            "ds_b".to_string(),
            shared.clone(),
            Codec::default(),
        );
        view_b
            .define_array::<f32>("arr", vec!["x".into()], vec![2], None, None)
            .await
            .unwrap();

        // Both views share the same lazy handle from the global cache.
        let handle_a = view_a.cache.files.read().get("arr").unwrap().clone();
        let handle_b = view_b.cache.files.read().get("arr").unwrap().clone();
        assert!(
            Arc::ptr_eq(&handle_a, &handle_b),
            "expected both views to share the same AtlasArray handle"
        );
    }

    // --- open_dataset_view ---

    #[tokio::test]
    async fn open_dataset_view_rejects_unknown_dataset() {
        let result = open_dataset_view(
            make_store(),
            test_cache(),
            Arc::new(Mutex::new(StoreMeta::default())),
            "nope",
            Codec::default(),
        )
        .await;
        assert!(matches!(result, Err(crate::Error::DatasetNotFound(_))));
    }

    #[tokio::test]
    async fn open_dataset_view_returns_view_for_known_dataset() {
        let view = open_dataset_view(
            make_store(),
            test_cache(),
            shared_meta_with("ds"),
            "ds",
            Codec::default(),
        )
        .await
        .unwrap();
        assert_eq!(view.name(), "ds");
        assert!(view.list_arrays().is_empty());
    }
}