1use std::{collections::HashMap, sync::Arc};
2
3use array_format::{ArrayElement, ArrayStats, DeltaCache, FillValue};
4use ndarray::{ArcArray, ArrayView, IxDyn};
5use object_store::ObjectStore;
6use parking_lot::{Mutex, RwLock};
7use tracing::{debug, instrument, trace};
8
9use crate::{
10 Error, Result,
11 array::AtlasArray,
12 config::Codec,
13 meta::{DatasetMeta, StoreMeta},
14 schema::{ArraySchema, Attr},
15};
16
17pub(crate) struct ArrayCache {
23 pub(crate) files: RwLock<HashMap<String, Arc<AtlasArray>>>,
24 pub(crate) delta: Arc<DeltaCache>,
25}
26
27impl ArrayCache {
28 pub(crate) fn new(delta: Arc<DeltaCache>) -> Self {
29 Self {
30 files: RwLock::new(HashMap::new()),
31 delta,
32 }
33 }
34
35 pub(crate) fn get_or_insert(
39 &self,
40 store: &Arc<dyn ObjectStore>,
41 array_name: &str,
42 codec: &Codec,
43 ) -> Arc<AtlasArray> {
44 if let Some(arc) = self.files.read().get(array_name) {
45 return arc.clone();
46 }
47 let mut guard = self.files.write();
48 guard
49 .entry(array_name.to_string())
50 .or_insert_with(|| {
51 Arc::new(AtlasArray::new(
52 store.clone(),
53 codec.clone(),
54 array_name.to_string(),
55 self.delta.clone(),
56 ))
57 })
58 .clone()
59 }
60}
61
62pub struct DatasetView {
75 store: Arc<dyn ObjectStore>,
76 pub(crate) cache: Arc<ArrayCache>,
77 name: String,
78 atlas_meta: Arc<Mutex<StoreMeta>>,
82 codec: Codec,
83}
84
85impl DatasetView {
86 pub(crate) fn new(
87 store: Arc<dyn ObjectStore>,
88 cache: Arc<ArrayCache>,
89 name: String,
90 atlas_meta: Arc<Mutex<StoreMeta>>,
91 codec: Codec,
92 ) -> Self {
93 Self {
94 store,
95 cache,
96 name,
97 atlas_meta,
98 codec,
99 }
100 }
101
102 pub fn meta(&self) -> DatasetMeta {
104 self.atlas_meta
105 .lock()
106 .datasets
107 .get(&self.name)
108 .cloned()
109 .unwrap_or_default()
110 }
111
112 pub fn name(&self) -> &str {
114 &self.name
115 }
116
117 pub fn list_arrays(&self) -> Vec<String> {
120 self.atlas_meta
121 .lock()
122 .datasets
123 .get(&self.name)
124 .map(|d| d.arrays.keys().cloned().collect())
125 .unwrap_or_default()
126 }
127
128 pub fn set_attribute(&mut self, key: &str, value: Attr) {
131 let mut meta = self.atlas_meta.lock();
132 meta.datasets
133 .entry(self.name.clone())
134 .or_default()
135 .attributes
136 .insert(key.to_string(), value);
137 }
138
139 pub fn get_attribute(&self, key: &str) -> Option<Attr> {
142 self.atlas_meta
143 .lock()
144 .datasets
145 .get(&self.name)
146 .and_then(|d| d.attributes.get(key).cloned())
147 }
148
149 pub fn array_meta(&self, array: &str) -> Option<ArraySchema> {
152 self.atlas_meta
153 .lock()
154 .datasets
155 .get(&self.name)
156 .and_then(|d| d.arrays.get(array).cloned())
157 }
158
159 pub async fn array_stats(&self, array: &str) -> Option<ArrayStats> {
163 let codec = self.array_codec(array)?;
164 let handle = self.cache.get_or_insert(&self.store, array, &codec);
165 let arc = handle.get().await.ok()?;
166 let guard = arc.read().await;
167 guard.array_stats(&self.name).cloned()
168 }
169
170 #[instrument(skip(self, fill_value), fields(dataset = %self.name, dtype = ?T::DTYPE))]
183 pub async fn define_array<T: ArrayElement>(
184 &mut self,
185 array: &str,
186 dims: Vec<String>,
187 shape: Vec<usize>,
188 chunk_shape: Option<Vec<usize>>,
189 fill_value: Option<FillValue>,
190 ) -> Result<()> {
191 crate::validate_name(array)?;
192 {
193 let meta = self.atlas_meta.lock();
194 if let Some(ds) = meta.datasets.get(&self.name) {
195 if ds.arrays.contains_key(array) {
196 return Err(Error::ArrayAlreadyExists(array.to_string()));
197 }
198 }
199 }
200
201 let handle = self.cache.get_or_insert(&self.store, array, &self.codec);
202 let arc = handle.get().await?;
203 arc.write().await.define_array::<T>(
204 &self.name,
205 dims.clone(),
206 shape.clone(),
207 chunk_shape.clone(),
208 fill_value,
209 )?;
210
211 let actual_chunk = chunk_shape.unwrap_or_else(|| shape.clone());
212 debug!(?shape, chunk_shape = ?actual_chunk, "defined array");
213 let schema = ArraySchema {
214 dtype: T::DTYPE.clone(),
215 shape,
216 chunk_shape: actual_chunk,
217 dimension_names: dims,
218 codec: self.codec.clone(),
219 };
220 let mut meta = self.atlas_meta.lock();
221 meta.datasets
222 .entry(self.name.clone())
223 .or_default()
224 .arrays
225 .insert(array.to_string(), schema);
226 Ok(())
227 }
228
229 #[instrument(skip(self, data), fields(dataset = %self.name, elems = data.len()))]
240 pub async fn write_array<T: ArrayElement>(
241 &mut self,
242 array: &str,
243 start: Vec<usize>,
244 data: ArrayView<'_, T, IxDyn>,
245 ) -> Result<()> {
246 let codec = self
247 .array_codec(array)
248 .ok_or_else(|| Error::ArrayNotFound(array.to_string()))?;
249 let handle = self.cache.get_or_insert(&self.store, array, &codec);
250 let arc = handle.get().await?;
251 let mut guard = arc.write().await;
252 let shape: Vec<usize> = data.shape().to_vec();
253 let bytes = data.len() * std::mem::size_of::<T>();
254 let start_log = start.clone();
255 let t0 = std::time::Instant::now();
256 guard.write_array::<T>(&self.name, start, data).await?;
257 debug!(
258 array,
259 start = ?start_log,
260 ?shape,
261 bytes,
262 elapsed_us = t0.elapsed().as_micros() as u64,
263 "wrote chunk"
264 );
265 Ok(())
266 }
267
268 #[instrument(skip(self), fields(dataset = %self.name))]
304 pub async fn read_array<T: ArrayElement>(
305 &self,
306 array: &str,
307 start: Vec<usize>,
308 shape: Vec<usize>,
309 ) -> Result<Option<ArcArray<T, IxDyn>>> {
310 let codec = match self.array_codec(array) {
311 Some(c) => c,
312 None => {
313 debug!("array not present in dataset");
314 return Ok(None);
315 }
316 };
317 trace!(?start, ?shape, "reading array");
318 let handle = self.cache.get_or_insert(&self.store, array, &codec);
319 let arc = handle.get().await?;
320 let guard = arc.read().await;
321 Ok(Some(guard.read_array::<T>(&self.name, start, shape).await?))
322 }
323
324 pub async fn array_fill_value(&self, array: &str) -> Result<Option<FillValue>> {
327 let codec = match self.array_codec(array) {
328 Some(c) => c,
329 None => return Ok(None),
330 };
331 let handle = self.cache.get_or_insert(&self.store, array, &codec);
332 let arc = handle.get().await?;
333 let guard = arc.read().await;
334 Ok(guard.get_array(&self.name)?.fill_value.clone())
335 }
336
337 #[instrument(skip(self), fields(dataset = %self.name))]
342 pub async fn delete_array(&mut self, array: &str) -> Result<()> {
343 let codec = self
344 .array_codec(array)
345 .ok_or_else(|| Error::ArrayNotFound(array.to_string()))?;
346 let handle = self.cache.get_or_insert(&self.store, array, &codec);
347 let arc = handle.get().await?;
348 arc.write().await.delete(&self.name)?;
349 let mut meta = self.atlas_meta.lock();
350 if let Some(ds_meta) = meta.datasets.get_mut(&self.name) {
351 ds_meta.arrays.shift_remove(array);
352 }
353 debug!("deleted array");
354 Ok(())
355 }
356
357 fn array_codec(&self, array: &str) -> Option<Codec> {
360 self.atlas_meta
361 .lock()
362 .datasets
363 .get(&self.name)
364 .and_then(|d| d.arrays.get(array).map(|s| s.codec.clone()))
365 }
366}
367
368pub(crate) async fn open_dataset_view(
369 store: Arc<dyn ObjectStore>,
370 cache: Arc<ArrayCache>,
371 atlas_meta: Arc<Mutex<StoreMeta>>,
372 name: &str,
373 codec: Codec,
374) -> Result<DatasetView> {
375 {
376 let meta = atlas_meta.lock();
377 if !meta.datasets.contains_key(name) {
378 return Err(Error::DatasetNotFound(name.to_string()));
379 }
380 }
381 Ok(DatasetView::new(
382 store,
383 cache,
384 name.to_string(),
385 atlas_meta,
386 codec,
387 ))
388}
389
#[cfg(test)]
mod tests {
    use super::*;
    use object_store::memory::InMemory;

