1#![allow(missing_docs)]
2
3use futures::TryStreamExt;
4use zarrs::storage::{
5 storage_adapter::usage_log::UsageLogStorageAdapter, AsyncReadableWritableListableStorage,
6};
7
8async fn async_array_write_read() -> Result<(), Box<dyn std::error::Error>> {
9 use futures::StreamExt;
10 use std::sync::Arc;
11 use zarrs::{
12 array::{DataType, ZARR_NAN_F32},
13 array_subset::ArraySubset,
14 node::Node,
15 };
16
17 let mut store: AsyncReadableWritableListableStorage = Arc::new(
19 zarrs_object_store::AsyncObjectStore::new(object_store::memory::InMemory::new()),
20 );
21 if let Some(arg1) = std::env::args().collect::<Vec<_>>().get(1) {
22 if arg1 == "--usage-log" {
23 let log_writer = Arc::new(std::sync::Mutex::new(
24 std::io::stdout(),
26 ));
28 store = Arc::new(UsageLogStorageAdapter::new(store, log_writer, || {
29 chrono::Utc::now().format("[%T%.3f] ").to_string()
30 }));
31 }
32 }
33
34 zarrs::group::GroupBuilder::new()
36 .build(store.clone(), "/")?
37 .async_store_metadata()
38 .await?;
39
40 let group_path = "/group";
42 let mut group = zarrs::group::GroupBuilder::new().build(store.clone(), group_path)?;
43 group
44 .attributes_mut()
45 .insert("foo".into(), serde_json::Value::String("bar".into()));
46 group.async_store_metadata().await?;
47
48 println!(
49 "The group metadata is:\n{}\n",
50 group.metadata().to_string_pretty()
51 );
52
53 let array_path = "/group/array";
55 let array = zarrs::array::ArrayBuilder::new(
56 vec![8, 8], vec![4, 4], DataType::Float32,
59 ZARR_NAN_F32,
60 )
61 .dimension_names(["y", "x"].into())
63 .build_arc(store.clone(), array_path)?;
65
66 array.async_store_metadata().await?;
68
69 println!(
70 "The array metadata is:\n{}\n",
71 array.metadata().to_string_pretty()
72 );
73
74 let store_chunk = |i: u64| {
76 let array = array.clone();
77 async move {
78 let chunk_indices: Vec<u64> = vec![0, i];
79 let chunk_subset = array.chunk_grid().subset(&chunk_indices)?.ok_or_else(|| {
80 zarrs::array::ArrayError::InvalidChunkGridIndicesError(chunk_indices.to_vec())
81 })?;
82 array
83 .async_store_chunk_elements(
84 &chunk_indices,
85 &vec![i as f32 * 0.1; chunk_subset.num_elements() as usize],
86 )
87 .await
88 }
89 };
90 futures::stream::iter(0..2)
91 .map(Ok)
92 .try_for_each_concurrent(None, store_chunk)
93 .await?;
94
95 let subset_all = array.subset_all();
96 let data_all = array
97 .async_retrieve_array_subset_ndarray::<f32>(&subset_all)
98 .await?;
99 println!("async_store_chunk [0, 0] and [0, 1]:\n{data_all:+4.1}\n");
100
101 array
103 .async_store_chunks_elements::<f32>(
104 &ArraySubset::new_with_ranges(&[1..2, 0..2]),
105 &[
106 1.0, 1.0, 1.0, 1.0, 1.1, 1.1, 1.1, 1.1, 1.0, 1.0, 1.0, 1.0, 1.1, 1.1, 1.1, 1.1,
108 1.0, 1.0, 1.0, 1.0, 1.1, 1.1, 1.1, 1.1, 1.0, 1.0, 1.0, 1.0, 1.1, 1.1, 1.1, 1.1,
110 ],
111 )
112 .await?;
113 let data_all = array
114 .async_retrieve_array_subset_ndarray::<f32>(&subset_all)
115 .await?;
116 println!("async_store_chunks [1..2, 0..2]:\n{data_all:+4.1}\n");
117
118 array
120 .async_store_array_subset_elements::<f32>(
121 &ArraySubset::new_with_ranges(&[3..6, 3..6]),
122 &[-3.3, -3.4, -3.5, -4.3, -4.4, -4.5, -5.3, -5.4, -5.5],
123 )
124 .await?;
125 let data_all = array
126 .async_retrieve_array_subset_ndarray::<f32>(&subset_all)
127 .await?;
128 println!("async_store_array_subset [3..6, 3..6]:\n{data_all:+4.1}\n");
129
130 array
132 .async_store_array_subset_elements::<f32>(
133 &ArraySubset::new_with_ranges(&[0..8, 6..7]),
134 &[-0.6, -1.6, -2.6, -3.6, -4.6, -5.6, -6.6, -7.6],
135 )
136 .await?;
137 let data_all = array
138 .async_retrieve_array_subset_ndarray::<f32>(&subset_all)
139 .await?;
140 println!("async_store_array_subset [0..8, 6..7]:\n{data_all:+4.1}\n");
141
142 array
144 .async_store_chunk_subset_elements::<f32>(
145 &[1, 1],
147 &ArraySubset::new_with_ranges(&[3..4, 0..4]),
149 &[-7.4, -7.5, -7.6, -7.7],
150 )
151 .await?;
152 let data_all = array
153 .async_retrieve_array_subset_ndarray::<f32>(&subset_all)
154 .await?;
155 println!("async_store_chunk_subset [3..4, 0..4] of chunk [1, 1]:\n{data_all:+4.1}\n");
156
157 array.async_erase_chunk(&[0, 0]).await?;
159 let data_all = array
160 .async_retrieve_array_subset_ndarray::<f32>(&subset_all)
161 .await?;
162 println!("async_erase_chunk [0, 0]:\n{data_all:+4.1}\n");
163
164 let chunk_indices = vec![0, 1];
166 let data_chunk = array
167 .async_retrieve_chunk_ndarray::<f32>(&chunk_indices)
168 .await?;
169 println!("async_retrieve_chunk [0, 1]:\n{data_chunk:+4.1}\n");
170
171 let chunks = ArraySubset::new_with_ranges(&[0..2, 1..2]);
173 let data_chunks = array.async_retrieve_chunks_ndarray::<f32>(&chunks).await?;
174 println!("async_retrieve_chunks [0..2, 1..2]:\n{data_chunks:+4.1}\n");
175
176 let subset = ArraySubset::new_with_ranges(&[2..6, 3..5]); let data_subset = array
179 .async_retrieve_array_subset_ndarray::<f32>(&subset)
180 .await?;
181 println!("async_retrieve_array_subset [2..6, 3..5]:\n{data_subset:+4.1}\n");
182
183 let node = Node::async_open(store, "/").await.unwrap();
185 let tree = node.hierarchy_tree();
186 println!("hierarchy_tree:\n{}", tree);
187
188 Ok(())
189}
190
191#[tokio::main]
192async fn main() {
193 if let Err(err) = async_array_write_read().await {
194 println!("{:?}", err);
195 }
196}