1use aes_gcm::{aes::cipher::consts::U12, AeadInPlace, Aes256Gcm, Key, Nonce, Tag};
2use async_stream::try_stream;
3use async_trait::async_trait;
4use candid::{
5 utils::{encode_args, ArgumentEncoder},
6 CandidType, Decode, Principal,
7};
8use chrono::DateTime;
9use futures::{stream::BoxStream, StreamExt};
10use ic_agent::Agent;
11use ic_cose_types::{BoxError, CanisterCaller};
12use ic_oss_types::{format_error, object_store::*};
13use serde_bytes::{ByteArray, ByteBuf, Bytes};
14use std::{collections::BTreeSet, ops::Range, sync::Arc};
15
16pub use object_store::{self, path::Path, DynObjectStore, MultipartUpload, ObjectStore};
17
18use crate::rand_bytes;
19
/// Store name used in this file as the `store` field of `object_store::Error`
/// values and as the prefix of the `Debug`/`Display` output of the client types.
pub static STORE_NAME: &str = "ICObjectStore";
21
/// Handle used to call an object-store canister through an IC agent.
///
/// `agent` and `cipher` are held behind `Arc`, so a clone shares them with the original.
#[derive(Clone)]
pub struct Client {
    /// Agent used to issue query and update calls.
    agent: Arc<Agent>,
    /// Principal of the canister this client talks to.
    canister: Principal,
    /// AES-256-GCM cipher; `None` when no secret was passed to `Client::new`.
    cipher: Option<Arc<Aes256Gcm>>,
}
36
37impl std::fmt::Debug for Client {
38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39 write!(f, "{}:Client({})", STORE_NAME, self.canister)
40 }
41}
42
43impl Client {
44 pub fn new(agent: Arc<Agent>, canister: Principal, aes_secret: Option<[u8; 32]>) -> Client {
46 use aes_gcm::KeyInit;
47
48 let cipher = aes_secret.map(|secret| {
49 let key = Key::<Aes256Gcm>::from(secret);
50 Arc::new(Aes256Gcm::new(&key))
51 });
52
53 Client {
54 agent,
55 canister,
56 cipher,
57 }
58 }
59}
60
61impl ObjectStoreSDK for Client {
62 fn canister(&self) -> &Principal {
63 &self.canister
64 }
65
66 fn cipher(&self) -> Option<Arc<Aes256Gcm>> {
67 self.cipher.clone()
68 }
69}
70
71impl CanisterCaller for Client {
72 async fn canister_query<
73 In: ArgumentEncoder + Send,
74 Out: CandidType + for<'a> candid::Deserialize<'a>,
75 >(
76 &self,
77 canister: &Principal,
78 method: &str,
79 args: In,
80 ) -> Result<Out, BoxError> {
81 let input = encode_args(args)?;
82 let res = self
83 .agent
84 .query(canister, method)
85 .with_arg(input)
86 .call()
87 .await?;
88 let output = Decode!(res.as_slice(), Out)?;
89 Ok(output)
90 }
91
92 async fn canister_update<
93 In: ArgumentEncoder + Send,
94 Out: CandidType + for<'a> candid::Deserialize<'a>,
95 >(
96 &self,
97 canister: &Principal,
98 method: &str,
99 args: In,
100 ) -> Result<Out, BoxError> {
101 let input = encode_args(args)?;
102 let res = self
103 .agent
104 .update(canister, method)
105 .with_arg(input)
106 .call_and_wait()
107 .await?;
108 let output = Decode!(res.as_slice(), Out)?;
109 Ok(output)
110 }
111}
112
113#[async_trait]
114pub trait ObjectStoreSDK: CanisterCaller + Sized {
115 fn canister(&self) -> &Principal;
116 fn cipher(&self) -> Option<Arc<Aes256Gcm>>;
117
118 async fn get_state(&self) -> Result<StateInfo, String> {
120 self.canister_query(self.canister(), "get_state", ())
121 .await
122 .map_err(format_error)?
123 }
124
125 async fn is_member(&self, member_kind: &str, user: &Principal) -> Result<bool, String> {
126 self.canister_query(self.canister(), "is_member", (member_kind, user))
127 .await
128 .map_err(format_error)?
129 }
130
131 async fn admin_add_managers(&self, args: &BTreeSet<Principal>) -> Result<(), String> {
133 self.canister_update(self.canister(), "admin_add_managers", (args,))
134 .await
135 .map_err(format_error)?
136 }
137
138 async fn admin_remove_managers(&self, args: &BTreeSet<Principal>) -> Result<(), String> {
140 self.canister_update(self.canister(), "admin_remove_managers", (args,))
141 .await
142 .map_err(format_error)?
143 }
144
145 async fn admin_add_auditors(&self, args: &BTreeSet<Principal>) -> Result<(), String> {
147 self.canister_update(self.canister(), "admin_add_auditors", (args,))
148 .await
149 .map_err(format_error)?
150 }
151
152 async fn admin_remove_auditors(&self, args: &BTreeSet<Principal>) -> Result<(), String> {
154 self.canister_update(self.canister(), "admin_remove_auditors", (args,))
155 .await
156 .map_err(format_error)?
157 }
158
159 async fn put_opts(&self, path: &Path, payload: &Bytes, opts: PutOptions) -> Result<PutResult> {
161 if payload.len() > MAX_PAYLOAD_SIZE as usize {
162 return Err(Error::Precondition {
163 path: path.as_ref().to_string(),
164 error: format!(
165 "payload size {} exceeds max size {}",
166 payload.len(),
167 MAX_PAYLOAD_SIZE
168 ),
169 });
170 }
171
172 self.canister_update(self.canister(), "put_opts", (path.as_ref(), payload, opts))
173 .await
174 .map_err(|error| Error::Generic {
175 error: format_error(error),
176 })?
177 }
178
179 async fn delete(&self, path: &Path) -> Result<()> {
181 self.canister_update(self.canister(), "delete", (path.as_ref(),))
182 .await
183 .map_err(|error| Error::Generic {
184 error: format_error(error),
185 })?
186 }
187
188 async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
190 self.canister_update(self.canister(), "copy", (from.as_ref(), to.as_ref()))
191 .await
192 .map_err(|error| Error::Generic {
193 error: format_error(error),
194 })?
195 }
196
197 async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
199 self.canister_update(
200 self.canister(),
201 "copy_if_not_exists",
202 (from.as_ref(), to.as_ref()),
203 )
204 .await
205 .map_err(|error| Error::Generic {
206 error: format_error(error),
207 })?
208 }
209
210 async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
212 self.canister_update(self.canister(), "rename", (from.as_ref(), to.as_ref()))
213 .await
214 .map_err(|error| Error::Generic {
215 error: format_error(error),
216 })?
217 }
218
219 async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
221 self.canister_update(
222 self.canister(),
223 "rename_if_not_exists",
224 (from.as_ref(), to.as_ref()),
225 )
226 .await
227 .map_err(|error| Error::Generic {
228 error: format_error(error),
229 })?
230 }
231
232 async fn create_multipart(&self, path: &Path) -> Result<MultipartId> {
234 self.canister_update(self.canister(), "create_multipart", (path.as_ref(),))
235 .await
236 .map_err(|error| Error::Generic {
237 error: format_error(error),
238 })?
239 }
240
241 async fn put_part(
243 &self,
244 path: &Path,
245 id: &MultipartId,
246 part_idx: u64,
247 payload: &Bytes,
248 ) -> Result<PartId> {
249 self.canister_update(
250 self.canister(),
251 "put_part",
252 (path.as_ref(), id, part_idx, payload),
253 )
254 .await
255 .map_err(|error| Error::Generic {
256 error: format_error(error),
257 })?
258 }
259
260 async fn complete_multipart(
262 &self,
263 path: &Path,
264 id: &MultipartId,
265 opts: &PutMultipartOptions,
266 ) -> Result<PutResult> {
267 self.canister_update(
268 self.canister(),
269 "complete_multipart",
270 (path.as_ref(), id, opts),
271 )
272 .await
273 .map_err(|error| Error::Generic {
274 error: format_error(error),
275 })?
276 }
277
278 async fn abort_multipart(&self, path: &Path, id: &MultipartId) -> Result<()> {
280 self.canister_update(self.canister(), "abort_multipart", (path.as_ref(), id))
281 .await
282 .map_err(|error| Error::Generic {
283 error: format_error(error),
284 })?
285 }
286
287 async fn get_part(&self, path: &Path, part_idx: u64) -> Result<ByteBuf> {
289 self.canister_query(self.canister(), "get_part", (path.as_ref(), part_idx))
290 .await
291 .map_err(|error| Error::Generic {
292 error: format_error(error),
293 })?
294 }
295
296 async fn get_opts(&self, path: &Path, opts: GetOptions) -> Result<GetResult> {
298 self.canister_query(self.canister(), "get_opts", (path.as_ref(), opts))
299 .await
300 .map_err(|error| Error::Generic {
301 error: format_error(error),
302 })?
303 }
304
305 async fn get_ranges(&self, path: &Path, ranges: &[(u64, u64)]) -> Result<Vec<ByteBuf>> {
307 if ranges.is_empty() {
308 return Ok(Vec::new());
309 }
310
311 self.canister_query(self.canister(), "get_ranges", (path.as_ref(), ranges))
312 .await
313 .map_err(|error| Error::Generic {
314 error: format_error(error),
315 })?
316 }
317
318 async fn head(&self, path: &Path) -> Result<ObjectMeta> {
320 self.canister_query(self.canister(), "head", (path.as_ref(),))
321 .await
322 .map_err(|error| Error::Generic {
323 error: format_error(error),
324 })?
325 }
326
327 async fn list(&self, prefix: Option<&Path>) -> Result<Vec<ObjectMeta>> {
329 self.canister_query(self.canister(), "list", (prefix.map(|p| p.as_ref()),))
330 .await
331 .map_err(|error| Error::Generic {
332 error: format_error(error),
333 })?
334 }
335
336 async fn list_with_offset(
338 &self,
339 prefix: Option<&Path>,
340 offset: &Path,
341 ) -> Result<Vec<ObjectMeta>> {
342 self.canister_query(
343 self.canister(),
344 "list_with_offset",
345 (prefix.map(|p| p.as_ref()), offset.as_ref()),
346 )
347 .await
348 .map_err(|error| Error::Generic {
349 error: format_error(error),
350 })?
351 }
352
353 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
355 self.canister_query(
356 self.canister(),
357 "list_with_delimiter",
358 (prefix.map(|p| p.as_ref()),),
359 )
360 .await
361 .map_err(|error| Error::Generic {
362 error: format_error(error),
363 })?
364 }
365}
366
/// In-progress multipart upload that buffers incoming payloads and sends them
/// to the canister in pieces of `CHUNK_SIZE` bytes.
#[derive(Debug)]
pub struct MultipartUploader {
    /// Index that will be assigned to the next uploaded part.
    part_idx: u64,
    /// Bytes received so far that have not yet been cut into a full chunk.
    parts_cache: Vec<u8>,
    /// Options sent when the upload is completed; also carries the AES base nonce
    /// and the authentication tags collected for each encrypted chunk.
    opts: PutMultipartOptions,
    /// Client, path and upload id, shared with the per-part upload futures.
    state: Arc<UploadState>,
}
375
/// Immutable data identifying one multipart upload.
struct UploadState {
    /// Client used to send the parts.
    client: Arc<Client>,
    /// Destination path of the object being uploaded.
    path: Path,
    /// Upload id returned by `create_multipart`.
    id: MultipartId,
}
382
383impl std::fmt::Debug for UploadState {
384 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
385 write!(f, "{}:UploadState({}, {})", STORE_NAME, self.path, self.id)
386 }
387}
388
389#[async_trait]
390impl MultipartUpload for MultipartUploader {
391 fn put_part(&mut self, payload: object_store::PutPayload) -> object_store::UploadPart {
393 let payload = bytes::Bytes::from(payload);
394 self.parts_cache.extend_from_slice(&payload);
395 if self.parts_cache.len() < CHUNK_SIZE as usize {
396 return Box::pin(futures::future::ready(Ok(())));
397 }
398
399 let mut parts: Vec<object_store::UploadPart> = Vec::new();
400 while self.parts_cache.len() >= CHUNK_SIZE as usize {
401 let state = self.state.clone();
402 let mut chunk = self
403 .parts_cache
404 .drain(..CHUNK_SIZE as usize)
405 .collect::<Vec<u8>>();
406
407 if let Some(cipher) = &self.state.client.cipher {
408 let nonce = derive_gcm_nonce(
409 self.opts.aes_nonce.as_ref().as_ref().unwrap(),
410 self.part_idx,
411 );
412 match encrypt_chunk(cipher, Nonce::from_slice(&nonce), &mut chunk, &state.path) {
413 Ok(tag) => {
414 self.opts.aes_tags.as_mut().unwrap().push(tag);
415 }
416 Err(err) => {
417 return Box::pin(futures::future::ready(Err(err)));
418 }
419 }
420 }
421
422 let part_idx = self.part_idx;
423 self.part_idx += 1;
424 parts.push(Box::pin(async move {
425 let _ = state
426 .client
427 .put_part(&state.path, &state.id, part_idx, Bytes::new(&chunk))
428 .await
429 .map_err(from_error)?;
430 Ok(())
431 }))
432 }
433
434 Box::pin(async move {
435 for part in parts {
436 part.await?;
437 }
438
439 Ok(())
440 })
441 }
442
443 async fn complete(&mut self) -> object_store::Result<object_store::PutResult> {
445 for part in self.parts_cache.chunks_mut(CHUNK_SIZE as usize) {
446 let part_idx = self.part_idx;
447 self.part_idx += 1;
448
449 if let Some(cipher) = &self.state.client.cipher {
450 let nonce =
451 derive_gcm_nonce(self.opts.aes_nonce.as_ref().as_ref().unwrap(), part_idx);
452 match encrypt_chunk(cipher, Nonce::from_slice(&nonce), part, &self.state.path) {
453 Ok(tag) => {
454 self.opts.aes_tags.as_mut().unwrap().push(tag);
455 }
456 Err(err) => {
457 return Err(err);
458 }
459 }
460 }
461
462 let _ = self
463 .state
464 .client
465 .put_part(&self.state.path, &self.state.id, part_idx, Bytes::new(part))
466 .await
467 .map_err(from_error)?;
468 }
469
470 self.parts_cache.clear();
471 let res = self
472 .state
473 .client
474 .complete_multipart(&self.state.path, &self.state.id, &self.opts)
475 .await
476 .map_err(from_error)?;
477 Ok(object_store::PutResult {
478 e_tag: res.e_tag,
479 version: res.version,
480 })
481 }
482
483 async fn abort(&mut self) -> object_store::Result<()> {
485 self.state
486 .client
487 .abort_multipart(&self.state.path, &self.state.id)
488 .await
489 .map_err(from_error)
490 }
491}
492
/// Adapter that exposes a shared `Client` through the `object_store::ObjectStore` trait.
#[derive(Clone)]
pub struct ObjectStoreClient {
    /// Underlying canister client; shared between clones.
    client: Arc<Client>,
}
498
499impl ObjectStoreClient {
500 pub fn new(client: Arc<Client>) -> ObjectStoreClient {
501 ObjectStoreClient { client }
502 }
503
504 pub async fn get_state(&self) -> Result<StateInfo, String> {
505 self.client.get_state().await
506 }
507
508 async fn get_opts_inner(
509 &self,
510 path: &Path,
511 opts: object_store::GetOptions,
512 ) -> object_store::Result<object_store::GetResult> {
513 let options = GetOptions {
514 if_match: opts.if_match,
515 if_none_match: opts.if_none_match,
516 if_modified_since: opts.if_modified_since.map(|v| v.timestamp_millis() as u64),
517 if_unmodified_since: opts
518 .if_unmodified_since
519 .map(|v| v.timestamp_millis() as u64),
520 range: opts.range.clone().map(to_get_range),
521 version: opts.version,
522 head: opts.head,
523 };
524
525 let res: GetResult = self
526 .client
527 .get_opts(path, options)
528 .await
529 .map_err(from_error)?;
530
531 let rr = if let Some(r) = &opts.range {
533 r.as_range(res.meta.size)
534 .map_err(|err| object_store::Error::Generic {
535 store: STORE_NAME,
536 source: err.into(),
537 })?
538 } else {
539 0..res.meta.size
540 };
541 let range = res.range.0..res.range.1;
543 let meta = from_object_meta(res.meta);
544 let attributes: object_store::Attributes = res
545 .attributes
546 .into_iter()
547 .map(|(k, v)| (from_attribute(k), v))
548 .collect();
549 let data = bytes::Bytes::from(res.payload.into_vec());
550 if opts.head || rr == range {
551 let stream = futures::stream::once(futures::future::ready(Ok(data)));
552 return Ok(object_store::GetResult {
553 payload: object_store::GetResultPayload::Stream(stream.boxed()),
554 meta,
555 range,
556 attributes,
557 });
558 }
559
560 let stream =
561 create_get_range_stream(self.client.clone(), path.clone(), rr.clone(), range, data);
562 Ok(object_store::GetResult {
563 payload: object_store::GetResultPayload::Stream(stream),
564 meta,
565 range: rr,
566 attributes,
567 })
568 }
569}
570
571impl std::fmt::Display for ObjectStoreClient {
572 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
573 write!(f, "{}:ObjectStoreClient", STORE_NAME)
574 }
575}
576
577impl std::fmt::Debug for ObjectStoreClient {
578 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
579 write!(f, "{}:ObjectStoreClient", STORE_NAME)
580 }
581}
582
583#[async_trait]
584impl ObjectStore for ObjectStoreClient {
585 async fn put_opts(
587 &self,
588 path: &Path,
589 payload: object_store::PutPayload,
590 opts: object_store::PutOptions,
591 ) -> object_store::Result<object_store::PutResult> {
592 let data = bytes::Bytes::from(payload);
593 let mut opts = to_put_options(&opts);
594 let payload: Vec<u8> = if let Some(cipher) = &self.client.cipher {
595 let base_nonce: [u8; 12] = rand_bytes();
596 let mut data: Vec<u8> = data.into();
597 let mut aes_tags: Vec<ByteArray<16>> = Vec::new();
598 for (i, chunk) in data.chunks_mut(CHUNK_SIZE as usize).enumerate() {
599 let nonce = derive_gcm_nonce(&base_nonce, i as u64);
600 let tag = encrypt_chunk(cipher, Nonce::from_slice(&nonce), chunk, path)?;
601 aes_tags.push(tag);
602 }
603 opts.aes_nonce = Some(base_nonce.into());
604 opts.aes_tags = Some(aes_tags);
605 data
606 } else {
607 data.into()
608 };
609
610 let res = self
611 .client
612 .put_opts(path, Bytes::new(&payload), opts)
613 .await
614 .map_err(from_error)?;
615 Ok(object_store::PutResult {
616 e_tag: res.e_tag,
617 version: res.version,
618 })
619 }
620
621 async fn put_multipart_opts(
623 &self,
624 path: &Path,
625 opts: object_store::PutMultipartOptions,
626 ) -> object_store::Result<Box<dyn object_store::MultipartUpload>> {
627 let upload_id = self
628 .client
629 .create_multipart(path)
630 .await
631 .map_err(from_error)?;
632 let mut opts = PutMultipartOptions {
633 tags: opts.tags.encoded().to_string(),
634 attributes: opts
635 .attributes
636 .iter()
637 .map(|(k, v)| (to_attribute(k), v.to_string()))
638 .collect(),
639 ..Default::default()
640 };
641
642 if self.client.cipher.is_some() {
643 opts.aes_nonce = Some(rand_bytes().into());
644 opts.aes_tags = Some(Vec::new());
645 }
646
647 Ok(Box::new(MultipartUploader {
648 part_idx: 0,
649 parts_cache: Vec::new(),
650 opts,
651 state: Arc::new(UploadState {
652 client: self.client.clone(),
653 path: path.clone(),
654 id: upload_id,
655 }),
656 }))
657 }
658
659 async fn get_opts(
660 &self,
661 location: &Path,
662 mut opts: object_store::GetOptions,
663 ) -> object_store::Result<object_store::GetResult> {
664 if let Some(cipher) = self.client.cipher() {
665 let meta = self.client.head(location).await.map_err(from_error)?;
666
667 let range = if let Some(r) = &opts.range {
669 r.as_range(meta.size)
670 .map_err(|err| object_store::Error::Generic {
671 store: STORE_NAME,
672 source: err.into(),
673 })?
674 } else {
675 0..meta.size
676 };
677
678 let rr = (range.start / CHUNK_SIZE) * CHUNK_SIZE
680 ..meta
681 .size
682 .min((1 + range.end.saturating_sub(1) / CHUNK_SIZE) * CHUNK_SIZE);
683
684 if rr.end > rr.start {
685 opts.range = Some(object_store::GetRange::Bounded(rr.clone()));
686 }
687
688 let res = self.get_opts_inner(location, opts).await?;
689 let obj = res.meta.clone();
690
691 let attributes = res.attributes.clone();
692 let start_idx = rr.start / CHUNK_SIZE;
693 let start_offset = (range.start - rr.start) as usize;
694 let size = (range.end - range.start) as usize;
695
696 let stream = create_decryption_stream(
697 res,
698 cipher,
699 meta.aes_tags.unwrap(),
700 *meta.aes_nonce.unwrap(),
701 location.clone(),
702 start_idx as usize,
703 start_offset,
704 size,
705 );
706
707 return Ok(object_store::GetResult {
708 payload: object_store::GetResultPayload::Stream(stream),
709 meta: obj,
710 range,
711 attributes,
712 });
713 }
714
715 self.get_opts_inner(location, opts).await
716 }
717
718 async fn get_range(
720 &self,
721 path: &Path,
722 range: Range<u64>,
723 ) -> object_store::Result<bytes::Bytes> {
724 #[allow(clippy::single_range_in_vec_init)]
725 let mut res = self.get_ranges(path, &[range.start..range.end]).await?;
726 res.pop().ok_or_else(|| object_store::Error::NotFound {
727 path: path.as_ref().to_string(),
728 source: "get_ranges result should not be empty".into(),
729 })
730 }
731
732 async fn get_ranges(
734 &self,
735 location: &Path,
736 ranges: &[Range<u64>],
737 ) -> object_store::Result<Vec<bytes::Bytes>> {
738 if ranges.is_empty() {
739 return Ok(Vec::new());
740 }
741
742 if let Some(cipher) = self.client.cipher() {
743 let meta = self.client.head(location).await.map_err(from_error)?;
744 ranges_is_valid(ranges, meta.size)?;
745 let aes_tags = meta.aes_tags.ok_or_else(|| object_store::Error::Generic {
746 store: STORE_NAME,
747 source: format!("missing AES256 tags for path {location} for ranges {ranges:?}")
748 .into(),
749 })?;
750 let base_nonce = meta.aes_nonce.ok_or_else(|| object_store::Error::Generic {
751 store: STORE_NAME,
752 source: format!("missing AES256 nonce for path {location}").into(),
753 })?;
754
755 let mut result: Vec<bytes::Bytes> = Vec::with_capacity(ranges.len());
756 let mut chunk_cache: Option<(usize, Vec<u8>)> = None; for &Range { start, end } in ranges {
758 let mut buf = Vec::with_capacity((end - start) as usize);
759 let start_chunk = (start / CHUNK_SIZE) as usize;
761 let end_chunk = ((end - 1) / CHUNK_SIZE) as usize;
762
763 for idx in start_chunk..=end_chunk {
764 let chunk_start = if idx == start_chunk {
766 start % CHUNK_SIZE
767 } else {
768 0
769 };
770
771 let chunk_end = if idx == end_chunk {
772 (end - 1) % CHUNK_SIZE + 1
773 } else {
774 CHUNK_SIZE
775 };
776
777 match &chunk_cache {
778 Some((cached_idx, cached_chunk)) if *cached_idx == idx => {
779 buf.extend_from_slice(
780 &cached_chunk[chunk_start as usize..chunk_end as usize],
781 );
782 }
783 _ => {
784 let tag =
785 aes_tags
786 .get(idx)
787 .ok_or_else(|| object_store::Error::Generic {
788 store: STORE_NAME,
789 source: format!(
790 "missing AES256 tag for chunk {idx} for path {location}"
791 )
792 .into(),
793 })?;
794 let chunk = self
795 .client
796 .get_part(location, idx as u64)
797 .await
798 .map_err(from_error)?;
799 let mut chunk = chunk.into_vec();
800 let nonce = derive_gcm_nonce(&base_nonce, idx as u64);
801 decrypt_chunk(
802 &cipher,
803 Nonce::from_slice(&nonce),
804 &mut chunk,
805 tag,
806 location,
807 )?;
808 buf.extend_from_slice(&chunk[chunk_start as usize..chunk_end as usize]);
809 chunk_cache = Some((idx, chunk));
810 }
811 }
812 }
813 result.push(buf.into());
814 }
815
816 return Ok(result);
817 }
818
819 let ranges: Vec<(u64, u64)> = ranges.iter().map(|r| (r.start, r.end)).collect();
820 let res = self
821 .client
822 .get_ranges(location, &ranges)
823 .await
824 .map_err(from_error)?;
825
826 Ok(res
827 .into_iter()
828 .map(|v| bytes::Bytes::from(v.into_vec()))
829 .collect())
830 }
831
832 async fn head(&self, location: &Path) -> object_store::Result<object_store::ObjectMeta> {
834 let res = self.client.head(location).await.map_err(from_error)?;
835 Ok(from_object_meta(res))
836 }
837
838 async fn delete(&self, location: &Path) -> object_store::Result<()> {
840 self.client.delete(location).await.map_err(from_error)
841 }
842
843 fn list(
845 &self,
846 prefix: Option<&Path>,
847 ) -> BoxStream<'static, object_store::Result<object_store::ObjectMeta>> {
848 let prefix = prefix.cloned();
849 let client = self.client.clone();
850 try_stream! {
851 let res = client.list(prefix.as_ref()).await.map_err(from_error)?;
852 for object in res {
853 yield from_object_meta(object);
854 }
855 }
856 .boxed()
857 }
858
859 fn list_with_offset(
861 &self,
862 prefix: Option<&Path>,
863 offset: &Path,
864 ) -> BoxStream<'static, object_store::Result<object_store::ObjectMeta>> {
865 let prefix = prefix.cloned();
866 let offset = offset.clone();
867 let client = self.client.clone();
868 try_stream! {
869 let res = client.list_with_offset(prefix.as_ref(), &offset).await.map_err(from_error)?;
870 for object in res {
871 yield from_object_meta(object);
872 }
873 }
874 .boxed()
875 }
876
877 async fn list_with_delimiter(
879 &self,
880 prefix: Option<&Path>,
881 ) -> object_store::Result<object_store::ListResult> {
882 let res = self
883 .client
884 .list_with_delimiter(prefix)
885 .await
886 .map_err(from_error)?;
887
888 Ok(object_store::ListResult {
889 objects: res.objects.into_iter().map(from_object_meta).collect(),
890 common_prefixes: res
891 .common_prefixes
892 .into_iter()
893 .map(|p| Path::parse(p).unwrap())
894 .collect(),
895 })
896 }
897
898 async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
900 self.client.copy(from, to).await.map_err(from_error)
901 }
902
903 async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
905 self.client
906 .copy_if_not_exists(from, to)
907 .await
908 .map_err(from_error)
909 }
910
911 async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> {
913 self.client.rename(from, to).await.map_err(from_error)
914 }
915
916 async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
918 self.client
919 .rename_if_not_exists(from, to)
920 .await
921 .map_err(from_error)
922 }
923}
924
925fn encrypt_chunk(
926 cipher: &Aes256Gcm,
927 nonce: &Nonce<U12>,
928 chunk: &mut [u8],
929 path: &Path,
930) -> Result<ByteArray<16>, object_store::Error> {
931 let tag = cipher
932 .encrypt_in_place_detached(nonce, &[], chunk)
933 .map_err(|err| object_store::Error::Generic {
934 store: STORE_NAME,
935 source: format!("AES256 encrypt failed for path {path}: {err:?}").into(),
936 })?;
937 let tag: [u8; 16] = tag.into();
938 Ok(tag.into())
939}
940
941fn decrypt_chunk(
942 cipher: &Aes256Gcm,
943 nonce: &Nonce<U12>,
944 chunk: &mut [u8],
945 tag: &ByteArray<16>,
946 path: &Path,
947) -> Result<(), object_store::Error> {
948 cipher
949 .decrypt_in_place_detached(nonce, &[], chunk, Tag::from_slice(tag.as_slice()))
950 .map_err(|err| object_store::Error::Generic {
951 store: STORE_NAME,
952 source: format!("AES256 decrypt failed for path {path}: {err:?}").into(),
953 })
954}
955
956#[allow(clippy::too_many_arguments)]
957fn create_get_range_stream(
958 client: Arc<Client>,
959 location: Path,
960 request_range: Range<u64>,
961 first_range: Range<u64>,
962 first_payload: bytes::Bytes,
963) -> BoxStream<'static, object_store::Result<bytes::Bytes>> {
964 try_stream! {
965 yield first_payload;
966
967 let mut remaining_ranges = Vec::new();
969 let mut current = first_range.end;
970 while current < request_range.end {
971 let end = (current + CHUNK_SIZE).min(request_range.end);
972 remaining_ranges.push(current..end);
973 current = end;
974 }
975
976 for r in remaining_ranges {
978 let res = client.get_ranges(&location, &[(r.start, r.end)]).await.map_err(from_error)?;
979 for data in res {
980 yield bytes::Bytes::from(data.into_vec());
981 }
982 }
983 }
984 .boxed()
985}
986
/// Wraps the encrypted byte stream of `res` in a stream of decrypted bytes.
///
/// Incoming data is re-buffered into `CHUNK_SIZE` pieces, each piece is decrypted
/// with the nonce derived from `base_nonce` and its chunk index and verified
/// against the matching entry of `aes_tags`. `start_idx` is the index of the first
/// chunk in the stream, `start_offset` the number of leading bytes to drop from
/// that first chunk, and `size` the total number of plaintext bytes to yield.
#[allow(clippy::too_many_arguments)]
fn create_decryption_stream(
    res: object_store::GetResult,
    cipher: Arc<Aes256Gcm>,
    aes_tags: Vec<ByteArray<16>>,
    base_nonce: [u8; 12],
    location: Path,
    start_idx: usize,
    start_offset: usize,
    size: usize,
) -> BoxStream<'static, object_store::Result<bytes::Bytes>> {
    try_stream! {
        let mut stream = res.into_stream();
        let mut buf = Vec::with_capacity(CHUNK_SIZE as usize * 2);
        // Index of the chunk that will be decrypted next.
        let mut idx = start_idx;
        // Plaintext bytes still owed to the caller.
        let mut remaining = size;

        while let Some(data) = stream.next().await {
            let data = data?;
            if remaining == 0 {
                break;
            }
            buf.extend_from_slice(&data);

            // Decrypt every complete chunk currently buffered.
            while remaining > 0 && buf.len() >= CHUNK_SIZE as usize {
                let mut chunk = buf.drain(..CHUNK_SIZE as usize).collect::<Vec<u8>>();

                let tag = aes_tags.get(idx).ok_or_else(|| object_store::Error::Generic {
                    store: STORE_NAME,
                    source: format!("missing AES256 tag for chunk {idx} for path {location}").into(),
                })?;

                let nonce = derive_gcm_nonce(&base_nonce, idx as u64);
                decrypt_chunk(&cipher, Nonce::from_slice(&nonce), &mut chunk, tag, &location)?;
                // Only the first chunk carries bytes before the requested start.
                if idx == start_idx && start_offset > 0 {
                    chunk.drain(..start_offset);
                }

                // Drop bytes past the requested end.
                if chunk.len() > remaining {
                    chunk.truncate(remaining);
                }

                remaining = remaining.saturating_sub(chunk.len());
                yield bytes::Bytes::from(chunk);

                idx += 1;
                if remaining == 0 {
                    return;
                }
            }
        }

        // Input ended with a partial chunk buffered: decrypt it as the final,
        // shorter-than-CHUNK_SIZE chunk.
        if remaining > 0 && !buf.is_empty() {
            let tag = aes_tags.get(idx).ok_or_else(|| object_store::Error::Generic {
                store: STORE_NAME,
                source: format!("missing AES256 tag for chunk {idx} for path {location}").into(),
            })?;
            let nonce = derive_gcm_nonce(&base_nonce, idx as u64);
            decrypt_chunk(&cipher, Nonce::from_slice(&nonce), &mut buf, tag, &location)?;
            if idx == start_idx && start_offset > 0 {
                buf.drain(..start_offset);
            }

            buf.truncate(remaining);
            yield bytes::Bytes::from(buf);
        }
    }.boxed()
}
1059
1060pub fn from_error(err: Error) -> object_store::Error {
1065 match err {
1066 Error::Generic { error } => object_store::Error::Generic {
1067 store: STORE_NAME,
1068 source: error.into(),
1069 },
1070 Error::NotFound { ref path } => object_store::Error::NotFound {
1071 path: path.clone(),
1072 source: Box::new(err),
1073 },
1074 Error::InvalidPath { path } => object_store::Error::InvalidPath {
1075 source: object_store::path::Error::InvalidPath { path: path.into() },
1076 },
1077 Error::NotSupported { error } => object_store::Error::NotSupported {
1078 source: error.into(),
1079 },
1080 Error::AlreadyExists { ref path } => object_store::Error::AlreadyExists {
1081 path: path.clone(),
1082 source: err.into(),
1083 },
1084 Error::Precondition { path, error } => object_store::Error::Precondition {
1085 path,
1086 source: error.into(),
1087 },
1088 Error::NotModified { path, error } => object_store::Error::NotModified {
1089 path,
1090 source: error.into(),
1091 },
1092 Error::NotImplemented => object_store::Error::NotImplemented,
1093 Error::PermissionDenied { path, error } => object_store::Error::Precondition {
1094 path,
1095 source: error.into(),
1096 },
1097 Error::Unauthenticated { path, error } => object_store::Error::Precondition {
1098 path,
1099 source: error.into(),
1100 },
1101 Error::UnknownConfigurationKey { key } => object_store::Error::UnknownConfigurationKey {
1102 store: STORE_NAME,
1103 key,
1104 },
1105 _ => object_store::Error::Generic {
1106 store: STORE_NAME,
1107 source: Box::new(err),
1108 },
1109 }
1110}
1111
1112pub fn from_object_meta(val: ObjectMeta) -> object_store::ObjectMeta {
1120 object_store::ObjectMeta {
1121 location: Path::parse(val.location).unwrap(),
1122 last_modified: DateTime::from_timestamp_millis(val.last_modified as i64)
1123 .expect("invalid timestamp"),
1124 size: val.size,
1125 e_tag: val.e_tag,
1126 version: val.version,
1127 }
1128}
1129
1130pub fn to_get_range(val: object_store::GetRange) -> GetRange {
1138 match val {
1139 object_store::GetRange::Bounded(v) => GetRange::Bounded(v.start, v.end),
1140 object_store::GetRange::Offset(v) => GetRange::Offset(v),
1141 object_store::GetRange::Suffix(v) => GetRange::Suffix(v),
1142 }
1143}
1144
1145pub fn from_attribute(val: Attribute) -> object_store::Attribute {
1150 match val {
1151 Attribute::ContentDisposition => object_store::Attribute::ContentDisposition,
1152 Attribute::ContentEncoding => object_store::Attribute::ContentEncoding,
1153 Attribute::ContentLanguage => object_store::Attribute::ContentLanguage,
1154 Attribute::ContentType => object_store::Attribute::ContentType,
1155 Attribute::CacheControl => object_store::Attribute::CacheControl,
1156 Attribute::Metadata(v) => object_store::Attribute::Metadata(v.into()),
1157 }
1158}
1159
1160pub fn to_attribute(val: &object_store::Attribute) -> Attribute {
1168 match val {
1169 object_store::Attribute::ContentDisposition => Attribute::ContentDisposition,
1170 object_store::Attribute::ContentEncoding => Attribute::ContentEncoding,
1171 object_store::Attribute::ContentLanguage => Attribute::ContentLanguage,
1172 object_store::Attribute::ContentType => Attribute::ContentType,
1173 object_store::Attribute::CacheControl => Attribute::CacheControl,
1174 object_store::Attribute::Metadata(v) => Attribute::Metadata(v.to_string()),
1175 _ => panic!("unexpected attribute"),
1176 }
1177}
1178
1179pub fn to_put_options(opts: &object_store::PutOptions) -> PutOptions {
1184 let mode: PutMode = match opts.mode {
1185 object_store::PutMode::Overwrite => PutMode::Overwrite,
1186 object_store::PutMode::Create => PutMode::Create,
1187 object_store::PutMode::Update(ref v) => PutMode::Update(UpdateVersion {
1188 e_tag: v.e_tag.clone(),
1189 version: v.version.clone(),
1190 }),
1191 };
1192 PutOptions {
1193 mode,
1194 tags: opts.tags.encoded().to_string(),
1195 attributes: opts
1196 .attributes
1197 .iter()
1198 .map(|(k, v)| (to_attribute(k), v.to_string()))
1199 .collect(),
1200 ..Default::default()
1201 }
1202}
1203
1204fn ranges_is_valid(ranges: &[Range<u64>], len: u64) -> object_store::Result<()> {
1205 for range in ranges {
1206 if range.start >= len {
1207 return Err(object_store::Error::Generic {
1208 store: STORE_NAME,
1209 source: format!("start {} is larger than length {}", range.start, len).into(),
1210 });
1211 }
1212 if range.end <= range.start {
1213 return Err(object_store::Error::Generic {
1214 store: STORE_NAME,
1215 source: format!("end {} is less than start {}", range.end, range.start).into(),
1216 });
1217 }
1218 if range.end > len {
1219 return Err(object_store::Error::Generic {
1220 store: STORE_NAME,
1221 source: format!("end {} is larger than length {}", range.end, len).into(),
1222 });
1223 }
1224 }
1225 Ok(())
1226}
1227
/// Derives the nonce for chunk `idx` from the 12-byte base nonce.
///
/// The first 4 bytes are copied unchanged; the last 8 bytes are read as a
/// little-endian `u64`, incremented by `idx` with wrap-around, and written back
/// in little-endian order.
fn derive_gcm_nonce(base: &[u8; 12], idx: u64) -> [u8; 12] {
    let mut counter_bytes = [0u8; 8];
    counter_bytes.copy_from_slice(&base[4..]);
    let counter = u64::from_le_bytes(counter_bytes).wrapping_add(idx);

    let mut derived = [0u8; 12];
    derived[..4].copy_from_slice(&base[..4]);
    derived[4..].copy_from_slice(&counter.to_le_bytes());
    derived
}
1237
#[cfg(test)]
mod tests {
    use super::*;
    use crate::agent::build_agent;
    use ic_agent::{identity::BasicIdentity, Identity};
    use ic_cose_types::cose::sha3_256;
    use object_store::integration::*;