    /// Fresh in-memory object store, so tests never touch disk or network.
    fn make_store() -> Arc<dyn ObjectStore> {
        Arc::new(InMemory::new())
    }

    /// Store metadata containing a single empty dataset called `name`.
    fn shared_meta_with(name: &str) -> Arc<Mutex<StoreMeta>> {
        let mut meta = StoreMeta::default();
        meta.datasets
            .insert(name.to_string(), DatasetMeta::default());
        Arc::new(Mutex::new(meta))
    }

    /// Array cache backed by a `DeltaCache` with test-sized limits.
    ///
    /// The two limits are 256 MiB and 64 MiB; see `DeltaCache::new` for what
    /// each one means.
    fn test_cache() -> Arc<ArrayCache> {
        Arc::new(ArrayCache::new(Arc::new(DeltaCache::new(
            256 * 1024 * 1024,
            64 * 1024 * 1024,
        ))))
    }

    /// View over a freshly created empty dataset `name`.
    ///
    /// The view gets a private cache and the default codec.
    fn empty_view(store: Arc<dyn ObjectStore>, name: &str) -> DatasetView {
        DatasetView::new(
            store,
            test_cache(),
            name.to_string(),
            shared_meta_with(name),
            Codec::default(),
        )
    }

    #[test]
    fn get_attribute_missing_returns_none() {
        let view = empty_view(make_store(), "ds");
        assert!(view.get_attribute("x").is_none());
    }

