async_array_write_read/
async_array_write_read.rs

1#![allow(missing_docs)]
2
3use futures::TryStreamExt;
4use zarrs::storage::{
5    storage_adapter::usage_log::UsageLogStorageAdapter, AsyncReadableWritableListableStorage,
6};
7
8async fn async_array_write_read() -> Result<(), Box<dyn std::error::Error>> {
9    use futures::StreamExt;
10    use std::sync::Arc;
11    use zarrs::{
12        array::{DataType, ZARR_NAN_F32},
13        array_subset::ArraySubset,
14        node::Node,
15    };
16
17    // Create a store
18    let mut store: AsyncReadableWritableListableStorage = Arc::new(
19        zarrs_object_store::AsyncObjectStore::new(object_store::memory::InMemory::new()),
20    );
21    if let Some(arg1) = std::env::args().collect::<Vec<_>>().get(1) {
22        if arg1 == "--usage-log" {
23            let log_writer = Arc::new(std::sync::Mutex::new(
24                // std::io::BufWriter::new(
25                std::io::stdout(),
26                //    )
27            ));
28            store = Arc::new(UsageLogStorageAdapter::new(store, log_writer, || {
29                chrono::Utc::now().format("[%T%.3f] ").to_string()
30            }));
31        }
32    }
33
34    // Create the root group
35    zarrs::group::GroupBuilder::new()
36        .build(store.clone(), "/")?
37        .async_store_metadata()
38        .await?;
39
40    // Create a group with attributes
41    let group_path = "/group";
42    let mut group = zarrs::group::GroupBuilder::new().build(store.clone(), group_path)?;
43    group
44        .attributes_mut()
45        .insert("foo".into(), serde_json::Value::String("bar".into()));
46    group.async_store_metadata().await?;
47
48    println!(
49        "The group metadata is:\n{}\n",
50        group.metadata().to_string_pretty()
51    );
52
53    // Create an array
54    let array_path = "/group/array";
55    let array = zarrs::array::ArrayBuilder::new(
56        vec![8, 8], // array shape
57        vec![4, 4], // regular chunk shape
58        DataType::Float32,
59        ZARR_NAN_F32,
60    )
61    // .bytes_to_bytes_codecs(vec![]) // uncompressed
62    .dimension_names(["y", "x"].into())
63    // .storage_transformers(vec![].into())
64    .build_arc(store.clone(), array_path)?;
65
66    // Write array metadata to store
67    array.async_store_metadata().await?;
68
69    println!(
70        "The array metadata is:\n{}\n",
71        array.metadata().to_string_pretty()
72    );
73
74    // Write some chunks
75    let store_chunk = |i: u64| {
76        let array = array.clone();
77        async move {
78            let chunk_indices: Vec<u64> = vec![0, i];
79            let chunk_subset = array.chunk_grid().subset(&chunk_indices)?.ok_or_else(|| {
80                zarrs::array::ArrayError::InvalidChunkGridIndicesError(chunk_indices.to_vec())
81            })?;
82            array
83                .async_store_chunk_elements(
84                    &chunk_indices,
85                    &vec![i as f32 * 0.1; chunk_subset.num_elements() as usize],
86                )
87                .await
88        }
89    };
90    futures::stream::iter(0..2)
91        .map(Ok)
92        .try_for_each_concurrent(None, store_chunk)
93        .await?;
94
95    let subset_all = array.subset_all();
96    let data_all = array
97        .async_retrieve_array_subset_ndarray::<f32>(&subset_all)
98        .await?;
99    println!("async_store_chunk [0, 0] and [0, 1]:\n{data_all:+4.1}\n");
100
101    // Store multiple chunks
102    array
103        .async_store_chunks_elements::<f32>(
104            &ArraySubset::new_with_ranges(&[1..2, 0..2]),
105            &[
106                //
107                1.0, 1.0, 1.0, 1.0, 1.1, 1.1, 1.1, 1.1, 1.0, 1.0, 1.0, 1.0, 1.1, 1.1, 1.1, 1.1,
108                //
109                1.0, 1.0, 1.0, 1.0, 1.1, 1.1, 1.1, 1.1, 1.0, 1.0, 1.0, 1.0, 1.1, 1.1, 1.1, 1.1,
110            ],
111        )
112        .await?;
113    let data_all = array
114        .async_retrieve_array_subset_ndarray::<f32>(&subset_all)
115        .await?;
116    println!("async_store_chunks [1..2, 0..2]:\n{data_all:+4.1}\n");
117
118    // Write a subset spanning multiple chunks, including updating chunks already written
119    array
120        .async_store_array_subset_elements::<f32>(
121            &ArraySubset::new_with_ranges(&[3..6, 3..6]),
122            &[-3.3, -3.4, -3.5, -4.3, -4.4, -4.5, -5.3, -5.4, -5.5],
123        )
124        .await?;
125    let data_all = array
126        .async_retrieve_array_subset_ndarray::<f32>(&subset_all)
127        .await?;
128    println!("async_store_array_subset [3..6, 3..6]:\n{data_all:+4.1}\n");
129
130    // Store array subset
131    array
132        .async_store_array_subset_elements::<f32>(
133            &ArraySubset::new_with_ranges(&[0..8, 6..7]),
134            &[-0.6, -1.6, -2.6, -3.6, -4.6, -5.6, -6.6, -7.6],
135        )
136        .await?;
137    let data_all = array
138        .async_retrieve_array_subset_ndarray::<f32>(&subset_all)
139        .await?;
140    println!("async_store_array_subset [0..8, 6..7]:\n{data_all:+4.1}\n");
141
142    // Store chunk subset
143    array
144        .async_store_chunk_subset_elements::<f32>(
145            // chunk indices
146            &[1, 1],
147            // subset within chunk
148            &ArraySubset::new_with_ranges(&[3..4, 0..4]),
149            &[-7.4, -7.5, -7.6, -7.7],
150        )
151        .await?;
152    let data_all = array
153        .async_retrieve_array_subset_ndarray::<f32>(&subset_all)
154        .await?;
155    println!("async_store_chunk_subset [3..4, 0..4] of chunk [1, 1]:\n{data_all:+4.1}\n");
156
157    // Erase a chunk
158    array.async_erase_chunk(&[0, 0]).await?;
159    let data_all = array
160        .async_retrieve_array_subset_ndarray::<f32>(&subset_all)
161        .await?;
162    println!("async_erase_chunk [0, 0]:\n{data_all:+4.1}\n");
163
164    // Read a chunk
165    let chunk_indices = vec![0, 1];
166    let data_chunk = array
167        .async_retrieve_chunk_ndarray::<f32>(&chunk_indices)
168        .await?;
169    println!("async_retrieve_chunk [0, 1]:\n{data_chunk:+4.1}\n");
170
171    // Read chunks
172    let chunks = ArraySubset::new_with_ranges(&[0..2, 1..2]);
173    let data_chunks = array.async_retrieve_chunks_ndarray::<f32>(&chunks).await?;
174    println!("async_retrieve_chunks [0..2, 1..2]:\n{data_chunks:+4.1}\n");
175
176    // Retrieve an array subset
177    let subset = ArraySubset::new_with_ranges(&[2..6, 3..5]); // the center 4x2 region
178    let data_subset = array
179        .async_retrieve_array_subset_ndarray::<f32>(&subset)
180        .await?;
181    println!("async_retrieve_array_subset [2..6, 3..5]:\n{data_subset:+4.1}\n");
182
183    // Show the hierarchy
184    let node = Node::async_open(store, "/").await.unwrap();
185    let tree = node.hierarchy_tree();
186    println!("hierarchy_tree:\n{}", tree);
187
188    Ok(())
189}
190
191#[tokio::main]
192async fn main() {
193    if let Err(err) = async_array_write_read().await {
194        println!("{:?}", err);
195    }
196}