    /// Object name that is never written; used to provoke a `NotFound` error.
    const NON_EXISTENT_NAME: &str = "nonexistentname";

    /// Builds the client shared by the tests below.
    ///
    /// `secret` is used both as the raw key of the calling identity and as
    /// the AES-256 key of the client. The agent targets
    /// `http://localhost:4943`.
    async fn connect_local(secret: [u8; 32]) -> Arc<Client> {
        let canister = Principal::from_text("6at64-oyaaa-aaaap-anvza-cai").unwrap();
        let id = BasicIdentity::from_raw_key(&secret);
        println!("id: {:?}", id.sender().unwrap().to_text());
        let agent = build_agent("http://localhost:4943", Arc::new(id))
            .await
            .unwrap();
        Arc::new(Client::new(Arc::new(agent), canister, Some(secret)))
    }

    /// Round-trips a small and a multipart object and checks that the raw
    /// canister payload differs from the plaintext while the decrypting
    /// store returns the original bytes, including for ranged reads.
    ///
    /// Marked `#[ignore]`, so it only runs when requested explicitly.
    #[tokio::test(flavor = "current_thread")]
    #[ignore]
    async fn test_client() {
        let cli = connect_local([8u8; 32]).await;
        let oc = ObjectStoreClient::new(cli.clone());

        // Small object written with a single put.
        let path = Path::from("test/hello.txt");
        let payload = b"Hello Anda!".to_vec();
        let put = oc
            .put_opts(&path, payload.clone().into(), Default::default())
            .await
            .unwrap();
        println!("put result: {:?}", put);

        // Read through the object-store wrapper: plaintext comes back.
        let got = oc.get_opts(&path, Default::default()).await.unwrap();
        assert_eq!(got.meta.size as usize, payload.len());
        let body = got.bytes().await.unwrap();
        assert_eq!(body.to_vec(), payload);

        // Read through the raw client: stored bytes differ from the plaintext.
        let raw = cli.get_opts(&path, Default::default()).await.unwrap();
        assert_eq!(raw.meta.size as usize, payload.len());
        assert_ne!(&raw.payload, &payload);
        let aes_nonce = raw.meta.aes_nonce.unwrap();
        assert_eq!(aes_nonce.len(), 12);
        let aes_tags = raw.meta.aes_tags.unwrap();
        assert_eq!(aes_tags.len(), 1);

        // Larger object uploaded as `count` parts of 32 bytes each.
        let now = chrono::Utc::now();
        let path = Path::from(format!("test/{}.bin", now.timestamp_millis()));
        let count = 20000u64;
        let len = count * 32;
        let mut payload = Vec::with_capacity(len as usize);
        {
            let mut uploader = oc
                .put_multipart_opts(&path, Default::default())
                .await
                .unwrap();

            for i in 0..count {
                let data = sha3_256(&i.to_be_bytes()).to_vec();
                payload.extend_from_slice(&data);
                uploader
                    .put_part(object_store::PutPayload::from(data))
                    .await
                    .unwrap();
            }

            uploader.complete().await.unwrap();
        }

        let got = oc.get_opts(&path, Default::default()).await.unwrap();
        assert_eq!(got.meta.size as usize, payload.len());
        let body = got.bytes().await.unwrap();
        assert_eq!(body.to_vec(), payload);

        // One AES tag is expected per CHUNK_SIZE-sized chunk.
        let raw = cli.get_opts(&path, Default::default()).await.unwrap();
        assert_eq!(raw.meta.size as usize, payload.len());
        assert_ne!(&raw.payload, &payload);
        let aes_nonce = raw.meta.aes_nonce.unwrap();
        assert_eq!(aes_nonce.len(), 12);
        let aes_tags = raw.meta.aes_tags.unwrap();
        assert_eq!(aes_tags.len(), len.div_ceil(CHUNK_SIZE) as usize);

        // Ranged reads: the batched and the single-range API must agree.
        let ranges = vec![0u64..1000, 100..100000, len - CHUNK_SIZE - 1..len];

        let batched = oc.get_ranges(&path, &ranges).await.unwrap();
        assert_eq!(batched.len(), ranges.len());

        for (Range { start, end }, from_batch) in ranges.into_iter().zip(&batched) {
            let opts = object_store::GetOptions {
                range: Some(object_store::GetRange::Bounded(start..end)),
                ..Default::default()
            };
            let got = oc.get_opts(&path, opts).await.unwrap();
            assert_eq!(got.meta.location, path);
            assert_eq!(got.meta.size as usize, payload.len());
            let data = got.bytes().await.unwrap();
            assert_eq!(from_batch.len(), data.len());
            assert_eq!(&data, &payload[start as usize..end as usize]);
        }
    }