    #[test]
    fn set_and_get_attribute_roundtrip() {
        let mut view = empty_view(make_store(), "ds");
        view.set_attribute("k", Attr::Int64(42));
        assert_eq!(view.get_attribute("k"), Some(Attr::Int64(42)));
    }

    #[test]
    fn set_attribute_overwrites_previous() {
        let mut view = empty_view(make_store(), "ds");
        view.set_attribute("k", Attr::Int64(1));
        view.set_attribute("k", Attr::Int64(2));
        assert_eq!(view.get_attribute("k"), Some(Attr::Int64(2)));
    }

    #[test]
    fn name_returns_dataset_name() {
        let view = empty_view(make_store(), "my_dataset");
        assert_eq!(view.name(), "my_dataset");
    }

    #[test]
    fn list_arrays_empty_when_no_arrays_defined() {
        let view = empty_view(make_store(), "ds");
        assert!(view.list_arrays().is_empty());
    }

    #[tokio::test]
    async fn read_array_returns_none_for_unknown_array() {
        let view = empty_view(make_store(), "ds");
        let result = view
            .read_array::<f32>("missing", vec![], vec![])
            .await
            .unwrap();
        assert!(result.is_none());
    }

    // `array_meta` is synchronous, so no async runtime is needed here.
    #[test]
    fn array_meta_returns_none_for_unknown_array() {
        let view = empty_view(make_store(), "ds");
        assert!(view.array_meta("missing").is_none());
    }

