use aes_gcm::{aes::cipher::consts::U12, AeadInPlace, Aes256Gcm, Key, Nonce, Tag};
use async_stream::try_stream;
use async_trait::async_trait;
use candid::{
utils::{encode_args, ArgumentEncoder},
CandidType, Decode, Principal,
};
use chrono::DateTime;
use futures::{stream::BoxStream, StreamExt};
use ic_agent::Agent;
use ic_cose_types::{BoxError, CanisterCaller};
use ic_oss_types::{format_error, object_store::*};
use serde_bytes::{ByteArray, ByteBuf, Bytes};
use std::{collections::BTreeSet, ops::Range, sync::Arc};
pub use object_store::{
self, path::Path, CopyMode, CopyOptions, DynObjectStore, MultipartUpload, ObjectStore,
RenameOptions, RenameTargetMode,
};
use crate::rand_bytes;
/// Store label used in `Debug`/`Display` output and as the `store` field of
/// every `object_store::Error` produced by this module.
pub static STORE_NAME: &str = "ICObjectStore";
/// Low-level client bound to a single object-store canister.
///
/// When `cipher` is present, the higher-level [`ObjectStoreClient`] wrapper
/// transparently encrypts payload chunks with AES-256-GCM on upload and
/// decrypts them on download; `Client` itself sends bytes as given.
#[derive(Clone)]
pub struct Client {
    agent: Arc<Agent>,
    canister: Principal,
    // AES-256-GCM cipher built from the caller-supplied 32-byte secret;
    // `None` means objects are stored in plaintext.
    cipher: Option<Arc<Aes256Gcm>>,
}
impl std::fmt::Debug for Client {
    /// Renders as `"{STORE_NAME}:Client({canister})"`; the cipher (key
    /// material) is deliberately left out of the output.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let canister = &self.canister;
        write!(formatter, "{STORE_NAME}:Client({canister})")
    }
}
impl Client {
    /// Creates a client for `canister` using the given agent.
    ///
    /// When `aes_secret` is provided, an AES-256-GCM cipher is derived from
    /// it and payloads handled through [`ObjectStoreClient`] are encrypted
    /// chunk-by-chunk; otherwise data is stored in plaintext.
    pub fn new(agent: Arc<Agent>, canister: Principal, aes_secret: Option<[u8; 32]>) -> Client {
        use aes_gcm::KeyInit;
        let cipher = match aes_secret {
            Some(secret) => {
                let key = Key::<Aes256Gcm>::from(secret);
                Some(Arc::new(Aes256Gcm::new(&key)))
            }
            None => None,
        };
        Client {
            agent,
            canister,
            cipher,
        }
    }
}
impl ObjectStoreSDK for Client {
    /// The canister principal all calls are addressed to.
    fn canister(&self) -> &Principal {
        &self.canister
    }
    /// Cheap clone (Arc refcount bump) of the configured cipher, if any.
    fn cipher(&self) -> Option<Arc<Aes256Gcm>> {
        self.cipher.clone()
    }
}
impl CanisterCaller for Client {
    /// Performs a read-only (non-replicated) canister query.
    ///
    /// Arguments are candid-encoded, the reply is candid-decoded into `Out`.
    /// Transport and decoding failures surface as `BoxError`.
    async fn canister_query<
        In: ArgumentEncoder + Send,
        Out: CandidType + for<'a> candid::Deserialize<'a>,
    >(
        &self,
        canister: &Principal,
        method: &str,
        args: In,
    ) -> Result<Out, BoxError> {
        let input = encode_args(args)?;
        let res = self
            .agent
            .query(canister, method)
            .with_arg(input)
            .call()
            .await?;
        let output = Decode!(res.as_slice(), Out)?;
        Ok(output)
    }
    /// Performs a state-mutating canister update call.
    ///
    /// `call_and_wait` submits the call and polls until the replicated
    /// execution result is certified, so this is noticeably slower than
    /// `canister_query`.
    async fn canister_update<
        In: ArgumentEncoder + Send,
        Out: CandidType + for<'a> candid::Deserialize<'a>,
    >(
        &self,
        canister: &Principal,
        method: &str,
        args: In,
    ) -> Result<Out, BoxError> {
        let input = encode_args(args)?;
        let res = self
            .agent
            .update(canister, method)
            .with_arg(input)
            .call_and_wait()
            .await?;
        let output = Decode!(res.as_slice(), Out)?;
        Ok(output)
    }
}
/// Typed wrappers over the object-store canister's endpoints.
///
/// NOTE on the recurring `...await.map_err(...)?` shape: the canister methods
/// themselves return `Result<T, E>` encoded in candid, so each wrapper decodes
/// `Out = Result<T, E>`. The trailing `?` unwraps the *transport* layer
/// (`BoxError`, mapped to `String` or `Error::Generic`), and the decoded inner
/// `Result` becomes the function's return value. Rewriting these bodies
/// without keeping that double-`Result` in mind would break them.
#[async_trait]
pub trait ObjectStoreSDK: CanisterCaller + Sized {
    /// The canister principal all calls are addressed to.
    fn canister(&self) -> &Principal;
    /// Optional AES-256-GCM cipher used by higher layers; `None` disables
    /// client-side encryption.
    fn cipher(&self) -> Option<Arc<Aes256Gcm>>;
    /// Fetches canister state/configuration info.
    async fn get_state(&self) -> Result<StateInfo, String> {
        self.canister_query(self.canister(), "get_state", ())
            .await
            .map_err(format_error)?
    }
    /// Checks whether `user` belongs to the member group named `member_kind`.
    async fn is_member(&self, member_kind: &str, user: &Principal) -> Result<bool, String> {
        self.canister_query(self.canister(), "is_member", (member_kind, user))
            .await
            .map_err(format_error)?
    }
    /// Grants manager role to the given principals (admin only).
    async fn admin_add_managers(&self, args: &BTreeSet<Principal>) -> Result<(), String> {
        self.canister_update(self.canister(), "admin_add_managers", (args,))
            .await
            .map_err(format_error)?
    }
    /// Revokes manager role from the given principals (admin only).
    async fn admin_remove_managers(&self, args: &BTreeSet<Principal>) -> Result<(), String> {
        self.canister_update(self.canister(), "admin_remove_managers", (args,))
            .await
            .map_err(format_error)?
    }
    /// Grants auditor role to the given principals (admin only).
    async fn admin_add_auditors(&self, args: &BTreeSet<Principal>) -> Result<(), String> {
        self.canister_update(self.canister(), "admin_add_auditors", (args,))
            .await
            .map_err(format_error)?
    }
    /// Revokes auditor role from the given principals (admin only).
    async fn admin_remove_auditors(&self, args: &BTreeSet<Principal>) -> Result<(), String> {
        self.canister_update(self.canister(), "admin_remove_auditors", (args,))
            .await
            .map_err(format_error)?
    }
    /// Stores `payload` at `path` in a single call.
    ///
    /// Rejects payloads above `MAX_PAYLOAD_SIZE` locally (as a
    /// `Precondition` error) before any network traffic happens.
    async fn put_opts(&self, path: &Path, payload: &Bytes, opts: PutOptions) -> Result<PutResult> {
        if payload.len() > MAX_PAYLOAD_SIZE as usize {
            return Err(Error::Precondition {
                path: path.as_ref().to_string(),
                error: format!(
                    "payload size {} exceeds max size {}",
                    payload.len(),
                    MAX_PAYLOAD_SIZE
                ),
            });
        }
        self.canister_update(self.canister(), "put_opts", (path.as_ref(), payload, opts))
            .await
            .map_err(|error| Error::Generic {
                error: format_error(error),
            })?
    }
    /// Deletes the object at `path`.
    async fn delete(&self, path: &Path) -> Result<()> {
        self.canister_update(self.canister(), "delete", (path.as_ref(),))
            .await
            .map_err(|error| Error::Generic {
                error: format_error(error),
            })?
    }
    /// Copies `from` to `to`, overwriting any existing target.
    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
        self.canister_update(self.canister(), "copy", (from.as_ref(), to.as_ref()))
            .await
            .map_err(|error| Error::Generic {
                error: format_error(error),
            })?
    }
    /// Copies `from` to `to`, failing if the target already exists.
    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
        self.canister_update(
            self.canister(),
            "copy_if_not_exists",
            (from.as_ref(), to.as_ref()),
        )
        .await
        .map_err(|error| Error::Generic {
            error: format_error(error),
        })?
    }
    /// Renames `from` to `to`, overwriting any existing target.
    async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
        self.canister_update(self.canister(), "rename", (from.as_ref(), to.as_ref()))
            .await
            .map_err(|error| Error::Generic {
                error: format_error(error),
            })?
    }
    /// Renames `from` to `to`, failing if the target already exists.
    async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
        self.canister_update(
            self.canister(),
            "rename_if_not_exists",
            (from.as_ref(), to.as_ref()),
        )
        .await
        .map_err(|error| Error::Generic {
            error: format_error(error),
        })?
    }
    /// Starts a multipart upload and returns its canister-assigned id.
    async fn create_multipart(&self, path: &Path) -> Result<MultipartId> {
        self.canister_update(self.canister(), "create_multipart", (path.as_ref(),))
            .await
            .map_err(|error| Error::Generic {
                error: format_error(error),
            })?
    }
    /// Uploads part `part_idx` of multipart upload `id`.
    async fn put_part(
        &self,
        path: &Path,
        id: &MultipartId,
        part_idx: u64,
        payload: &Bytes,
    ) -> Result<PartId> {
        self.canister_update(
            self.canister(),
            "put_part",
            (path.as_ref(), id, part_idx, payload),
        )
        .await
        .map_err(|error| Error::Generic {
            error: format_error(error),
        })?
    }
    /// Finalizes multipart upload `id`, making the object visible.
    async fn complete_multipart(
        &self,
        path: &Path,
        id: &MultipartId,
        opts: &PutMultipartOptions,
    ) -> Result<PutResult> {
        self.canister_update(
            self.canister(),
            "complete_multipart",
            (path.as_ref(), id, opts),
        )
        .await
        .map_err(|error| Error::Generic {
            error: format_error(error),
        })?
    }
    /// Aborts multipart upload `id`, discarding already-uploaded parts.
    async fn abort_multipart(&self, path: &Path, id: &MultipartId) -> Result<()> {
        self.canister_update(self.canister(), "abort_multipart", (path.as_ref(), id))
            .await
            .map_err(|error| Error::Generic {
                error: format_error(error),
            })?
    }
    /// Fetches a single raw (possibly still encrypted) storage chunk.
    async fn get_part(&self, path: &Path, part_idx: u64) -> Result<ByteBuf> {
        self.canister_query(self.canister(), "get_part", (path.as_ref(), part_idx))
            .await
            .map_err(|error| Error::Generic {
                error: format_error(error),
            })?
    }
    /// Fetches an object (or its head) with the given options; payload is
    /// raw canister bytes, i.e. still encrypted when a cipher is in use.
    async fn get_opts(&self, path: &Path, opts: GetOptions) -> Result<GetResult> {
        self.canister_query(self.canister(), "get_opts", (path.as_ref(), opts))
            .await
            .map_err(|error| Error::Generic {
                error: format_error(error),
            })?
    }
    /// Fetches multiple byte ranges in one call. Empty input short-circuits
    /// to an empty result without touching the network.
    async fn get_ranges(&self, path: &Path, ranges: &[(u64, u64)]) -> Result<Vec<ByteBuf>> {
        if ranges.is_empty() {
            return Ok(Vec::new());
        }
        self.canister_query(self.canister(), "get_ranges", (path.as_ref(), ranges))
            .await
            .map_err(|error| Error::Generic {
                error: format_error(error),
            })?
    }
    /// Fetches object metadata without the payload.
    async fn head(&self, path: &Path) -> Result<ObjectMeta> {
        self.canister_query(self.canister(), "head", (path.as_ref(),))
            .await
            .map_err(|error| Error::Generic {
                error: format_error(error),
            })?
    }
    /// Lists object metadata, optionally restricted to `prefix`.
    async fn list(&self, prefix: Option<&Path>) -> Result<Vec<ObjectMeta>> {
        self.canister_query(self.canister(), "list", (prefix.map(|p| p.as_ref()),))
            .await
            .map_err(|error| Error::Generic {
                error: format_error(error),
            })?
    }
    /// Lists object metadata lexicographically after `offset`.
    async fn list_with_offset(
        &self,
        prefix: Option<&Path>,
        offset: &Path,
    ) -> Result<Vec<ObjectMeta>> {
        self.canister_query(
            self.canister(),
            "list_with_offset",
            (prefix.map(|p| p.as_ref()), offset.as_ref()),
        )
        .await
        .map_err(|error| Error::Generic {
            error: format_error(error),
        })?
    }
    /// Lists objects and common prefixes directly under `prefix`
    /// (directory-style listing).
    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
        self.canister_query(
            self.canister(),
            "list_with_delimiter",
            (prefix.map(|p| p.as_ref()),),
        )
        .await
        .map_err(|error| Error::Generic {
            error: format_error(error),
        })?
    }
}
/// Buffering multipart-upload handle returned by
/// `ObjectStoreClient::put_multipart_opts`.
#[derive(Debug)]
pub struct MultipartUploader {
    // Index of the next chunk to upload; also drives per-chunk nonce derivation.
    part_idx: u64,
    // Bytes accepted via `put_part` but not yet flushed as full CHUNK_SIZE parts.
    parts_cache: Vec<u8>,
    // Options sent with `complete_multipart`; when encrypting, accumulates the
    // per-chunk AES-GCM tags in upload order.
    opts: PutMultipartOptions,
    state: Arc<UploadState>,
}
/// Immutable per-upload context shared by the part-upload futures.
struct UploadState {
    client: Arc<Client>,
    path: Path,
    id: MultipartId,
}
impl std::fmt::Debug for UploadState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}:UploadState({}, {})", STORE_NAME, self.path, self.id)
}
}
#[async_trait]
impl MultipartUpload for MultipartUploader {
    /// Buffers `payload` and flushes every complete `CHUNK_SIZE` slice as a
    /// canister part.
    ///
    /// Chunks are encrypted synchronously (so tags land in `opts.aes_tags` in
    /// part order) while the network uploads are returned as one future that
    /// awaits them sequentially. If less than a full chunk is buffered, an
    /// already-resolved Ok future is returned.
    fn put_part(&mut self, payload: object_store::PutPayload) -> object_store::UploadPart {
        let payload = bytes::Bytes::from(payload);
        self.parts_cache.extend_from_slice(&payload);
        if self.parts_cache.len() < CHUNK_SIZE as usize {
            return Box::pin(futures::future::ready(Ok(())));
        }
        let mut parts: Vec<object_store::UploadPart> = Vec::new();
        while self.parts_cache.len() >= CHUNK_SIZE as usize {
            let state = self.state.clone();
            let mut chunk = self
                .parts_cache
                .drain(..CHUNK_SIZE as usize)
                .collect::<Vec<u8>>();
            if let Some(cipher) = &self.state.client.cipher {
                // Invariant: put_multipart_opts sets aes_nonce/aes_tags to
                // Some whenever a cipher is configured, so these unwraps hold.
                let nonce = derive_gcm_nonce(
                    self.opts.aes_nonce.as_ref().as_ref().unwrap(),
                    self.part_idx,
                );
                match encrypt_chunk(cipher, Nonce::from_slice(&nonce), &mut chunk, &state.path) {
                    Ok(tag) => {
                        self.opts.aes_tags.as_mut().unwrap().push(tag);
                    }
                    Err(err) => {
                        return Box::pin(futures::future::ready(Err(err)));
                    }
                }
            }
            let part_idx = self.part_idx;
            self.part_idx += 1;
            parts.push(Box::pin(async move {
                let _ = state
                    .client
                    .put_part(&state.path, &state.id, part_idx, Bytes::new(&chunk))
                    .await
                    .map_err(from_error)?;
                Ok(())
            }))
        }
        Box::pin(async move {
            // Upload the flushed chunks in order; first failure aborts.
            for part in parts {
                part.await?;
            }
            Ok(())
        })
    }
    /// Flushes any buffered tail (including a final partial chunk), then
    /// finalizes the upload with `complete_multipart`, shipping the collected
    /// nonce/tags so readers can decrypt.
    async fn complete(&mut self) -> object_store::Result<object_store::PutResult> {
        for part in self.parts_cache.chunks_mut(CHUNK_SIZE as usize) {
            let part_idx = self.part_idx;
            self.part_idx += 1;
            if let Some(cipher) = &self.state.client.cipher {
                // Same Some-invariant as in put_part above.
                let nonce =
                    derive_gcm_nonce(self.opts.aes_nonce.as_ref().as_ref().unwrap(), part_idx);
                match encrypt_chunk(cipher, Nonce::from_slice(&nonce), part, &self.state.path) {
                    Ok(tag) => {
                        self.opts.aes_tags.as_mut().unwrap().push(tag);
                    }
                    Err(err) => {
                        return Err(err);
                    }
                }
            }
            let _ = self
                .state
                .client
                .put_part(&self.state.path, &self.state.id, part_idx, Bytes::new(part))
                .await
                .map_err(from_error)?;
        }
        self.parts_cache.clear();
        let res = self
            .state
            .client
            .complete_multipart(&self.state.path, &self.state.id, &self.opts)
            .await
            .map_err(from_error)?;
        Ok(object_store::PutResult {
            e_tag: res.e_tag,
            version: res.version,
        })
    }
    /// Aborts the upload, letting the canister discard uploaded parts.
    async fn abort(&mut self) -> object_store::Result<()> {
        self.state
            .client
            .abort_multipart(&self.state.path, &self.state.id)
            .await
            .map_err(from_error)
    }
}
/// `object_store::ObjectStore` adapter over [`Client`], adding transparent
/// chunked AES-256-GCM encryption/decryption when the client holds a cipher.
#[derive(Clone)]
pub struct ObjectStoreClient {
    client: Arc<Client>,
}
impl ObjectStoreClient {
    /// Wraps an existing low-level client.
    pub fn new(client: Arc<Client>) -> ObjectStoreClient {
        ObjectStoreClient { client }
    }
    /// Fetches canister state info (delegates to the inner client).
    pub async fn get_state(&self) -> Result<StateInfo, String> {
        self.client.get_state().await
    }
    /// Fetches an object without decryption and wraps the reply in a byte
    /// stream.
    ///
    /// The canister returns at most the first portion of the requested range
    /// in one reply; if the returned range covers the full request (or this
    /// is a head request) a single-item stream is returned, otherwise
    /// `create_get_range_stream` lazily fetches the remainder chunk by chunk.
    async fn get_opts_inner(
        &self,
        path: &Path,
        opts: object_store::GetOptions,
    ) -> object_store::Result<object_store::GetResult> {
        let options = GetOptions {
            if_match: opts.if_match,
            if_none_match: opts.if_none_match,
            // Wire format carries timestamps as epoch milliseconds.
            if_modified_since: opts.if_modified_since.map(|v| v.timestamp_millis() as u64),
            if_unmodified_since: opts
                .if_unmodified_since
                .map(|v| v.timestamp_millis() as u64),
            range: opts.range.clone().map(to_get_range),
            version: opts.version,
            head: opts.head,
        };
        let res: GetResult = self
            .client
            .get_opts(path, options)
            .await
            .map_err(from_error)?;
        // Resolve the caller's (possibly open-ended) range against the actual
        // object size.
        let rr = if let Some(r) = &opts.range {
            r.as_range(res.meta.size)
                .map_err(|err| object_store::Error::Generic {
                    store: STORE_NAME,
                    source: err.into(),
                })?
        } else {
            0..res.meta.size
        };
        // Range actually satisfied by this first reply.
        let range = res.range.0..res.range.1;
        let meta = from_object_meta(res.meta);
        let attributes: object_store::Attributes = res
            .attributes
            .into_iter()
            .map(|(k, v)| (from_attribute(k), v))
            .collect();
        let data = bytes::Bytes::from(res.payload.into_vec());
        if opts.head || rr == range {
            // Everything already in hand: single-item stream.
            let stream = futures::stream::once(futures::future::ready(Ok(data)));
            return Ok(object_store::GetResult {
                payload: object_store::GetResultPayload::Stream(stream.boxed()),
                meta,
                range,
                attributes,
            });
        }
        // Need more data: stream the first payload, then fetch the rest.
        let stream =
            create_get_range_stream(self.client.clone(), path.clone(), rr.clone(), range, data);
        Ok(object_store::GetResult {
            payload: object_store::GetResultPayload::Stream(stream),
            meta,
            range: rr,
            attributes,
        })
    }
}
impl std::fmt::Display for ObjectStoreClient {
    /// Fixed label: `"{STORE_NAME}:ObjectStoreClient"`.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(formatter, "{STORE_NAME}:ObjectStoreClient")
    }
}
impl std::fmt::Debug for ObjectStoreClient {
    /// Same representation as `Display`.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(formatter, "{STORE_NAME}:ObjectStoreClient")
    }
}
#[async_trait]
impl ObjectStore for ObjectStoreClient {
    /// Stores an object, encrypting it in `CHUNK_SIZE` slices when a cipher
    /// is configured.
    async fn put_opts(
        &self,
        path: &Path,
        payload: object_store::PutPayload,
        opts: object_store::PutOptions,
    ) -> object_store::Result<object_store::PutResult> {
        let data = bytes::Bytes::from(payload);
        let mut opts = to_put_options(&opts);
        let payload: Vec<u8> = if let Some(cipher) = &self.client.cipher {
            // Fresh random 12-byte base nonce per object; chunk i is encrypted
            // with derive_gcm_nonce(base, i) so nonces never repeat per key+object.
            let base_nonce: [u8; 12] = rand_bytes();
            let mut data: Vec<u8> = data.into();
            let mut aes_tags: Vec<ByteArray<16>> = Vec::new();
            for (i, chunk) in data.chunks_mut(CHUNK_SIZE as usize).enumerate() {
                let nonce = derive_gcm_nonce(&base_nonce, i as u64);
                let tag = encrypt_chunk(cipher, Nonce::from_slice(&nonce), chunk, path)?;
                aes_tags.push(tag);
            }
            // Nonce and per-chunk tags are persisted as object metadata.
            opts.aes_nonce = Some(base_nonce.into());
            opts.aes_tags = Some(aes_tags);
            data
        } else {
            data.into()
        };
        let res = self
            .client
            .put_opts(path, Bytes::new(&payload), opts)
            .await
            .map_err(from_error)?;
        Ok(object_store::PutResult {
            e_tag: res.e_tag,
            version: res.version,
        })
    }
    /// Starts a multipart upload and returns a buffering [`MultipartUploader`].
    async fn put_multipart_opts(
        &self,
        path: &Path,
        opts: object_store::PutMultipartOptions,
    ) -> object_store::Result<Box<dyn object_store::MultipartUpload>> {
        let upload_id = self
            .client
            .create_multipart(path)
            .await
            .map_err(from_error)?;
        let mut opts = PutMultipartOptions {
            tags: opts.tags.encoded().to_string(),
            attributes: opts
                .attributes
                .iter()
                .map(|(k, v)| (to_attribute(k), v.to_string()))
                .collect(),
            ..Default::default()
        };
        if self.client.cipher.is_some() {
            // Pre-seed the encryption metadata the uploader relies on
            // (MultipartUploader unwraps these fields when a cipher is set).
            opts.aes_nonce = Some(rand_bytes().into());
            opts.aes_tags = Some(Vec::new());
        }
        Ok(Box::new(MultipartUploader {
            part_idx: 0,
            parts_cache: Vec::new(),
            opts,
            state: Arc::new(UploadState {
                client: self.client.clone(),
                path: path.clone(),
                id: upload_id,
            }),
        }))
    }
    /// Fetches an object; for encrypted objects the requested range is first
    /// widened to whole-chunk boundaries (GCM must decrypt full chunks), then
    /// the decryption stream trims back to the exact bytes requested.
    async fn get_opts(
        &self,
        location: &Path,
        mut opts: object_store::GetOptions,
    ) -> object_store::Result<object_store::GetResult> {
        if let Some(cipher) = self.client.cipher() {
            let meta = self.client.head(location).await.map_err(from_error)?;
            let range = if let Some(r) = &opts.range {
                r.as_range(meta.size)
                    .map_err(|err| object_store::Error::Generic {
                        store: STORE_NAME,
                        source: err.into(),
                    })?
            } else {
                0..meta.size
            };
            // Align down to the containing chunk start and up to the chunk
            // end (capped at object size).
            let rr = (range.start / CHUNK_SIZE) * CHUNK_SIZE
                ..meta
                    .size
                    .min((1 + range.end.saturating_sub(1) / CHUNK_SIZE) * CHUNK_SIZE);
            if rr.end > rr.start {
                opts.range = Some(object_store::GetRange::Bounded(rr.clone()));
            }
            let res = self.get_opts_inner(location, opts).await?;
            let obj = res.meta.clone();
            let attributes = res.attributes.clone();
            let start_idx = rr.start / CHUNK_SIZE;
            // Bytes to drop from the first decrypted chunk, and total bytes
            // the caller actually asked for.
            let start_offset = (range.start - rr.start) as usize;
            let size = (range.end - range.start) as usize;
            // NOTE: unwraps assume the object was written by this client's
            // encrypting path and therefore carries nonce + tags; a plaintext
            // object read with a cipher configured would panic here.
            let stream = create_decryption_stream(
                res,
                cipher,
                meta.aes_tags.unwrap(),
                *meta.aes_nonce.unwrap(),
                location.clone(),
                start_idx as usize,
                start_offset,
                size,
            );
            return Ok(object_store::GetResult {
                payload: object_store::GetResultPayload::Stream(stream),
                meta: obj,
                range,
                attributes,
            });
        }
        self.get_opts_inner(location, opts).await
    }
    /// Fetches multiple byte ranges. For encrypted objects each covered chunk
    /// is fetched and decrypted individually, caching the most recent chunk so
    /// overlapping/adjacent ranges don't re-fetch it.
    async fn get_ranges(
        &self,
        location: &Path,
        ranges: &[Range<u64>],
    ) -> object_store::Result<Vec<bytes::Bytes>> {
        if ranges.is_empty() {
            return Ok(Vec::new());
        }
        if let Some(cipher) = self.client.cipher() {
            let meta = self.client.head(location).await.map_err(from_error)?;
            ranges_is_valid(ranges, meta.size)?;
            let aes_tags = meta.aes_tags.ok_or_else(|| object_store::Error::Generic {
                store: STORE_NAME,
                source: format!("missing AES256 tags for path {location} for ranges {ranges:?}")
                    .into(),
            })?;
            let base_nonce = meta.aes_nonce.ok_or_else(|| object_store::Error::Generic {
                store: STORE_NAME,
                source: format!("missing AES256 nonce for path {location}").into(),
            })?;
            let mut result: Vec<bytes::Bytes> = Vec::with_capacity(ranges.len());
            // (chunk index, decrypted bytes) of the last chunk handled.
            let mut chunk_cache: Option<(usize, Vec<u8>)> = None;
            for &Range { start, end } in ranges {
                let mut buf = Vec::with_capacity((end - start) as usize);
                let start_chunk = (start / CHUNK_SIZE) as usize;
                let end_chunk = ((end - 1) / CHUNK_SIZE) as usize;
                for idx in start_chunk..=end_chunk {
                    // In-chunk byte window for this range segment.
                    let chunk_start = if idx == start_chunk {
                        start % CHUNK_SIZE
                    } else {
                        0
                    };
                    let chunk_end = if idx == end_chunk {
                        (end - 1) % CHUNK_SIZE + 1
                    } else {
                        CHUNK_SIZE
                    };
                    match &chunk_cache {
                        Some((cached_idx, cached_chunk)) if *cached_idx == idx => {
                            buf.extend_from_slice(
                                &cached_chunk[chunk_start as usize..chunk_end as usize],
                            );
                        }
                        _ => {
                            let tag =
                                aes_tags
                                    .get(idx)
                                    .ok_or_else(|| object_store::Error::Generic {
                                        store: STORE_NAME,
                                        source: format!(
                                            "missing AES256 tag for chunk {idx} for path {location}"
                                        )
                                        .into(),
                                    })?;
                            let chunk = self
                                .client
                                .get_part(location, idx as u64)
                                .await
                                .map_err(from_error)?;
                            let mut chunk = chunk.into_vec();
                            let nonce = derive_gcm_nonce(&base_nonce, idx as u64);
                            decrypt_chunk(
                                &cipher,
                                Nonce::from_slice(&nonce),
                                &mut chunk,
                                tag,
                                location,
                            )?;
                            buf.extend_from_slice(&chunk[chunk_start as usize..chunk_end as usize]);
                            chunk_cache = Some((idx, chunk));
                        }
                    }
                }
                result.push(buf.into());
            }
            return Ok(result);
        }
        // Plaintext path: one batched canister call.
        let ranges: Vec<(u64, u64)> = ranges.iter().map(|r| (r.start, r.end)).collect();
        let res = self
            .client
            .get_ranges(location, &ranges)
            .await
            .map_err(from_error)?;
        Ok(res
            .into_iter()
            .map(|v| bytes::Bytes::from(v.into_vec()))
            .collect())
    }
    /// Deletes each streamed location, up to 8 deletions in flight at once.
    fn delete_stream(
        &self,
        locations: BoxStream<'static, object_store::Result<Path>>,
    ) -> BoxStream<'static, object_store::Result<Path>> {
        let client = self.client.clone();
        locations
            .map(move |location| {
                let _client = client.clone();
                async move {
                    let location = location?;
                    match _client.delete(&location).await.map_err(from_error) {
                        Ok(_) => Ok(location),
                        Err(err) => Err(err),
                    }
                }
            })
            .buffered(8)
            .boxed()
    }
    /// Streams object metadata under `prefix`. The canister returns the full
    /// listing in one reply; it is yielded item by item.
    fn list(
        &self,
        prefix: Option<&Path>,
    ) -> BoxStream<'static, object_store::Result<object_store::ObjectMeta>> {
        let prefix = prefix.cloned();
        let client = self.client.clone();
        try_stream! {
            let res = client.list(prefix.as_ref()).await.map_err(from_error)?;
            for object in res {
                yield from_object_meta(object);
            }
        }
        .boxed()
    }
    /// Like `list`, but only objects lexicographically after `offset`.
    fn list_with_offset(
        &self,
        prefix: Option<&Path>,
        offset: &Path,
    ) -> BoxStream<'static, object_store::Result<object_store::ObjectMeta>> {
        let prefix = prefix.cloned();
        let offset = offset.clone();
        let client = self.client.clone();
        try_stream! {
            let res = client.list_with_offset(prefix.as_ref(), &offset).await.map_err(from_error)?;
            for object in res {
                yield from_object_meta(object);
            }
        }
        .boxed()
    }
    /// Directory-style listing: immediate objects plus common prefixes.
    async fn list_with_delimiter(
        &self,
        prefix: Option<&Path>,
    ) -> object_store::Result<object_store::ListResult> {
        let res = self
            .client
            .list_with_delimiter(prefix)
            .await
            .map_err(from_error)?;
        Ok(object_store::ListResult {
            objects: res.objects.into_iter().map(from_object_meta).collect(),
            common_prefixes: res
                .common_prefixes
                .into_iter()
                // Prefixes originate from valid paths; parse should not fail.
                .map(|p| Path::parse(p).unwrap())
                .collect(),
        })
    }
    /// Copies an object, honoring the overwrite/create mode.
    async fn copy_opts(
        &self,
        from: &Path,
        to: &Path,
        options: CopyOptions,
    ) -> object_store::Result<()> {
        match options.mode {
            CopyMode::Overwrite => self.client.copy(from, to).await.map_err(from_error),
            CopyMode::Create => self
                .client
                .copy_if_not_exists(from, to)
                .await
                .map_err(from_error),
        }
    }
    /// Renames an object, honoring the overwrite/create target mode.
    async fn rename_opts(
        &self,
        from: &Path,
        to: &Path,
        options: RenameOptions,
    ) -> object_store::Result<()> {
        match options.target_mode {
            RenameTargetMode::Overwrite => self.client.rename(from, to).await.map_err(from_error),
            RenameTargetMode::Create => self
                .client
                .rename_if_not_exists(from, to)
                .await
                .map_err(from_error),
        }
    }
}
fn encrypt_chunk(
cipher: &Aes256Gcm,
nonce: &Nonce<U12>,
chunk: &mut [u8],
path: &Path,
) -> Result<ByteArray<16>, object_store::Error> {
let tag = cipher
.encrypt_in_place_detached(nonce, &[], chunk)
.map_err(|err| object_store::Error::Generic {
store: STORE_NAME,
source: format!("AES256 encrypt failed for path {path}: {err:?}").into(),
})?;
let tag: [u8; 16] = tag.into();
Ok(tag.into())
}
fn decrypt_chunk(
cipher: &Aes256Gcm,
nonce: &Nonce<U12>,
chunk: &mut [u8],
tag: &ByteArray<16>,
path: &Path,
) -> Result<(), object_store::Error> {
cipher
.decrypt_in_place_detached(nonce, &[], chunk, Tag::from_slice(tag.as_slice()))
.map_err(|err| object_store::Error::Generic {
store: STORE_NAME,
source: format!("AES256 decrypt failed for path {path}: {err:?}").into(),
})
}
#[allow(clippy::too_many_arguments)]
/// Builds a stream that yields `first_payload` (the bytes already returned by
/// the initial `get_opts` call, covering `first_range`), then lazily fetches
/// the rest of `request_range` from the canister in `CHUNK_SIZE` pieces.
fn create_get_range_stream(
    client: Arc<Client>,
    location: Path,
    request_range: Range<u64>,
    first_range: Range<u64>,
    first_payload: bytes::Bytes,
) -> BoxStream<'static, object_store::Result<bytes::Bytes>> {
    try_stream! {
        yield first_payload;
        // Plan the remaining fetches as CHUNK_SIZE-bounded sub-ranges.
        let mut remaining_ranges = Vec::new();
        let mut current = first_range.end;
        while current < request_range.end {
            let end = (current + CHUNK_SIZE).min(request_range.end);
            remaining_ranges.push(current..end);
            current = end;
        }
        for r in remaining_ranges {
            let res = client.get_ranges(&location, &[(r.start, r.end)]).await.map_err(from_error)?;
            for data in res {
                yield bytes::Bytes::from(data.into_vec());
            }
        }
    }
    .boxed()
}
#[allow(clippy::too_many_arguments)]
/// Wraps the chunk-aligned ciphertext stream in `res` with on-the-fly
/// AES-256-GCM decryption, trimming back to the caller's exact request.
///
/// `start_idx` is the index of the first chunk in the stream (used to pick
/// tags and derive nonces), `start_offset` the bytes to drop from that first
/// decrypted chunk, and `size` the total plaintext bytes to emit.
fn create_decryption_stream(
    res: object_store::GetResult,
    cipher: Arc<Aes256Gcm>,
    aes_tags: Vec<ByteArray<16>>,
    base_nonce: [u8; 12],
    location: Path,
    start_idx: usize,
    start_offset: usize,
    size: usize,
) -> BoxStream<'static, object_store::Result<bytes::Bytes>> {
    try_stream! {
        let mut stream = res.into_stream();
        // Reassembly buffer: network frames need not align with chunks.
        let mut buf = Vec::with_capacity(CHUNK_SIZE as usize * 2);
        let mut idx = start_idx;
        let mut remaining = size;
        while let Some(data) = stream.next().await {
            let data = data?;
            if remaining == 0 {
                break;
            }
            buf.extend_from_slice(&data);
            // Decrypt and emit every complete chunk accumulated so far.
            while remaining > 0 && buf.len() >= CHUNK_SIZE as usize {
                let mut chunk = buf.drain(..CHUNK_SIZE as usize).collect::<Vec<u8>>();
                let tag = aes_tags.get(idx).ok_or_else(|| object_store::Error::Generic {
                    store: STORE_NAME,
                    source: format!("missing AES256 tag for chunk {idx} for path {location}").into(),
                })?;
                let nonce = derive_gcm_nonce(&base_nonce, idx as u64);
                decrypt_chunk(&cipher, Nonce::from_slice(&nonce), &mut chunk, tag, &location)?;
                // Trim the first chunk's leading bytes and cap at `remaining`.
                if idx == start_idx && start_offset > 0 {
                    chunk.drain(..start_offset);
                }
                if chunk.len() > remaining {
                    chunk.truncate(remaining);
                }
                remaining = remaining.saturating_sub(chunk.len());
                yield bytes::Bytes::from(chunk);
                idx += 1;
                if remaining == 0 {
                    return;
                }
            }
        }
        // Final partial chunk (object tail shorter than CHUNK_SIZE).
        if remaining > 0 && !buf.is_empty() {
            let tag = aes_tags.get(idx).ok_or_else(|| object_store::Error::Generic {
                store: STORE_NAME,
                source: format!("missing AES256 tag for chunk {idx} for path {location}").into(),
            })?;
            let nonce = derive_gcm_nonce(&base_nonce, idx as u64);
            decrypt_chunk(&cipher, Nonce::from_slice(&nonce), &mut buf, tag, &location)?;
            if idx == start_idx && start_offset > 0 {
                buf.drain(..start_offset);
            }
            buf.truncate(remaining);
            yield bytes::Bytes::from(buf);
        }
    }.boxed()
}
/// Maps canister-side errors onto the `object_store` error taxonomy.
///
/// Arm order matters: specific variants must precede the `_` catch-all.
/// `PermissionDenied` and `Unauthenticated` have no direct counterpart and
/// are (lossily) reported as `Precondition`.
pub fn from_error(err: Error) -> object_store::Error {
    match err {
        Error::Generic { error } => object_store::Error::Generic {
            store: STORE_NAME,
            source: error.into(),
        },
        Error::NotFound { ref path } => object_store::Error::NotFound {
            path: path.clone(),
            source: Box::new(err),
        },
        Error::InvalidPath { path } => object_store::Error::InvalidPath {
            source: object_store::path::Error::InvalidPath { path: path.into() },
        },
        Error::NotSupported { error } => object_store::Error::NotSupported {
            source: error.into(),
        },
        Error::AlreadyExists { ref path } => object_store::Error::AlreadyExists {
            path: path.clone(),
            source: err.into(),
        },
        Error::Precondition { path, error } => object_store::Error::Precondition {
            path,
            source: error.into(),
        },
        Error::NotModified { path, error } => object_store::Error::NotModified {
            path,
            source: error.into(),
        },
        Error::NotImplemented {
            operation,
            implementer,
        } => object_store::Error::NotImplemented {
            operation,
            implementer,
        },
        // No dedicated variant in object_store; reported as Precondition.
        Error::PermissionDenied { path, error } => object_store::Error::Precondition {
            path,
            source: error.into(),
        },
        // Likewise mapped to Precondition.
        Error::Unauthenticated { path, error } => object_store::Error::Precondition {
            path,
            source: error.into(),
        },
        Error::UnknownConfigurationKey { key } => object_store::Error::UnknownConfigurationKey {
            store: STORE_NAME,
            key,
        },
        _ => object_store::Error::Generic {
            store: STORE_NAME,
            source: Box::new(err),
        },
    }
}
/// Converts canister object metadata into `object_store::ObjectMeta`.
///
/// Panics if the stored location is not a valid path or the millisecond
/// timestamp is out of `DateTime` range — both indicate corrupt canister
/// data rather than recoverable conditions.
pub fn from_object_meta(val: ObjectMeta) -> object_store::ObjectMeta {
    object_store::ObjectMeta {
        location: Path::parse(val.location).unwrap(),
        last_modified: DateTime::from_timestamp_millis(val.last_modified as i64)
            .expect("invalid timestamp"),
        size: val.size,
        e_tag: val.e_tag,
        version: val.version,
    }
}
pub fn to_get_range(val: object_store::GetRange) -> GetRange {
match val {
object_store::GetRange::Bounded(v) => GetRange::Bounded(v.start, v.end),
object_store::GetRange::Offset(v) => GetRange::Offset(v),
object_store::GetRange::Suffix(v) => GetRange::Suffix(v),
}
}
/// Converts a canister wire `Attribute` into `object_store::Attribute`.
pub fn from_attribute(val: Attribute) -> object_store::Attribute {
    match val {
        Attribute::ContentDisposition => object_store::Attribute::ContentDisposition,
        Attribute::ContentEncoding => object_store::Attribute::ContentEncoding,
        Attribute::ContentLanguage => object_store::Attribute::ContentLanguage,
        Attribute::ContentType => object_store::Attribute::ContentType,
        Attribute::CacheControl => object_store::Attribute::CacheControl,
        Attribute::Metadata(v) => object_store::Attribute::Metadata(v.into()),
    }
}
/// Converts an `object_store::Attribute` into the canister wire `Attribute`.
///
/// Panics on attribute variants unknown to the wire format
/// (`object_store::Attribute` is non-exhaustive, hence the `_` arm).
pub fn to_attribute(val: &object_store::Attribute) -> Attribute {
    match val {
        object_store::Attribute::ContentDisposition => Attribute::ContentDisposition,
        object_store::Attribute::ContentEncoding => Attribute::ContentEncoding,
        object_store::Attribute::ContentLanguage => Attribute::ContentLanguage,
        object_store::Attribute::ContentType => Attribute::ContentType,
        object_store::Attribute::CacheControl => Attribute::CacheControl,
        object_store::Attribute::Metadata(v) => Attribute::Metadata(v.to_string()),
        _ => panic!("unexpected attribute"),
    }
}
/// Converts `object_store::PutOptions` into the canister wire `PutOptions`.
///
/// Encryption-related fields (`aes_nonce`, `aes_tags`) are left at their
/// defaults here; the encrypting caller fills them in afterwards.
pub fn to_put_options(opts: &object_store::PutOptions) -> PutOptions {
    let mode: PutMode = match opts.mode {
        object_store::PutMode::Overwrite => PutMode::Overwrite,
        object_store::PutMode::Create => PutMode::Create,
        object_store::PutMode::Update(ref v) => PutMode::Update(UpdateVersion {
            e_tag: v.e_tag.clone(),
            version: v.version.clone(),
        }),
    };
    PutOptions {
        mode,
        tags: opts.tags.encoded().to_string(),
        attributes: opts
            .attributes
            .iter()
            .map(|(k, v)| (to_attribute(k), v.to_string()))
            .collect(),
        ..Default::default()
    }
}
fn ranges_is_valid(ranges: &[Range<u64>], len: u64) -> object_store::Result<()> {
for range in ranges {
if range.start >= len {
return Err(object_store::Error::Generic {
store: STORE_NAME,
source: format!("start {} is larger than length {}", range.start, len).into(),
});
}
if range.end <= range.start {
return Err(object_store::Error::Generic {
store: STORE_NAME,
source: format!("end {} is less than start {}", range.end, range.start).into(),
});
}
if range.end > len {
return Err(object_store::Error::Generic {
store: STORE_NAME,
source: format!("end {} is larger than length {}", range.end, len).into(),
});
}
}
Ok(())
}
/// Derives the per-chunk GCM nonce for chunk `idx` from an object's base nonce.
///
/// The trailing 8 bytes of `base` are interpreted as a little-endian u64
/// counter and advanced by `idx` with wrapping arithmetic; the leading 4
/// bytes pass through unchanged. Deterministic: the same `(base, idx)` pair
/// always yields the same nonce, and distinct chunk indices under one base
/// yield distinct nonces (up to u64 wraparound).
fn derive_gcm_nonce(base: &[u8; 12], idx: u64) -> [u8; 12] {
    let mut nonce = *base;
    // Read the counter straight from the base slice; the previous version
    // copied through an intermediate [u8; 8] buffer for no benefit.
    let ctr = u64::from_le_bytes(base[4..12].try_into().expect("slice is 8 bytes"));
    nonce[4..12].copy_from_slice(&ctr.wrapping_add(idx).to_le_bytes());
    nonce
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::agent::build_agent;
    use ic_agent::{identity::BasicIdentity, Identity};
    use ic_cose_types::cose::sha3_256;
    use object_store::{integration::*, ObjectStoreExt};
    /// End-to-end smoke test of the encrypting client against a locally
    /// running canister (hence `#[ignore]`): single-call put/get, multipart
    /// upload, and ranged reads with boundary-crossing ranges.
    #[tokio::test(flavor = "current_thread")]
    #[ignore]
    async fn test_client() {
        // Fixed secret doubles as the identity seed and the AES key.
        let secret = [8u8; 32];
        let canister = Principal::from_text("6at64-oyaaa-aaaap-anvza-cai").unwrap();
        let id = BasicIdentity::from_raw_key(&secret);
        println!("id: {:?}", id.sender().unwrap().to_text());
        let agent = build_agent("http://localhost:4943", Arc::new(id))
            .await
            .unwrap();
        let cli = Arc::new(Client::new(Arc::new(agent), canister, Some(secret)));
        let oc = ObjectStoreClient::new(cli.clone());
        let path = Path::from("test/hello.txt");
        let payload = "Hello Anda!".as_bytes().to_vec();
        let res = oc
            .put_opts(&path, payload.clone().into(), Default::default())
            .await
            .unwrap();
        println!("put result: {:?}", res);
        // Reading through the encrypting wrapper yields the plaintext...
        let res = oc.get_opts(&path, Default::default()).await.unwrap();
        assert_eq!(res.meta.size as usize, payload.len());
        let res = res.bytes().await.unwrap();
        assert_eq!(res.to_vec(), payload);
        // ...while the raw client sees ciphertext plus encryption metadata.
        let res = cli.get_opts(&path, Default::default()).await.unwrap();
        assert_eq!(res.meta.size as usize, payload.len());
        assert_ne!(&res.payload, &payload);
        let aes_nonce = res.meta.aes_nonce.unwrap();
        assert_eq!(aes_nonce.len(), 12);
        let aes_tags = res.meta.aes_tags.unwrap();
        assert_eq!(aes_tags.len(), 1);
        // Multipart upload of 20000 x 32-byte hashes (spans many chunks).
        let now = chrono::Utc::now();
        let path = Path::from(format!("test/{}.bin", now.timestamp_millis()));
        let count = 20000u64;
        let len = count * 32;
        let mut payload = Vec::with_capacity(len as usize);
        {
            let mut uploder = oc
                .put_multipart_opts(&path, Default::default())
                .await
                .unwrap();
            for i in 0..count {
                let data = sha3_256(&i.to_be_bytes()).to_vec();
                payload.extend_from_slice(&data);
                uploder
                    .put_part(object_store::PutPayload::from(data))
                    .await
                    .unwrap();
            }
            uploder.complete().await.unwrap();
        }
        let res = oc.get_opts(&path, Default::default()).await.unwrap();
        assert_eq!(res.meta.size as usize, payload.len());
        let res = res.bytes().await.unwrap();
        assert_eq!(res.to_vec(), payload);
        let res = cli.get_opts(&path, Default::default()).await.unwrap();
        assert_eq!(res.meta.size as usize, payload.len());
        assert_ne!(&res.payload, &payload);
        let aes_nonce = res.meta.aes_nonce.unwrap();
        assert_eq!(aes_nonce.len(), 12);
        let aes_tags = res.meta.aes_tags.unwrap();
        // One tag per CHUNK_SIZE chunk.
        assert_eq!(aes_tags.len(), len.div_ceil(CHUNK_SIZE) as usize);
        // Ranged reads: inside first chunk, spanning chunks, and the tail.
        let ranges = vec![0u64..1000, 100..100000, len - CHUNK_SIZE - 1..len];
        let rt = oc.get_ranges(&path, &ranges).await.unwrap();
        assert_eq!(rt.len(), ranges.len());
        // Cross-check each range against a ranged get_opts of the same span.
        for (i, Range { start, end }) in ranges.into_iter().enumerate() {
            let res = oc
                .get_opts(
                    &path,
                    object_store::GetOptions {
                        range: Some(object_store::GetRange::Bounded(start..end)),
                        ..Default::default()
                    },
                )
                .await
                .unwrap();
            assert_eq!(res.meta.location, path);
            assert_eq!(res.meta.size as usize, payload.len());
            let data = res.bytes().await.unwrap();
            assert_eq!(rt[i].len(), data.len());
            assert_eq!(&data, &payload[start as usize..end as usize]);
        }
    }
    const NON_EXISTENT_NAME: &str = "nonexistentname";
    /// Runs the upstream `object_store` integration suite against a local
    /// canister, then cleans up all objects (hence `#[ignore]`).
    #[tokio::test]
    #[ignore]
    async fn integration_test() {
        let secret = [8u8; 32];
        let canister = Principal::from_text("6at64-oyaaa-aaaap-anvza-cai").unwrap();
        let id = BasicIdentity::from_raw_key(&secret);
        println!("id: {:?}", id.sender().unwrap().to_text());
        let agent = build_agent("http://localhost:4943", Arc::new(id))
            .await
            .unwrap();
        let cli = Arc::new(Client::new(Arc::new(agent), canister, Some(secret)));
        let storage = ObjectStoreClient::new(cli.clone());
        let location = Path::from(NON_EXISTENT_NAME);
        let err = get_nonexistent_object(&storage, Some(location))
            .await
            .unwrap_err();
        if let object_store::Error::NotFound { path, .. } = err {
            assert!(path.ends_with(NON_EXISTENT_NAME));
        } else {
            panic!("unexpected error type: {err:?}");
        }
        put_get_delete_list(&storage).await;
        put_get_attributes(&storage).await;
        get_opts(&storage).await;
        put_opts(&storage, true).await;
        list_uses_directories_correctly(&storage).await;
        list_with_delimiter(&storage).await;
        rename_and_copy(&storage).await;
        copy_if_not_exists(&storage).await;
        copy_rename_nonexistent_object(&storage).await;
        multipart_out_of_order(&storage).await;
        // Remove everything the suite created so repeated runs start clean.
        let objs = storage.list(None).collect::<Vec<_>>().await;
        for obj in objs {
            let obj = obj.unwrap();
            storage
                .delete(&obj.location)
                .await
                .expect("failed to delete object");
        }
        stream_get(&storage).await;
    }
}