    /// Runs the `object_store::integration` suite against the client, then
    /// deletes every listed object before the final `stream_get` check.
    ///
    /// Marked `#[ignore]`, so it only runs when requested explicitly.
    #[tokio::test]
    #[ignore]
    async fn integration_test() {
        let storage = ObjectStoreClient::new(connect_local([8u8; 32]).await);

        let location = Path::from(NON_EXISTENT_NAME);

        let err = get_nonexistent_object(&storage, Some(location))
            .await
            .unwrap_err();
        match err {
            object_store::Error::NotFound { path, .. } => {
                assert!(path.ends_with(NON_EXISTENT_NAME));
            }
            err => panic!("unexpected error type: {err:?}"),
        }

        put_get_delete_list(&storage).await;
        put_get_attributes(&storage).await;
        get_opts(&storage).await;
        put_opts(&storage, true).await;
        list_uses_directories_correctly(&storage).await;
        list_with_delimiter(&storage).await;
        rename_and_copy(&storage).await;
        copy_if_not_exists(&storage).await;
        copy_rename_nonexistent_object(&storage).await;
        multipart_out_of_order(&storage).await;

        // Collect the full listing first, then delete each listed object.
        let leftovers: Vec<_> = storage.list(None).collect().await;
        for entry in leftovers {
            let meta = entry.unwrap();
            storage
                .delete(&meta.location)
                .await
                .expect("failed to delete object");
        }
        stream_get(&storage).await;
    }
}