    #[tokio::test]
    async fn array_fill_value_none_for_unknown_array() {
        let view = empty_view(make_store(), "ds");
        assert!(view.array_fill_value("ghost").await.unwrap().is_none());
    }

    #[tokio::test]
    async fn define_array_appears_in_list() {
        let mut view = empty_view(make_store(), "ds");
        view.define_array::<f32>("arr", vec!["x".into()], vec![4], None, None)
            .await
            .unwrap();
        assert_eq!(view.list_arrays(), vec!["arr"]);
    }

    #[tokio::test]
    async fn define_duplicate_array_rejected() {
        let mut view = empty_view(make_store(), "ds");
        view.define_array::<f32>("arr", vec!["x".into()], vec![4], None, None)
            .await
            .unwrap();
        let err = view
            .define_array::<f32>("arr", vec!["x".into()], vec![4], None, None)
            .await
            .unwrap_err();
        assert!(matches!(err, crate::Error::ArrayAlreadyExists(_)));
    }

    #[tokio::test]
    async fn define_array_invalid_name_rejected() {
        let mut view = empty_view(make_store(), "ds");
        let err = view
            .define_array::<f32>("a/b", vec!["x".into()], vec![4], None, None)
            .await
            .unwrap_err();
        assert!(matches!(err, crate::Error::InvalidName(_)));
    }

    #[tokio::test]
    async fn write_then_read_returns_data() {
        use ndarray::ArrayD;
        let mut view = empty_view(make_store(), "ds");
        view.define_array::<f32>("arr", vec!["x".into()], vec![4], None, None)
            .await
            .unwrap();
        let data = ArrayD::<f32>::from_elem(vec![4], 7.0_f32);
        view.write_array("arr", vec![0], data.view()).await.unwrap();
        let result = view
            .read_array::<f32>("arr", vec![], vec![])
            .await
            .unwrap()
            .unwrap();
        assert_eq!(result, data.into_shared());
    }

    #[tokio::test]
    async fn write_array_unknown_array_errors() {
        use ndarray::ArrayD;
        let mut view = empty_view(make_store(), "ds");
        let data = ArrayD::<f32>::from_elem(vec![4], 0.0_f32);
        let err = view
            .write_array("ghost", vec![0], data.view())
            .await
            .unwrap_err();
        assert!(matches!(err, crate::Error::ArrayNotFound(_)));
    }

    #[tokio::test]
    async fn delete_array_removes_from_list() {
        let mut view = empty_view(make_store(), "ds");
        view.define_array::<f32>("arr", vec!["x".into()], vec![4], None, None)
            .await
            .unwrap();
        view.delete_array("arr").await.unwrap();
        assert!(view.list_arrays().is_empty());
    }

    #[tokio::test]
    async fn delete_nonexistent_array_errors() {
        let mut view = empty_view(make_store(), "ds");
        let err = view.delete_array("ghost").await.unwrap_err();
        assert!(matches!(err, crate::Error::ArrayNotFound(_)));
    }

    #[tokio::test]
    async fn define_array_records_meta() {
        use array_format::DType;
        let mut view = empty_view(make_store(), "ds");
        view.define_array::<f32>(
            "arr",
            vec!["x".into(), "y".into()],
            vec![4, 8],
            Some(vec![2, 2]),
            None,
        )
        .await
        .unwrap();

        let meta = view.meta();
        let arr_schema = meta.arrays.get("arr").expect("meta entry missing");
        assert_eq!(arr_schema.dtype, DType::Float32);
        assert_eq!(arr_schema.shape, vec![4, 8]);
        assert_eq!(arr_schema.chunk_shape, vec![2, 2]);
        assert_eq!(arr_schema.dimension_names, vec!["x", "y"]);
        assert!(meta.attributes.is_empty());
    }

