1#![allow(missing_docs)]
2
3use futures::TryStreamExt;
4use zarrs::storage::{
5 storage_adapter::usage_log::UsageLogStorageAdapter, AsyncReadableWritableListableStorage,
6};
7
8async fn async_array_write_read() -> Result<(), Box<dyn std::error::Error>> {
9 use futures::StreamExt;
10 use std::sync::Arc;
11 use zarrs::{
12 array::{DataType, FillValue, ZARR_NAN_F32},
13 array_subset::ArraySubset,
14 node::Node,
15 };
16
17 let mut store: AsyncReadableWritableListableStorage = Arc::new(
19 zarrs_object_store::AsyncObjectStore::new(object_store::memory::InMemory::new()),
20 );
21 if let Some(arg1) = std::env::args().collect::<Vec<_>>().get(1) {
22 if arg1 == "--usage-log" {
23 let log_writer = Arc::new(std::sync::Mutex::new(
24 std::io::stdout(),
26 ));
28 store = Arc::new(UsageLogStorageAdapter::new(store, log_writer, || {
29 chrono::Utc::now().format("[%T%.3f] ").to_string()
30 }));
31 }
32 }
33
34 zarrs::group::GroupBuilder::new()
36 .build(store.clone(), "/")?
37 .async_store_metadata()
38 .await?;
39
40 let group_path = "/group";
42 let mut group = zarrs::group::GroupBuilder::new().build(store.clone(), group_path)?;
43 group
44 .attributes_mut()
45 .insert("foo".into(), serde_json::Value::String("bar".into()));
46 group.async_store_metadata().await?;
47
48 println!(
49 "The group metadata is:\n{}\n",
50 group.metadata().to_string_pretty()
51 );
52
53 let array_path = "/group/array";
55 let array = zarrs::array::ArrayBuilder::new(
56 vec![8, 8], DataType::Float32,
58 vec![4, 4].try_into()?, FillValue::from(ZARR_NAN_F32),
60 )
61 .dimension_names(["y", "x"].into())
63 .build_arc(store.clone(), array_path)?;
65
66 array.async_store_metadata().await?;
68
69 println!(
70 "The array metadata is:\n{}\n",
71 array.metadata().to_string_pretty()
72 );
73
74 let store_chunk = |i: u64| {
76 let array = array.clone();
77 async move {
78 let chunk_indices: Vec<u64> = vec![0, i];
79 let chunk_subset = array
80 .chunk_grid()
81 .subset(&chunk_indices, array.shape())?
82 .ok_or_else(|| {
83 zarrs::array::ArrayError::InvalidChunkGridIndicesError(chunk_indices.to_vec())
84 })?;
85 array
86 .async_store_chunk_elements(
87 &chunk_indices,
88 &vec![i as f32 * 0.1; chunk_subset.num_elements() as usize],
89 )
90 .await
91 }
92 };
93 futures::stream::iter(0..2)
94 .map(Ok)
95 .try_for_each_concurrent(None, store_chunk)
96 .await?;
97
98 let subset_all = array.subset_all();
99 let data_all = array
100 .async_retrieve_array_subset_ndarray::<f32>(&subset_all)
101 .await?;
102 println!("async_store_chunk [0, 0] and [0, 1]:\n{data_all:+4.1}\n");
103
104 array
106 .async_store_chunks_elements::<f32>(
107 &ArraySubset::new_with_ranges(&[1..2, 0..2]),
108 &[
109 1.0, 1.0, 1.0, 1.0, 1.1, 1.1, 1.1, 1.1, 1.0, 1.0, 1.0, 1.0, 1.1, 1.1, 1.1, 1.1,
111 1.0, 1.0, 1.0, 1.0, 1.1, 1.1, 1.1, 1.1, 1.0, 1.0, 1.0, 1.0, 1.1, 1.1, 1.1, 1.1,
113 ],
114 )
115 .await?;
116 let data_all = array
117 .async_retrieve_array_subset_ndarray::<f32>(&subset_all)
118 .await?;
119 println!("async_store_chunks [1..2, 0..2]:\n{data_all:+4.1}\n");
120
121 array
123 .async_store_array_subset_elements::<f32>(
124 &ArraySubset::new_with_ranges(&[3..6, 3..6]),
125 &[-3.3, -3.4, -3.5, -4.3, -4.4, -4.5, -5.3, -5.4, -5.5],
126 )
127 .await?;
128 let data_all = array
129 .async_retrieve_array_subset_ndarray::<f32>(&subset_all)
130 .await?;
131 println!("async_store_array_subset [3..6, 3..6]:\n{data_all:+4.1}\n");
132
133 array
135 .async_store_array_subset_elements::<f32>(
136 &ArraySubset::new_with_ranges(&[0..8, 6..7]),
137 &[-0.6, -1.6, -2.6, -3.6, -4.6, -5.6, -6.6, -7.6],
138 )
139 .await?;
140 let data_all = array
141 .async_retrieve_array_subset_ndarray::<f32>(&subset_all)
142 .await?;
143 println!("async_store_array_subset [0..8, 6..7]:\n{data_all:+4.1}\n");
144
145 array
147 .async_store_chunk_subset_elements::<f32>(
148 &[1, 1],
150 &ArraySubset::new_with_ranges(&[3..4, 0..4]),
152 &[-7.4, -7.5, -7.6, -7.7],
153 )
154 .await?;
155 let data_all = array
156 .async_retrieve_array_subset_ndarray::<f32>(&subset_all)
157 .await?;
158 println!("async_store_chunk_subset [3..4, 0..4] of chunk [1, 1]:\n{data_all:+4.1}\n");
159
160 array.async_erase_chunk(&[0, 0]).await?;
162 let data_all = array
163 .async_retrieve_array_subset_ndarray::<f32>(&subset_all)
164 .await?;
165 println!("async_erase_chunk [0, 0]:\n{data_all:+4.1}\n");
166
167 let chunk_indices = vec![0, 1];
169 let data_chunk = array
170 .async_retrieve_chunk_ndarray::<f32>(&chunk_indices)
171 .await?;
172 println!("async_retrieve_chunk [0, 1]:\n{data_chunk:+4.1}\n");
173
174 let chunks = ArraySubset::new_with_ranges(&[0..2, 1..2]);
176 let data_chunks = array.async_retrieve_chunks_ndarray::<f32>(&chunks).await?;
177 println!("async_retrieve_chunks [0..2, 1..2]:\n{data_chunks:+4.1}\n");
178
179 let subset = ArraySubset::new_with_ranges(&[2..6, 3..5]); let data_subset = array
182 .async_retrieve_array_subset_ndarray::<f32>(&subset)
183 .await?;
184 println!("async_retrieve_array_subset [2..6, 3..5]:\n{data_subset:+4.1}\n");
185
186 let node = Node::async_open(store, "/").await.unwrap();
188 let tree = node.hierarchy_tree();
189 println!("hierarchy_tree:\n{}", tree);
190
191 Ok(())
192}
193
194#[tokio::main]
195async fn main() {
196 if let Err(err) = async_array_write_read().await {
197 println!("{:?}", err);
198 }
199}