async_array_write_read/
async_array_write_read.rs

1#![allow(missing_docs)]
2
3use futures::TryStreamExt;
4use zarrs::storage::{
5    storage_adapter::usage_log::UsageLogStorageAdapter, AsyncReadableWritableListableStorage,
6};
7
8async fn async_array_write_read() -> Result<(), Box<dyn std::error::Error>> {
9    use futures::StreamExt;
10    use std::sync::Arc;
11    use zarrs::{
12        array::{DataType, FillValue, ZARR_NAN_F32},
13        array_subset::ArraySubset,
14        node::Node,
15    };
16
17    // Create a store
18    let mut store: AsyncReadableWritableListableStorage = Arc::new(
19        zarrs_object_store::AsyncObjectStore::new(object_store::memory::InMemory::new()),
20    );
21    if let Some(arg1) = std::env::args().collect::<Vec<_>>().get(1) {
22        if arg1 == "--usage-log" {
23            let log_writer = Arc::new(std::sync::Mutex::new(
24                // std::io::BufWriter::new(
25                std::io::stdout(),
26                //    )
27            ));
28            store = Arc::new(UsageLogStorageAdapter::new(store, log_writer, || {
29                chrono::Utc::now().format("[%T%.3f] ").to_string()
30            }));
31        }
32    }
33
34    // Create the root group
35    zarrs::group::GroupBuilder::new()
36        .build(store.clone(), "/")?
37        .async_store_metadata()
38        .await?;
39
40    // Create a group with attributes
41    let group_path = "/group";
42    let mut group = zarrs::group::GroupBuilder::new().build(store.clone(), group_path)?;
43    group
44        .attributes_mut()
45        .insert("foo".into(), serde_json::Value::String("bar".into()));
46    group.async_store_metadata().await?;
47
48    println!(
49        "The group metadata is:\n{}\n",
50        group.metadata().to_string_pretty()
51    );
52
53    // Create an array
54    let array_path = "/group/array";
55    let array = zarrs::array::ArrayBuilder::new(
56        vec![8, 8], // array shape
57        DataType::Float32,
58        vec![4, 4].try_into()?, // regular chunk shape
59        FillValue::from(ZARR_NAN_F32),
60    )
61    // .bytes_to_bytes_codecs(vec![]) // uncompressed
62    .dimension_names(["y", "x"].into())
63    // .storage_transformers(vec![].into())
64    .build_arc(store.clone(), array_path)?;
65
66    // Write array metadata to store
67    array.async_store_metadata().await?;
68
69    println!(
70        "The array metadata is:\n{}\n",
71        array.metadata().to_string_pretty()
72    );
73
74    // Write some chunks
75    let store_chunk = |i: u64| {
76        let array = array.clone();
77        async move {
78            let chunk_indices: Vec<u64> = vec![0, i];
79            let chunk_subset = array
80                .chunk_grid()
81                .subset(&chunk_indices, array.shape())?
82                .ok_or_else(|| {
83                    zarrs::array::ArrayError::InvalidChunkGridIndicesError(chunk_indices.to_vec())
84                })?;
85            array
86                .async_store_chunk_elements(
87                    &chunk_indices,
88                    &vec![i as f32 * 0.1; chunk_subset.num_elements() as usize],
89                )
90                .await
91        }
92    };
93    futures::stream::iter(0..2)
94        .map(Ok)
95        .try_for_each_concurrent(None, store_chunk)
96        .await?;
97
98    let subset_all = array.subset_all();
99    let data_all = array
100        .async_retrieve_array_subset_ndarray::<f32>(&subset_all)
101        .await?;
102    println!("async_store_chunk [0, 0] and [0, 1]:\n{data_all:+4.1}\n");
103
104    // Store multiple chunks
105    array
106        .async_store_chunks_elements::<f32>(
107            &ArraySubset::new_with_ranges(&[1..2, 0..2]),
108            &[
109                //
110                1.0, 1.0, 1.0, 1.0, 1.1, 1.1, 1.1, 1.1, 1.0, 1.0, 1.0, 1.0, 1.1, 1.1, 1.1, 1.1,
111                //
112                1.0, 1.0, 1.0, 1.0, 1.1, 1.1, 1.1, 1.1, 1.0, 1.0, 1.0, 1.0, 1.1, 1.1, 1.1, 1.1,
113            ],
114        )
115        .await?;
116    let data_all = array
117        .async_retrieve_array_subset_ndarray::<f32>(&subset_all)
118        .await?;
119    println!("async_store_chunks [1..2, 0..2]:\n{data_all:+4.1}\n");
120
121    // Write a subset spanning multiple chunks, including updating chunks already written
122    array
123        .async_store_array_subset_elements::<f32>(
124            &ArraySubset::new_with_ranges(&[3..6, 3..6]),
125            &[-3.3, -3.4, -3.5, -4.3, -4.4, -4.5, -5.3, -5.4, -5.5],
126        )
127        .await?;
128    let data_all = array
129        .async_retrieve_array_subset_ndarray::<f32>(&subset_all)
130        .await?;
131    println!("async_store_array_subset [3..6, 3..6]:\n{data_all:+4.1}\n");
132
133    // Store array subset
134    array
135        .async_store_array_subset_elements::<f32>(
136            &ArraySubset::new_with_ranges(&[0..8, 6..7]),
137            &[-0.6, -1.6, -2.6, -3.6, -4.6, -5.6, -6.6, -7.6],
138        )
139        .await?;
140    let data_all = array
141        .async_retrieve_array_subset_ndarray::<f32>(&subset_all)
142        .await?;
143    println!("async_store_array_subset [0..8, 6..7]:\n{data_all:+4.1}\n");
144
145    // Store chunk subset
146    array
147        .async_store_chunk_subset_elements::<f32>(
148            // chunk indices
149            &[1, 1],
150            // subset within chunk
151            &ArraySubset::new_with_ranges(&[3..4, 0..4]),
152            &[-7.4, -7.5, -7.6, -7.7],
153        )
154        .await?;
155    let data_all = array
156        .async_retrieve_array_subset_ndarray::<f32>(&subset_all)
157        .await?;
158    println!("async_store_chunk_subset [3..4, 0..4] of chunk [1, 1]:\n{data_all:+4.1}\n");
159
160    // Erase a chunk
161    array.async_erase_chunk(&[0, 0]).await?;
162    let data_all = array
163        .async_retrieve_array_subset_ndarray::<f32>(&subset_all)
164        .await?;
165    println!("async_erase_chunk [0, 0]:\n{data_all:+4.1}\n");
166
167    // Read a chunk
168    let chunk_indices = vec![0, 1];
169    let data_chunk = array
170        .async_retrieve_chunk_ndarray::<f32>(&chunk_indices)
171        .await?;
172    println!("async_retrieve_chunk [0, 1]:\n{data_chunk:+4.1}\n");
173
174    // Read chunks
175    let chunks = ArraySubset::new_with_ranges(&[0..2, 1..2]);
176    let data_chunks = array.async_retrieve_chunks_ndarray::<f32>(&chunks).await?;
177    println!("async_retrieve_chunks [0..2, 1..2]:\n{data_chunks:+4.1}\n");
178
179    // Retrieve an array subset
180    let subset = ArraySubset::new_with_ranges(&[2..6, 3..5]); // the center 4x2 region
181    let data_subset = array
182        .async_retrieve_array_subset_ndarray::<f32>(&subset)
183        .await?;
184    println!("async_retrieve_array_subset [2..6, 3..5]:\n{data_subset:+4.1}\n");
185
186    // Show the hierarchy
187    let node = Node::async_open(store, "/").await.unwrap();
188    let tree = node.hierarchy_tree();
189    println!("hierarchy_tree:\n{}", tree);
190
191    Ok(())
192}
193
194#[tokio::main]
195async fn main() {
196    if let Err(err) = async_array_write_read().await {
197        println!("{:?}", err);
198    }
199}