    #[tokio::test]
    async fn define_array_default_chunk_equals_shape() {
        use array_format::DType;
        let mut view = empty_view(make_store(), "ds");
        view.define_array::<i32>("arr", vec!["t".into()], vec![10], None, None)
            .await
            .unwrap();

        let meta = view.meta();
        let arr_schema = meta.arrays.get("arr").unwrap();
        assert_eq!(arr_schema.dtype, DType::Int32);
        assert_eq!(arr_schema.chunk_shape, vec![10]);
    }

    #[test]
    fn set_attribute_records_value_in_meta() {
        let mut view = empty_view(make_store(), "ds");
        view.set_attribute("count", Attr::Int64(5));
        view.set_attribute("label", Attr::String("x".into()));

        let meta = view.meta();
        assert_eq!(meta.attributes.get("count"), Some(&Attr::Int64(5)));
        assert_eq!(
            meta.attributes.get("label"),
            Some(&Attr::String("x".into()))
        );
    }

    #[tokio::test]
    async fn delete_array_removes_meta_entry() {
        let mut view = empty_view(make_store(), "ds");
        view.define_array::<f64>("arr", vec!["x".into()], vec![4], None, None)
            .await
            .unwrap();
        assert!(view.meta().arrays.contains_key("arr"));
        view.delete_array("arr").await.unwrap();
        assert!(!view.meta().arrays.contains_key("arr"));
    }

    #[tokio::test]
    async fn array_meta_returns_schema_after_define() {
        use array_format::DType;
        let mut view = empty_view(make_store(), "ds");
        view.define_array::<f64>("arr", vec!["t".into()], vec![5], None, None)
            .await
            .unwrap();
        let meta = view.array_meta("arr").unwrap();
        assert_eq!(meta.dtype, DType::Float64);
        assert_eq!(meta.shape, vec![5]);
    }

    #[tokio::test]
    async fn array_stats_returns_none_for_unknown_array() {
        let view = empty_view(make_store(), "ds");
        assert!(view.array_stats("ghost").await.is_none());
    }

    #[tokio::test]
    async fn array_stats_none_before_flush() {
        let mut view = empty_view(make_store(), "ds");
        view.define_array::<f32>("arr", vec!["x".into()], vec![4], None, None)
            .await
            .unwrap();
        assert!(view.array_stats("arr").await.is_none());
    }

    /// Flushes every cached array file whose handle is already initialised.
    ///
    /// "Initialised" means `try_get` returns `Some`. The cache's read lock is
    /// released before any `.await`, because the guard only lives inside the
    /// snapshot block.
    async fn flush_initialized(cache: &Arc<ArrayCache>) {
        let snapshot: Vec<_> = {
            let guard = cache.files.read();
            guard
                .values()
                .filter_map(|a| a.try_get().map(|arc| (a.clone(), arc)))
                .collect()
        };
        for (_handle, arc) in snapshot {
            arc.write().await.flush().await.unwrap();
        }
    }

    #[tokio::test]
    async fn array_stats_populated_after_flush() {
        use array_format::StatValue;
        let mut view = empty_view(make_store(), "ds");
        view.define_array::<f32>("arr", vec!["x".into()], vec![4], None, None)
            .await
            .unwrap();
        let data = ndarray::arr1(&[1.0_f32, 3.0, 2.0, 4.0]).into_dyn();
        view.write_array("arr", vec![0], data.view()).await.unwrap();
        flush_initialized(&view.cache).await;

        let stats = view.array_stats("arr").await.unwrap();
        assert_eq!(stats.row_count, 4);
        assert_eq!(stats.null_count, 0);
        assert_eq!(stats.min, Some(StatValue::Float(1.0)));
        assert_eq!(stats.max, Some(StatValue::Float(4.0)));
    }

    #[tokio::test]
    async fn array_stats_count_fill_value_as_null() {
        use array_format::{FillValue, StatValue};
        let mut view = empty_view(make_store(), "ds");
        view.define_array::<i32>(
            "arr",
            vec!["x".into()],
            vec![6],
            None,
            Some(FillValue::Int(-1)),
        )
        .await
        .unwrap();
        let data = ndarray::arr1(&[5_i32, -1, 7, -1, 2, 9]).into_dyn();
        view.write_array("arr", vec![0], data.view()).await.unwrap();
        flush_initialized(&view.cache).await;

        let stats = view.array_stats("arr").await.unwrap();
        assert_eq!(stats.row_count, 6);
        assert_eq!(
            stats.null_count, 2,
            "two fill-equal cells must count as null"
        );
        assert_eq!(stats.min, Some(StatValue::Int(2)));
        assert_eq!(stats.max, Some(StatValue::Int(9)));
    }

