Struct zarrs::storage::store::AsyncObjectStore
source · pub struct AsyncObjectStore<T: ObjectStore> { /* private fields */ }
Available on crate features `object_store` and `async` only.
An asynchronous store backed by an `object_store::ObjectStore`.
Implementations§
source§impl<T: ObjectStore> AsyncObjectStore<T>
impl<T: ObjectStore> AsyncObjectStore<T>
sourcepub fn new(object_store: T) -> Self
pub fn new(object_store: T) -> Self
Create a new `AsyncObjectStore`.
Examples found in repository?
examples/async_array_write_read.rs (lines 22-24)
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199
/// Walks through the asynchronous array API against an in-memory object store.
///
/// The example:
/// 1. creates an `AsyncObjectStore` wrapping `object_store::memory::InMemory`,
///    optionally wrapped in a usage-log storage transformer when the first
///    command-line argument is `--usage-log`;
/// 2. creates a group at `/group` and an 8x8 `Float32` array at `/group/array`
///    with a regular 4x4 chunk shape and a NaN fill value;
/// 3. stores data by chunk, by multiple chunks, by array subset and by chunk
///    subset, printing the whole array after each step;
/// 4. erases a chunk, reads back a chunk, a range of chunks and an array
///    subset, and finally prints the hierarchy tree of the store.
///
/// # Errors
///
/// Returns the first error produced by any store, group, array or
/// serialisation operation. No step panics on failure: every fallible call is
/// propagated with `?`.
async fn async_array_write_read() -> Result<(), Box<dyn std::error::Error>> {
    use futures::{stream::FuturesUnordered, StreamExt};
    use std::sync::Arc;
    use zarrs::{
        array::{DataType, FillValue, ZARR_NAN_F32},
        array_subset::ArraySubset,
        node::Node,
        storage::store,
    };

    // Create a store.
    // Alternative stores that were left in the example for reference:
    // let path = tempfile::TempDir::new()?;
    // let mut store: ReadableWritableListableStorage = Arc::new(store::AsyncFilesystemStore::new(path.path())?);
    // let mut store: ReadableWritableListableStorage = Arc::new(store::AsyncFilesystemStore::new(
    //     "tests/data/array_write_read.zarr",
    // )?);
    let mut store: AsyncReadableWritableListableStorage = Arc::new(store::AsyncObjectStore::new(
        object_store::memory::InMemory::new(),
    ));

    // Only the first argument is inspected, so there is no need to collect
    // the whole argument list into a `Vec`.
    if std::env::args().nth(1).as_deref() == Some("--usage-log") {
        let log_writer = Arc::new(std::sync::Mutex::new(
            // std::io::BufWriter::new(
            std::io::stdout(),
            // )
        ));
        // Each log entry is prefixed with the current UTC time.
        let usage_log = Arc::new(UsageLogStorageTransformer::new(log_writer, || {
            chrono::Utc::now().format("[%T%.3f] ").to_string()
        }));
        store = usage_log
            .clone()
            .create_async_readable_writable_listable_transformer(store);
    }

    // Create a group
    let group_path = "/group";
    let mut group = zarrs::group::GroupBuilder::new().build(store.clone(), group_path)?;

    // Update group metadata
    group
        .attributes_mut()
        .insert("foo".into(), serde_json::Value::String("bar".into()));

    // Write group metadata to store
    group.async_store_metadata().await?;
    println!(
        "The group metadata is:\n{}\n",
        serde_json::to_string_pretty(&group.metadata())?
    );

    // Create an array
    let array_path = "/group/array";
    let array = zarrs::array::ArrayBuilder::new(
        vec![8, 8], // array shape
        DataType::Float32,
        vec![4, 4].try_into()?, // regular chunk shape
        FillValue::from(ZARR_NAN_F32),
    )
    // .bytes_to_bytes_codecs(vec![]) // uncompressed
    .dimension_names(["y", "x"].into())
    // .storage_transformers(vec![].into())
    .build(store.clone(), array_path)?;

    // Write array metadata to store
    array.async_store_metadata().await?;
    println!(
        "The array metadata is:\n{}\n",
        serde_json::to_string_pretty(&array.metadata())?
    );

    // Write some chunks: resolve the subset of chunks [0, 0] and [0, 1] first
    // so that any invalid chunk index is reported before a write is issued.
    let subsets = (0..2)
        .map(|i| {
            let chunk_indices: Vec<u64> = vec![0, i];
            array
                .chunk_grid()
                .subset(&chunk_indices, array.shape())?
                .ok_or_else(|| {
                    zarrs::array::ArrayError::InvalidChunkGridIndicesError(chunk_indices.clone())
                })
                .map(|chunk_subset| (i, chunk_indices, chunk_subset))
        })
        .collect::<Result<Vec<_>, _>>()?;

    // Issue the chunk writes together and drain them as they complete.
    let mut futures = subsets
        .iter()
        .map(|(i, chunk_indices, chunk_subset)| {
            array.async_store_chunk_elements(
                chunk_indices,
                vec![*i as f32 * 0.1; chunk_subset.num_elements() as usize],
            )
        })
        .collect::<FuturesUnordered<_>>();
    while let Some(item) = futures.next().await {
        item?;
    }

    let subset_all = ArraySubset::new_with_shape(array.shape().to_vec());
    let data_all = array
        .async_retrieve_array_subset_ndarray::<f32>(&subset_all)
        .await?;
    println!("async_store_chunk [0, 0] and [0, 1]:\n{data_all:+4.1}\n");

    // Store multiple chunks
    array
        .async_store_chunks_elements::<f32>(
            &ArraySubset::new_with_ranges(&[1..2, 0..2]),
            vec![
                //
                1.0, 1.0, 1.0, 1.0, 1.1, 1.1, 1.1, 1.1, 1.0, 1.0, 1.0, 1.0, 1.1, 1.1, 1.1, 1.1,
                //
                1.0, 1.0, 1.0, 1.0, 1.1, 1.1, 1.1, 1.1, 1.0, 1.0, 1.0, 1.0, 1.1, 1.1, 1.1, 1.1,
            ],
        )
        .await?;
    let data_all = array
        .async_retrieve_array_subset_ndarray::<f32>(&subset_all)
        .await?;
    println!("async_store_chunks [1..2, 0..2]:\n{data_all:+4.1}\n");

    // Write a subset spanning multiple chunks, including updating chunks already written
    array
        .async_store_array_subset_elements::<f32>(
            &ArraySubset::new_with_ranges(&[3..6, 3..6]),
            vec![-3.3, -3.4, -3.5, -4.3, -4.4, -4.5, -5.3, -5.4, -5.5],
        )
        .await?;
    let data_all = array
        .async_retrieve_array_subset_ndarray::<f32>(&subset_all)
        .await?;
    println!("async_store_array_subset [3..6, 3..6]:\n{data_all:+4.1}\n");

    // Store array subset
    array
        .async_store_array_subset_elements::<f32>(
            &ArraySubset::new_with_ranges(&[0..8, 6..7]),
            vec![-0.6, -1.6, -2.6, -3.6, -4.6, -5.6, -6.6, -7.6],
        )
        .await?;
    let data_all = array
        .async_retrieve_array_subset_ndarray::<f32>(&subset_all)
        .await?;
    println!("async_store_array_subset [0..8, 6..7]:\n{data_all:+4.1}\n");

    // Store chunk subset
    array
        .async_store_chunk_subset_elements::<f32>(
            // chunk indices
            &[1, 1],
            // subset within chunk
            &ArraySubset::new_with_ranges(&[3..4, 0..4]),
            vec![-7.4, -7.5, -7.6, -7.7],
        )
        .await?;
    let data_all = array
        .async_retrieve_array_subset_ndarray::<f32>(&subset_all)
        .await?;
    println!("async_store_chunk_subset [3..4, 0..4] of chunk [1, 1]:\n{data_all:+4.1}\n");

    // Erase a chunk
    array.async_erase_chunk(&[0, 0]).await?;
    let data_all = array
        .async_retrieve_array_subset_ndarray::<f32>(&subset_all)
        .await?;
    println!("async_erase_chunk [0, 0]:\n{data_all:+4.1}\n");

    // Read a chunk
    let chunk_indices = vec![0, 1];
    let data_chunk = array
        .async_retrieve_chunk_ndarray::<f32>(&chunk_indices)
        .await?;
    println!("async_retrieve_chunk [0, 1]:\n{data_chunk:+4.1}\n");

    // Read chunks
    let chunks = ArraySubset::new_with_ranges(&[0..2, 1..2]);
    let data_chunks = array.async_retrieve_chunks_ndarray::<f32>(&chunks).await?;
    println!("async_retrieve_chunks [0..2, 1..2]:\n{data_chunks:+4.1}\n");

    // Retrieve an array subset
    let subset = ArraySubset::new_with_ranges(&[2..6, 3..5]); // the center 4x2 region
    let data_subset = array
        .async_retrieve_array_subset_ndarray::<f32>(&subset)
        .await?;
    println!("async_retrieve_array_subset [2..6, 3..5]:\n{data_subset:+4.1}\n");

    // Show the hierarchy. A failure here is propagated like every other
    // fallible call in this function rather than panicking.
    let node = Node::async_new(&*store, "/").await?;
    let tree = node.hierarchy_tree();
    println!("hierarchy_tree:\n{tree}");

    Ok(())
}
Trait Implementations§
source§impl<T: ObjectStore> AsyncListableStorageTraits for AsyncObjectStore<T>
impl<T: ObjectStore> AsyncListableStorageTraits for AsyncObjectStore<T>
source§fn list<'life0, 'async_trait>(
&'life0 self
) -> Pin<Box<dyn Future<Output = Result<StoreKeys, StorageError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn list<'life0, 'async_trait>(
&'life0 self
) -> Pin<Box<dyn Future<Output = Result<StoreKeys, StorageError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
source§fn list_prefix<'life0, 'life1, 'async_trait>(
&'life0 self,
prefix: &'life1 StorePrefix
) -> Pin<Box<dyn Future<Output = Result<StoreKeys, StorageError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn list_prefix<'life0, 'life1, 'async_trait>(
&'life0 self,
prefix: &'life1 StorePrefix
) -> Pin<Box<dyn Future<Output = Result<StoreKeys, StorageError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
source§fn list_dir<'life0, 'life1, 'async_trait>(
&'life0 self,
prefix: &'life1 StorePrefix
) -> Pin<Box<dyn Future<Output = Result<StoreKeysPrefixes, StorageError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn list_dir<'life0, 'life1, 'async_trait>(
&'life0 self,
prefix: &'life1 StorePrefix
) -> Pin<Box<dyn Future<Output = Result<StoreKeysPrefixes, StorageError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
source§impl<T: ObjectStore> AsyncReadableStorageTraits for AsyncObjectStore<T>
impl<T: ObjectStore> AsyncReadableStorageTraits for AsyncObjectStore<T>
source§fn get<'life0, 'life1, 'async_trait>(
&'life0 self,
key: &'life1 StoreKey
) -> Pin<Box<dyn Future<Output = Result<MaybeBytes, StorageError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn get<'life0, 'life1, 'async_trait>(
&'life0 self,
key: &'life1 StoreKey
) -> Pin<Box<dyn Future<Output = Result<MaybeBytes, StorageError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
source§fn get_partial_values_key<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
key: &'life1 StoreKey,
byte_ranges: &'life2 [ByteRange]
) -> Pin<Box<dyn Future<Output = Result<Option<Vec<Vec<u8>>>, StorageError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn get_partial_values_key<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
key: &'life1 StoreKey,
byte_ranges: &'life2 [ByteRange]
) -> Pin<Box<dyn Future<Output = Result<Option<Vec<Vec<u8>>>, StorageError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
Retrieve partial bytes from a list of byte ranges for a store key. Read more
source§fn get_partial_values<'life0, 'life1, 'async_trait>(
&'life0 self,
key_ranges: &'life1 [StoreKeyRange]
) -> Pin<Box<dyn Future<Output = Result<Vec<MaybeBytes>, StorageError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn get_partial_values<'life0, 'life1, 'async_trait>(
&'life0 self,
key_ranges: &'life1 [StoreKeyRange]
) -> Pin<Box<dyn Future<Output = Result<Vec<MaybeBytes>, StorageError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Retrieve partial bytes from a list of `StoreKeyRange`. Read more
source§fn size_prefix<'life0, 'life1, 'async_trait>(
&'life0 self,
prefix: &'life1 StorePrefix
) -> Pin<Box<dyn Future<Output = Result<u64, StorageError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn size_prefix<'life0, 'life1, 'async_trait>(
&'life0 self,
prefix: &'life1 StorePrefix
) -> Pin<Box<dyn Future<Output = Result<u64, StorageError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Return the size in bytes of all keys under `prefix`. Read more
source§fn size_key<'life0, 'life1, 'async_trait>(
&'life0 self,
key: &'life1 StoreKey
) -> Pin<Box<dyn Future<Output = Result<Option<u64>, StorageError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn size_key<'life0, 'life1, 'async_trait>(
&'life0 self,
key: &'life1 StoreKey
) -> Pin<Box<dyn Future<Output = Result<Option<u64>, StorageError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Return the size in bytes of the value at `key`. Read more
source§fn size<'life0, 'async_trait>(
&'life0 self
) -> Pin<Box<dyn Future<Output = Result<u64, StorageError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn size<'life0, 'async_trait>(
&'life0 self
) -> Pin<Box<dyn Future<Output = Result<u64, StorageError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Return the size in bytes of the readable storage. Read more
source§fn get_partial_values_batched_by_key<'life0, 'life1, 'async_trait>(
&'life0 self,
key_ranges: &'life1 [StoreKeyRange]
) -> Pin<Box<dyn Future<Output = Result<Vec<MaybeBytes>, StorageError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn get_partial_values_batched_by_key<'life0, 'life1, 'async_trait>(
&'life0 self,
key_ranges: &'life1 [StoreKeyRange]
) -> Pin<Box<dyn Future<Output = Result<Vec<MaybeBytes>, StorageError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
A utility method with the same input and output as `get_partial_values` that internally calls `get_partial_values_key` with byte ranges grouped by key. Read more
source§impl<T: ObjectStore> AsyncWritableStorageTraits for AsyncObjectStore<T>
impl<T: ObjectStore> AsyncWritableStorageTraits for AsyncObjectStore<T>
source§fn set<'life0, 'life1, 'async_trait>(
&'life0 self,
key: &'life1 StoreKey,
value: Bytes
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn set<'life0, 'life1, 'async_trait>(
&'life0 self,
key: &'life1 StoreKey,
value: Bytes
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
source§fn set_partial_values<'life0, 'life1, 'async_trait>(
&'life0 self,
key_start_values: &'life1 [StoreKeyStartValue<'_>]
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn set_partial_values<'life0, 'life1, 'async_trait>(
&'life0 self,
key_start_values: &'life1 [StoreKeyStartValue<'_>]
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Store bytes according to a list of `StoreKeyStartValue`. Read more
source§fn erase<'life0, 'life1, 'async_trait>(
&'life0 self,
key: &'life1 StoreKey
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn erase<'life0, 'life1, 'async_trait>(
&'life0 self,
key: &'life1 StoreKey
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
source§fn erase_prefix<'life0, 'life1, 'async_trait>(
&'life0 self,
prefix: &'life1 StorePrefix
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn erase_prefix<'life0, 'life1, 'async_trait>(
&'life0 self,
prefix: &'life1 StorePrefix
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
impl<T: ObjectStore> AsyncReadableWritableStorageTraits for AsyncObjectStore<T>
Auto Trait Implementations§
impl<T> Freeze for AsyncObjectStore<T>where
T: Freeze,
impl<T> RefUnwindSafe for AsyncObjectStore<T>where
T: RefUnwindSafe,
impl<T> Send for AsyncObjectStore<T>
impl<T> Sync for AsyncObjectStore<T>
impl<T> Unpin for AsyncObjectStore<T>where
T: Unpin,
impl<T> UnwindSafe for AsyncObjectStore<T>where
T: UnwindSafe,
Blanket Implementations§
source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more