mod lazy_subset;
#[cfg(test)]
mod samples;
use std::sync::Arc;
use object_store::local::LocalFileSystem;
use tracing::debug;
use zarrs::array_subset::ArraySubset;
use zarrs::storage::AsyncReadableListableStorage;
use zarrs_object_store::AsyncObjectStore;
use crate::error::Result;
use lazy_subset::{AsyncLazyElement, AsyncLazySubset};
/// Handle to a features store that is read through an asynchronous storage backend.
///
/// Constructed with [`Features::open`].
pub(super) struct Features {
/// Shared storage handle; `lazy_subset` hands an `Arc::clone` of it to each `AsyncLazySubset`.
// NOTE(review): this field is read by `lazy_subset`, so the allow below is presumably only
// needed while that method is itself unused outside tests — confirm before removing.
#[allow(dead_code)]
storage: AsyncReadableListableStorage,
}
impl Features {
    /// Opens the features store rooted at `path` on the local filesystem.
    ///
    /// The path is logged at `debug` level, used as the prefix of a
    /// `LocalFileSystem` object store, and that store is wrapped in an
    /// `AsyncObjectStore` held behind an `Arc`.
    ///
    /// # Errors
    ///
    /// Propagates (via `?`) any error returned by
    /// `LocalFileSystem::new_with_prefix`.
    pub(super) fn open<P: AsRef<std::path::Path>>(path: P) -> Result<Self> {
        let root: &std::path::Path = path.as_ref();
        debug!("Opening features at {:?}", root);

        let local_fs = LocalFileSystem::new_with_prefix(root)?;

        // The field type drives the coercion of the concrete `Arc` into the
        // type-erased storage handle.
        Ok(Self {
            storage: Arc::new(AsyncObjectStore::new(local_fs)),
        })
    }

    /// Builds an `AsyncLazySubset` over `subset` that shares this store's
    /// storage handle.
    ///
    /// Only the `Arc` is cloned; the body awaits nothing, so the method is
    /// `async` by signature only.
    // NOTE(review): whether any data is read at construction time depends on
    // `AsyncLazySubset::new`, which is not visible here.
    #[allow(dead_code)]
    pub(super) async fn lazy_subset<T: AsyncLazyElement>(
        &self,
        subset: ArraySubset,
    ) -> AsyncLazySubset<T> {
        let shared_storage = Arc::clone(&self.storage);
        AsyncLazySubset::<T>::new(shared_storage, subset)
    }
}
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use zarrs::array_subset::ArraySubset;

    use super::Features;
    use super::samples::{FeaturesTestBuilder, LayerConfig};

    /// Reads a 4x4 window from an 8x8 store and checks both layers.
    ///
    /// Layer "A" is built as a constant 5.0; layer "B" is built with
    /// `LayerConfig::sequential`, for which this test expects 1.0 at `[0, 0]`
    /// and 4.0 at `[0, 3]`.
    #[tokio::test]
    async fn lazy_subset_returns_correct_data() {
        // `_storage` is bound (rather than discarded with `_`) so it lives
        // until the end of the test, as in the other tests of this module.
        let (tmp, _storage) = FeaturesTestBuilder::new()
            .dimensions(8, 8)
            .chunks(4, 4)
            .layer(LayerConfig::constant("A", 5.0))
            .layer(LayerConfig::sequential("B"))
            .build()
            .unwrap();

        let features = Features::open(tmp.path()).unwrap();
        let subset = ArraySubset::new_with_start_shape(vec![0, 0], vec![4, 4]).unwrap();
        let lazy = features.lazy_subset::<f32>(subset).await;

        let a = lazy.get("A").await.unwrap();
        assert_eq!(a.shape(), &[4, 4]);
        assert!(a.iter().all(|&v| v == 5.0));

        let b = lazy.get("B").await.unwrap();
        assert_eq!(b.shape(), &[4, 4]);
        assert_eq!(b[[0, 0]], 1.0);
        assert_eq!(b[[0, 3]], 4.0);
    }

    /// Requests a 6x6 window from a 4x4 store and checks that the result has
    /// the requested shape, keeps in-bounds data, and holds NaN at `[5, 5]`,
    /// which lies outside the stored array.
    #[tokio::test]
    async fn lazy_subset_pads_out_of_bounds() {
        let (tmp, _storage) = FeaturesTestBuilder::new()
            .dimensions(4, 4)
            .chunks(2, 2)
            .layer(LayerConfig::ones("A"))
            .build()
            .unwrap();

        let features = Features::open(tmp.path()).unwrap();
        let subset = ArraySubset::new_with_start_shape(vec![0, 0], vec![6, 6]).unwrap();
        let lazy = features.lazy_subset::<f32>(subset).await;

        let data = lazy.get("A").await.unwrap();
        assert_eq!(data.shape(), &[6, 6]);
        assert_eq!(data[[0, 0]], 1.0);
        assert!(data[[5, 5]].is_nan());
    }

    /// Spawns eight tasks on the multi-threaded runtime that read layers "A"
    /// and "B" (alternating) through one shared lazy subset, then checks that
    /// every task saw the expected values and that all results for the same
    /// layer are equal to each other.
    #[tokio::test(flavor = "multi_thread")]
    async fn features_lazy_subset_concurrent_access() {
        let (tmp, _storage) = FeaturesTestBuilder::new()
            .dimensions(4, 4)
            .chunks(2, 2)
            .layer(LayerConfig::constant("A", 5.0))
            .layer(LayerConfig::sequential("B"))
            .build()
            .unwrap();

        let features = Features::open(tmp.path()).unwrap();
        let subset = ArraySubset::new_with_start_shape(vec![0, 0], vec![4, 4]).unwrap();
        let lazy = Arc::new(features.lazy_subset::<f32>(subset).await);

        let handles: Vec<_> = (0..8)
            .map(|i| {
                let lazy = Arc::clone(&lazy);
                // Layer names are string literals, so they can be moved into
                // the spawned task without allocating.
                let var: &'static str = if i % 2 == 0 { "A" } else { "B" };
                tokio::spawn(async move {
                    let data = lazy.get(var).await;
                    (var, data)
                })
            })
            .collect();

        let mut a_results = vec![];
        let mut b_results = vec![];
        for h in handles {
            let (var, data) = h.await.expect("task panicked");
            let data = data.unwrap();
            match var {
                "A" => a_results.push(data),
                "B" => b_results.push(data),
                _ => panic!("unexpected variable {var}"),
            }
        }
        assert_eq!(a_results.len(), 4);
        assert_eq!(b_results.len(), 4);

        for r in &a_results {
            assert!(r.iter().all(|&v| v == 5.0));
        }
        for r in &a_results[1..] {
            assert_eq!(r, &a_results[0]);
        }

        // The sequential layer is expected to read 1.0..=16.0 in iteration
        // order over the full 4x4 array.
        let expected_b: Vec<f32> = (1u8..=16).map(f32::from).collect();
        for r in &b_results {
            let flat: Vec<f32> = r.iter().copied().collect();
            assert_eq!(flat, expected_b);
        }
        for r in &b_results[1..] {
            assert_eq!(r, &b_results[0]);
        }
    }
}