    #[tokio::test]
    async fn array_stats_without_fill_value_treats_sentinel_as_data() {
        use array_format::StatValue;
        let mut view = empty_view(make_store(), "ds");
        view.define_array::<i32>("arr", vec!["x".into()], vec![4], None, None)
            .await
            .unwrap();
        let data = ndarray::arr1(&[5_i32, -1, 7, 9]).into_dyn();
        view.write_array("arr", vec![0], data.view()).await.unwrap();
        flush_initialized(&view.cache).await;

        let stats = view.array_stats("arr").await.unwrap();
        assert_eq!(stats.row_count, 4);
        assert_eq!(stats.null_count, 0);
        assert_eq!(stats.min, Some(StatValue::Int(-1)));
        assert_eq!(stats.max, Some(StatValue::Int(9)));
    }

    #[tokio::test]
    async fn array_stats_nan_fill_value_for_float() {
        use array_format::{FillValue, StatValue};
        let mut view = empty_view(make_store(), "ds");
        view.define_array::<f64>(
            "arr",
            vec!["x".into()],
            vec![4],
            None,
            Some(FillValue::Float(f64::NAN)),
        )
        .await
        .unwrap();
        let data = ndarray::arr1(&[1.0_f64, f64::NAN, 3.0, f64::NAN]).into_dyn();
        view.write_array("arr", vec![0], data.view()).await.unwrap();
        flush_initialized(&view.cache).await;

        let stats = view.array_stats("arr").await.unwrap();
        assert_eq!(stats.row_count, 4);
        assert_eq!(stats.null_count, 2);
        assert_eq!(stats.min, Some(StatValue::Float(1.0)));
        assert_eq!(stats.max, Some(StatValue::Float(3.0)));
    }

    #[tokio::test]
    async fn two_views_share_cached_array_file() {
        let store = make_store();
        let cache = test_cache();
        let shared = Arc::new(Mutex::new({
            let mut m = StoreMeta::default();
            m.datasets.insert("ds_a".into(), DatasetMeta::default());
            m.datasets.insert("ds_b".into(), DatasetMeta::default());
            m
        }));

        let mut view_a = DatasetView::new(
            store.clone(),
            cache.clone(),
            "ds_a".to_string(),
            shared.clone(),
            Codec::default(),
        );
        view_a
            .define_array::<f32>("arr", vec!["x".into()], vec![2], None, None)
            .await
            .unwrap();

        let mut view_b = DatasetView::new(
            store.clone(),
            cache.clone(),
            "ds_b".to_string(),
            shared.clone(),
            Codec::default(),
        );
        view_b
            .define_array::<f32>("arr", vec!["x".into()], vec![2], None, None)
            .await
            .unwrap();

        let handle_a = view_a.cache.files.read().get("arr").unwrap().clone();
        let handle_b = view_b.cache.files.read().get("arr").unwrap().clone();
        assert!(
            Arc::ptr_eq(&handle_a, &handle_b),
            "expected both views to share the same AtlasArray handle"
        );
    }

    #[tokio::test]
    async fn open_dataset_view_rejects_unknown_dataset() {
        let result = open_dataset_view(
            make_store(),
            test_cache(),
            shared_meta_with("ds"),
            "missing",
            Codec::default(),
        )
        .await;
        assert!(matches!(result, Err(crate::Error::DatasetNotFound(_))));
    }

    #[tokio::test]
    async fn open_dataset_view_opens_known_dataset() {
        let view = open_dataset_view(
            make_store(),
            test_cache(),
            shared_meta_with("ds"),
            "ds",
            Codec::default(),
        )
        .await
        .unwrap();
        assert_eq!(view.name(), "ds");
    }
}