Skip to main content

async_array_write_read/
async_array_write_read.rs

1#![allow(missing_docs)]
2
3use futures::TryStreamExt;
4use ndarray::ArrayD;
5use zarrs::storage::AsyncReadableWritableListableStorage;
6use zarrs::storage::storage_adapter::usage_log::UsageLogStorageAdapter;
7
8async fn async_array_write_read() -> Result<(), Box<dyn std::error::Error>> {
9    use std::sync::Arc;
10
11    use futures::StreamExt;
12    use zarrs::array::{ArraySubset, ZARR_NAN_F32, data_type};
13    use zarrs::node::Node;
14
15    // Create a store
16    let mut store: AsyncReadableWritableListableStorage = Arc::new(
17        zarrs_object_store::AsyncObjectStore::new(object_store::memory::InMemory::new()),
18    );
19    if let Some(arg1) = std::env::args().collect::<Vec<_>>().get(1)
20        && arg1 == "--usage-log"
21    {
22        let log_writer = Arc::new(std::sync::Mutex::new(
23            // std::io::BufWriter::new(
24            std::io::stdout(),
25            //    )
26        ));
27        store = Arc::new(UsageLogStorageAdapter::new(store, log_writer, || {
28            chrono::Utc::now().format("[%T%.3f] ").to_string()
29        }));
30    }
31
32    // Create the root group
33    zarrs::group::GroupBuilder::new()
34        .build(store.clone(), "/")?
35        .async_store_metadata()
36        .await?;
37
38    // Create a group with attributes
39    let group_path = "/group";
40    let mut group = zarrs::group::GroupBuilder::new().build(store.clone(), group_path)?;
41    group
42        .attributes_mut()
43        .insert("foo".into(), serde_json::Value::String("bar".into()));
44    group.async_store_metadata().await?;
45
46    println!(
47        "The group metadata is:\n{}\n",
48        group.metadata().to_string_pretty()
49    );
50
51    // Create an array
52    let array_path = "/group/array";
53    let array = zarrs::array::ArrayBuilder::new(
54        vec![8, 8], // array shape
55        vec![4, 4], // regular chunk shape
56        data_type::float32(),
57        ZARR_NAN_F32,
58    )
59    // .bytes_to_bytes_codecs(vec![]) // uncompressed
60    .dimension_names(["y", "x"].into())
61    // .storage_transformers(vec![].into())
62    .build_arc(store.clone(), array_path)?;
63
64    // Write array metadata to store
65    array.async_store_metadata().await?;
66
67    println!(
68        "The array metadata is:\n{}\n",
69        array.metadata().to_string_pretty()
70    );
71
72    // Write some chunks
73    let store_chunk = |i: u64| {
74        let array = array.clone();
75        async move {
76            let chunk_indices: Vec<u64> = vec![0, i];
77            let chunk_subset = array.chunk_grid().subset(&chunk_indices)?.ok_or_else(|| {
78                zarrs::array::ArrayError::InvalidChunkGridIndicesError(chunk_indices.to_vec())
79            })?;
80            array
81                .async_store_chunk(
82                    &chunk_indices,
83                    vec![i as f32 * 0.1; chunk_subset.num_elements() as usize],
84                )
85                .await
86        }
87    };
88    futures::stream::iter(0..2)
89        .map(Ok)
90        .try_for_each_concurrent(None, store_chunk)
91        .await?;
92
93    let subset_all = array.subset_all();
94    let data_all: ArrayD<f32> = array.async_retrieve_array_subset(&subset_all).await?;
95    println!("async_store_chunk [0, 0] and [0, 1]:\n{data_all:+4.1}\n");
96
97    // Store multiple chunks
98    array
99        .async_store_chunks(
100            &[1..2, 0..2],
101            &[
102                //
103                1.0f32, 1.0, 1.0, 1.0, 1.1, 1.1, 1.1, 1.1, 1.0, 1.0, 1.0, 1.0, 1.1, 1.1, 1.1, 1.1,
104                //
105                1.0, 1.0, 1.0, 1.0, 1.1, 1.1, 1.1, 1.1, 1.0, 1.0, 1.0, 1.0, 1.1, 1.1, 1.1, 1.1,
106            ],
107        )
108        .await?;
109    let data_all: ArrayD<f32> = array.async_retrieve_array_subset(&subset_all).await?;
110    println!("async_store_chunks [1..2, 0..2]:\n{data_all:+4.1}\n");
111
112    // Write a subset spanning multiple chunks, including updating chunks already written
113    array
114        .async_store_array_subset(
115            &[3..6, 3..6],
116            &[-3.3, -3.4, -3.5, -4.3, -4.4, -4.5, -5.3, -5.4, -5.5],
117        )
118        .await?;
119    let data_all: ArrayD<f32> = array.async_retrieve_array_subset(&subset_all).await?;
120    println!("async_store_array_subset [3..6, 3..6]:\n{data_all:+4.1}\n");
121
122    // Store array subset
123    array
124        .async_store_array_subset(
125            &[0..8, 6..7],
126            &[-0.6f32, -1.6, -2.6, -3.6, -4.6, -5.6, -6.6, -7.6],
127        )
128        .await?;
129    let data_all: ArrayD<f32> = array.async_retrieve_array_subset(&subset_all).await?;
130    println!("async_store_array_subset [0..8, 6..7]:\n{data_all:+4.1}\n");
131
132    // Store chunk subset
133    array
134        .async_store_chunk_subset(
135            // chunk indices
136            &[1, 1],
137            // subset within chunk
138            &[3..4, 0..4],
139            &[-7.4f32, -7.5, -7.6, -7.7],
140        )
141        .await?;
142    let data_all: ArrayD<f32> = array.async_retrieve_array_subset(&subset_all).await?;
143    println!("async_store_chunk_subset [3..4, 0..4] of chunk [1, 1]:\n{data_all:+4.1}\n");
144
145    // Erase a chunk
146    array.async_erase_chunk(&[0, 0]).await?;
147    let data_all: ArrayD<f32> = array.async_retrieve_array_subset(&subset_all).await?;
148    println!("async_erase_chunk [0, 0]:\n{data_all:+4.1}\n");
149
150    // Read a chunk
151    let chunk_indices = vec![0, 1];
152    let data_chunk: ArrayD<f32> = array.async_retrieve_chunk(&chunk_indices).await?;
153    println!("async_retrieve_chunk [0, 1]:\n{data_chunk:+4.1}\n");
154
155    // Read chunks
156    let chunks = ArraySubset::new_with_ranges(&[0..2, 1..2]);
157    let data_chunks: ArrayD<f32> = array.async_retrieve_chunks(&chunks).await?;
158    println!("async_retrieve_chunks [0..2, 1..2]:\n{data_chunks:+4.1}\n");
159
160    // Retrieve an array subset
161    let subset = ArraySubset::new_with_ranges(&[2..6, 3..5]); // the center 4x2 region
162    let data_subset: ArrayD<f32> = array.async_retrieve_array_subset(&subset).await?;
163    println!("async_retrieve_array_subset [2..6, 3..5]:\n{data_subset:+4.1}\n");
164
165    // Show the hierarchy
166    let node = Node::async_open(store, "/").await.unwrap();
167    let tree = node.hierarchy_tree();
168    println!("hierarchy_tree:\n{}", tree);
169
170    Ok(())
171}
172
173#[tokio::main]
174async fn main() {
175    if let Err(err) = async_array_write_read().await {
176        println!("{:?}", err);
177    }
178}