1#![allow(missing_docs)]
2
3use futures::TryStreamExt;
4use ndarray::ArrayD;
5use zarrs::storage::AsyncReadableWritableListableStorage;
6use zarrs::storage::storage_adapter::usage_log::UsageLogStorageAdapter;
7
8async fn async_array_write_read() -> Result<(), Box<dyn std::error::Error>> {
9 use std::sync::Arc;
10
11 use futures::StreamExt;
12 use zarrs::array::{ArraySubset, ZARR_NAN_F32, data_type};
13 use zarrs::node::Node;
14
15 let mut store: AsyncReadableWritableListableStorage = Arc::new(
17 zarrs_object_store::AsyncObjectStore::new(object_store::memory::InMemory::new()),
18 );
19 if let Some(arg1) = std::env::args().collect::<Vec<_>>().get(1)
20 && arg1 == "--usage-log"
21 {
22 let log_writer = Arc::new(std::sync::Mutex::new(
23 std::io::stdout(),
25 ));
27 store = Arc::new(UsageLogStorageAdapter::new(store, log_writer, || {
28 chrono::Utc::now().format("[%T%.3f] ").to_string()
29 }));
30 }
31
32 zarrs::group::GroupBuilder::new()
34 .build(store.clone(), "/")?
35 .async_store_metadata()
36 .await?;
37
38 let group_path = "/group";
40 let mut group = zarrs::group::GroupBuilder::new().build(store.clone(), group_path)?;
41 group
42 .attributes_mut()
43 .insert("foo".into(), serde_json::Value::String("bar".into()));
44 group.async_store_metadata().await?;
45
46 println!(
47 "The group metadata is:\n{}\n",
48 group.metadata().to_string_pretty()
49 );
50
51 let array_path = "/group/array";
53 let array = zarrs::array::ArrayBuilder::new(
54 vec![8, 8], vec![4, 4], data_type::float32(),
57 ZARR_NAN_F32,
58 )
59 .dimension_names(["y", "x"].into())
61 .build_arc(store.clone(), array_path)?;
63
64 array.async_store_metadata().await?;
66
67 println!(
68 "The array metadata is:\n{}\n",
69 array.metadata().to_string_pretty()
70 );
71
72 let store_chunk = |i: u64| {
74 let array = array.clone();
75 async move {
76 let chunk_indices: Vec<u64> = vec![0, i];
77 let chunk_subset = array.chunk_grid().subset(&chunk_indices)?.ok_or_else(|| {
78 zarrs::array::ArrayError::InvalidChunkGridIndicesError(chunk_indices.to_vec())
79 })?;
80 array
81 .async_store_chunk(
82 &chunk_indices,
83 vec![i as f32 * 0.1; chunk_subset.num_elements() as usize],
84 )
85 .await
86 }
87 };
88 futures::stream::iter(0..2)
89 .map(Ok)
90 .try_for_each_concurrent(None, store_chunk)
91 .await?;
92
93 let subset_all = array.subset_all();
94 let data_all: ArrayD<f32> = array.async_retrieve_array_subset(&subset_all).await?;
95 println!("async_store_chunk [0, 0] and [0, 1]:\n{data_all:+4.1}\n");
96
97 array
99 .async_store_chunks(
100 &[1..2, 0..2],
101 &[
102 1.0f32, 1.0, 1.0, 1.0, 1.1, 1.1, 1.1, 1.1, 1.0, 1.0, 1.0, 1.0, 1.1, 1.1, 1.1, 1.1,
104 1.0, 1.0, 1.0, 1.0, 1.1, 1.1, 1.1, 1.1, 1.0, 1.0, 1.0, 1.0, 1.1, 1.1, 1.1, 1.1,
106 ],
107 )
108 .await?;
109 let data_all: ArrayD<f32> = array.async_retrieve_array_subset(&subset_all).await?;
110 println!("async_store_chunks [1..2, 0..2]:\n{data_all:+4.1}\n");
111
112 array
114 .async_store_array_subset(
115 &[3..6, 3..6],
116 &[-3.3, -3.4, -3.5, -4.3, -4.4, -4.5, -5.3, -5.4, -5.5],
117 )
118 .await?;
119 let data_all: ArrayD<f32> = array.async_retrieve_array_subset(&subset_all).await?;
120 println!("async_store_array_subset [3..6, 3..6]:\n{data_all:+4.1}\n");
121
122 array
124 .async_store_array_subset(
125 &[0..8, 6..7],
126 &[-0.6f32, -1.6, -2.6, -3.6, -4.6, -5.6, -6.6, -7.6],
127 )
128 .await?;
129 let data_all: ArrayD<f32> = array.async_retrieve_array_subset(&subset_all).await?;
130 println!("async_store_array_subset [0..8, 6..7]:\n{data_all:+4.1}\n");
131
132 array
134 .async_store_chunk_subset(
135 &[1, 1],
137 &[3..4, 0..4],
139 &[-7.4f32, -7.5, -7.6, -7.7],
140 )
141 .await?;
142 let data_all: ArrayD<f32> = array.async_retrieve_array_subset(&subset_all).await?;
143 println!("async_store_chunk_subset [3..4, 0..4] of chunk [1, 1]:\n{data_all:+4.1}\n");
144
145 array.async_erase_chunk(&[0, 0]).await?;
147 let data_all: ArrayD<f32> = array.async_retrieve_array_subset(&subset_all).await?;
148 println!("async_erase_chunk [0, 0]:\n{data_all:+4.1}\n");
149
150 let chunk_indices = vec![0, 1];
152 let data_chunk: ArrayD<f32> = array.async_retrieve_chunk(&chunk_indices).await?;
153 println!("async_retrieve_chunk [0, 1]:\n{data_chunk:+4.1}\n");
154
155 let chunks = ArraySubset::new_with_ranges(&[0..2, 1..2]);
157 let data_chunks: ArrayD<f32> = array.async_retrieve_chunks(&chunks).await?;
158 println!("async_retrieve_chunks [0..2, 1..2]:\n{data_chunks:+4.1}\n");
159
160 let subset = ArraySubset::new_with_ranges(&[2..6, 3..5]); let data_subset: ArrayD<f32> = array.async_retrieve_array_subset(&subset).await?;
163 println!("async_retrieve_array_subset [2..6, 3..5]:\n{data_subset:+4.1}\n");
164
165 let node = Node::async_open(store, "/").await.unwrap();
167 let tree = node.hierarchy_tree();
168 println!("hierarchy_tree:\n{}", tree);
169
170 Ok(())
171}
172
173#[tokio::main]
174async fn main() {
175 if let Err(err) = async_array_write_read().await {
176 println!("{:?}", err);
177 }
178}