ic_object_store/
client.rs

1use aes_gcm::{aes::cipher::consts::U12, AeadInPlace, Aes256Gcm, Key, Nonce, Tag};
2use async_stream::try_stream;
3use async_trait::async_trait;
4use candid::{
5    utils::{encode_args, ArgumentEncoder},
6    CandidType, Decode, Principal,
7};
8use chrono::DateTime;
9use futures::{stream::BoxStream, StreamExt};
10use ic_agent::Agent;
11use ic_cose_types::{BoxError, CanisterCaller};
12use ic_oss_types::{format_error, object_store::*};
13use serde_bytes::{ByteArray, ByteBuf, Bytes};
14use std::{collections::BTreeSet, ops::Range, sync::Arc};
15
16pub use object_store::{self, path::Path, DynObjectStore, MultipartUpload, ObjectStore};
17
18use crate::rand_bytes;
19
/// Name of this store implementation.
///
/// Used in the `Debug`/`Display` output of the types in this file and as the
/// `store` field of the `object_store::Error::Generic` values built here.
pub static STORE_NAME: &str = "ICObjectStore";
21
/// Client for interacting with the IC Object Store canister.
///
/// Handles communication with the canister and optional AES-256 encryption.
/// The struct is `Clone`; the agent and the cipher are held behind `Arc`s.
///
/// # Fields
/// - `agent`: IC agent for making calls to the canister
/// - `canister`: Principal of the target canister
/// - `cipher`: Optional AES-256-GCM cipher for encryption/decryption
#[derive(Clone)]
pub struct Client {
    // Agent used to issue query and update calls.
    agent: Arc<Agent>,
    // Canister returned by `ObjectStoreSDK::canister`.
    canister: Principal,
    // `Some` only when a secret was passed to `Client::new`.
    cipher: Option<Arc<Aes256Gcm>>,
}
36
37impl std::fmt::Debug for Client {
38    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39        write!(f, "{}:Client({})", STORE_NAME, self.canister)
40    }
41}
42
43impl Client {
44    /// Creates a new Client instance with optional AES-256 encryption
45    pub fn new(agent: Arc<Agent>, canister: Principal, aes_secret: Option<[u8; 32]>) -> Client {
46        use aes_gcm::KeyInit;
47
48        let cipher = aes_secret.map(|secret| {
49            let key = Key::<Aes256Gcm>::from(secret);
50            Arc::new(Aes256Gcm::new(&key))
51        });
52
53        Client {
54            agent,
55            canister,
56            cipher,
57        }
58    }
59}
60
61impl ObjectStoreSDK for Client {
62    fn canister(&self) -> &Principal {
63        &self.canister
64    }
65
66    fn cipher(&self) -> Option<Arc<Aes256Gcm>> {
67        self.cipher.clone()
68    }
69}
70
71impl CanisterCaller for Client {
72    async fn canister_query<
73        In: ArgumentEncoder + Send,
74        Out: CandidType + for<'a> candid::Deserialize<'a>,
75    >(
76        &self,
77        canister: &Principal,
78        method: &str,
79        args: In,
80    ) -> Result<Out, BoxError> {
81        let input = encode_args(args)?;
82        let res = self
83            .agent
84            .query(canister, method)
85            .with_arg(input)
86            .call()
87            .await?;
88        let output = Decode!(res.as_slice(), Out)?;
89        Ok(output)
90    }
91
92    async fn canister_update<
93        In: ArgumentEncoder + Send,
94        Out: CandidType + for<'a> candid::Deserialize<'a>,
95    >(
96        &self,
97        canister: &Principal,
98        method: &str,
99        args: In,
100    ) -> Result<Out, BoxError> {
101        let input = encode_args(args)?;
102        let res = self
103            .agent
104            .update(canister, method)
105            .with_arg(input)
106            .call_and_wait()
107            .await?;
108        let output = Decode!(res.as_slice(), Out)?;
109        Ok(output)
110    }
111}
112
113#[async_trait]
114pub trait ObjectStoreSDK: CanisterCaller + Sized {
115    fn canister(&self) -> &Principal;
116    fn cipher(&self) -> Option<Arc<Aes256Gcm>>;
117
118    /// Retrieves the current state of the object store
119    async fn get_state(&self) -> Result<StateInfo, String> {
120        self.canister_query(self.canister(), "get_state", ())
121            .await
122            .map_err(format_error)?
123    }
124
125    async fn is_member(&self, member_kind: &str, user: &Principal) -> Result<bool, String> {
126        self.canister_query(self.canister(), "is_member", (member_kind, user))
127            .await
128            .map_err(format_error)?
129    }
130
131    /// Adds managers to the canister (requires controller privileges)
132    async fn admin_add_managers(&self, args: &BTreeSet<Principal>) -> Result<(), String> {
133        self.canister_update(self.canister(), "admin_add_managers", (args,))
134            .await
135            .map_err(format_error)?
136    }
137
138    /// Removes managers from the canister (requires controller privileges)
139    async fn admin_remove_managers(&self, args: &BTreeSet<Principal>) -> Result<(), String> {
140        self.canister_update(self.canister(), "admin_remove_managers", (args,))
141            .await
142            .map_err(format_error)?
143    }
144
145    /// Adds auditors to the canister (requires controller privileges)
146    async fn admin_add_auditors(&self, args: &BTreeSet<Principal>) -> Result<(), String> {
147        self.canister_update(self.canister(), "admin_add_auditors", (args,))
148            .await
149            .map_err(format_error)?
150    }
151
152    /// Removes auditors from the canister (requires controller privileges)
153    async fn admin_remove_auditors(&self, args: &BTreeSet<Principal>) -> Result<(), String> {
154        self.canister_update(self.canister(), "admin_remove_auditors", (args,))
155            .await
156            .map_err(format_error)?
157    }
158
159    /// Stores data at specified path with options
160    async fn put_opts(&self, path: &Path, payload: &Bytes, opts: PutOptions) -> Result<PutResult> {
161        if payload.len() > MAX_PAYLOAD_SIZE as usize {
162            return Err(Error::Precondition {
163                path: path.as_ref().to_string(),
164                error: format!(
165                    "payload size {} exceeds max size {}",
166                    payload.len(),
167                    MAX_PAYLOAD_SIZE
168                ),
169            });
170        }
171
172        self.canister_update(self.canister(), "put_opts", (path.as_ref(), payload, opts))
173            .await
174            .map_err(|error| Error::Generic {
175                error: format_error(error),
176            })?
177    }
178
179    /// Deletes data at specified path
180    async fn delete(&self, path: &Path) -> Result<()> {
181        self.canister_update(self.canister(), "delete", (path.as_ref(),))
182            .await
183            .map_err(|error| Error::Generic {
184                error: format_error(error),
185            })?
186    }
187
188    /// Copies data from one path to another
189    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
190        self.canister_update(self.canister(), "copy", (from.as_ref(), to.as_ref()))
191            .await
192            .map_err(|error| Error::Generic {
193                error: format_error(error),
194            })?
195    }
196
197    /// Copies data only if destination doesn't exist
198    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
199        self.canister_update(
200            self.canister(),
201            "copy_if_not_exists",
202            (from.as_ref(), to.as_ref()),
203        )
204        .await
205        .map_err(|error| Error::Generic {
206            error: format_error(error),
207        })?
208    }
209
210    /// Renames/moves data from one path to another
211    async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
212        self.canister_update(self.canister(), "rename", (from.as_ref(), to.as_ref()))
213            .await
214            .map_err(|error| Error::Generic {
215                error: format_error(error),
216            })?
217    }
218
219    /// Renames/moves data only if destination doesn't exist
220    async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
221        self.canister_update(
222            self.canister(),
223            "rename_if_not_exists",
224            (from.as_ref(), to.as_ref()),
225        )
226        .await
227        .map_err(|error| Error::Generic {
228            error: format_error(error),
229        })?
230    }
231
232    /// Initiates a multipart upload
233    async fn create_multipart(&self, path: &Path) -> Result<MultipartId> {
234        self.canister_update(self.canister(), "create_multipart", (path.as_ref(),))
235            .await
236            .map_err(|error| Error::Generic {
237                error: format_error(error),
238            })?
239    }
240
241    /// Uploads a part in a multipart upload
242    async fn put_part(
243        &self,
244        path: &Path,
245        id: &MultipartId,
246        part_idx: u64,
247        payload: &Bytes,
248    ) -> Result<PartId> {
249        self.canister_update(
250            self.canister(),
251            "put_part",
252            (path.as_ref(), id, part_idx, payload),
253        )
254        .await
255        .map_err(|error| Error::Generic {
256            error: format_error(error),
257        })?
258    }
259
260    /// Completes a multipart upload
261    async fn complete_multipart(
262        &self,
263        path: &Path,
264        id: &MultipartId,
265        opts: &PutMultipartOptions,
266    ) -> Result<PutResult> {
267        self.canister_update(
268            self.canister(),
269            "complete_multipart",
270            (path.as_ref(), id, opts),
271        )
272        .await
273        .map_err(|error| Error::Generic {
274            error: format_error(error),
275        })?
276    }
277
278    /// Aborts a multipart upload
279    async fn abort_multipart(&self, path: &Path, id: &MultipartId) -> Result<()> {
280        self.canister_update(self.canister(), "abort_multipart", (path.as_ref(), id))
281            .await
282            .map_err(|error| Error::Generic {
283                error: format_error(error),
284            })?
285    }
286
287    /// Retrieves a specific part of data
288    async fn get_part(&self, path: &Path, part_idx: u64) -> Result<ByteBuf> {
289        self.canister_query(self.canister(), "get_part", (path.as_ref(), part_idx))
290            .await
291            .map_err(|error| Error::Generic {
292                error: format_error(error),
293            })?
294    }
295
296    /// Retrieves data with options (range, if_match, etc.)
297    async fn get_opts(&self, path: &Path, opts: GetOptions) -> Result<GetResult> {
298        self.canister_query(self.canister(), "get_opts", (path.as_ref(), opts))
299            .await
300            .map_err(|error| Error::Generic {
301                error: format_error(error),
302            })?
303    }
304
305    /// Retrieves multiple ranges of data
306    async fn get_ranges(&self, path: &Path, ranges: &[(u64, u64)]) -> Result<Vec<ByteBuf>> {
307        if ranges.is_empty() {
308            return Ok(Vec::new());
309        }
310
311        self.canister_query(self.canister(), "get_ranges", (path.as_ref(), ranges))
312            .await
313            .map_err(|error| Error::Generic {
314                error: format_error(error),
315            })?
316    }
317
318    /// Retrieves metadata for a path
319    async fn head(&self, path: &Path) -> Result<ObjectMeta> {
320        self.canister_query(self.canister(), "head", (path.as_ref(),))
321            .await
322            .map_err(|error| Error::Generic {
323                error: format_error(error),
324            })?
325    }
326
327    /// Lists objects under a prefix
328    async fn list(&self, prefix: Option<&Path>) -> Result<Vec<ObjectMeta>> {
329        self.canister_query(self.canister(), "list", (prefix.map(|p| p.as_ref()),))
330            .await
331            .map_err(|error| Error::Generic {
332                error: format_error(error),
333            })?
334    }
335
336    /// Lists objects with an offset
337    async fn list_with_offset(
338        &self,
339        prefix: Option<&Path>,
340        offset: &Path,
341    ) -> Result<Vec<ObjectMeta>> {
342        self.canister_query(
343            self.canister(),
344            "list_with_offset",
345            (prefix.map(|p| p.as_ref()), offset.as_ref()),
346        )
347        .await
348        .map_err(|error| Error::Generic {
349            error: format_error(error),
350        })?
351    }
352
353    /// Lists objects with directory delimiter
354    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
355        self.canister_query(
356            self.canister(),
357            "list_with_delimiter",
358            (prefix.map(|p| p.as_ref()),),
359        )
360        .await
361        .map_err(|error| Error::Generic {
362            error: format_error(error),
363        })?
364    }
365}
366
/// Handles multipart upload operations
///
/// Incoming payloads are buffered in `parts_cache` and sent to the canister
/// in pieces of `CHUNK_SIZE` bytes; whatever is left is sent by `complete`.
#[derive(Debug)]
pub struct MultipartUploader {
    // Index assigned to the next part that is sent.
    part_idx: u64,
    // Bytes received via `put_part` that have not been sent yet.
    parts_cache: Vec<u8>,
    // Options passed to `complete_multipart`; when a cipher is configured they
    // also carry the base nonce and the tags collected per encrypted part.
    opts: PutMultipartOptions,
    // Client, destination path and upload id, shared with the part futures.
    state: Arc<UploadState>,
}
375
/// Internal state for tracking upload progress
///
/// Held behind an `Arc` by `MultipartUploader` so each part-upload future can
/// own a handle to it.
struct UploadState {
    // Client used for `put_part`, `complete_multipart` and `abort_multipart`.
    client: Arc<Client>,
    // Destination path of the object being uploaded.
    path: Path,
    // Upload id returned by `create_multipart`.
    id: MultipartId,
}
382
383impl std::fmt::Debug for UploadState {
384    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
385        write!(f, "{}:UploadState({}, {})", STORE_NAME, self.path, self.id)
386    }
387}
388
389#[async_trait]
390impl MultipartUpload for MultipartUploader {
    /// Adds a part to the upload, buffering until chunk size is reached
    ///
    /// The payload is appended to `parts_cache`. While the cache holds at least
    /// `CHUNK_SIZE` bytes, one `CHUNK_SIZE` piece is taken from its front,
    /// encrypted in place if a cipher is configured, and wrapped in a future
    /// that sends it to the canister under the next part index. The returned
    /// future awaits those uploads one after another; if nothing reached
    /// `CHUNK_SIZE`, it resolves to `Ok(())` immediately.
    fn put_part(&mut self, payload: object_store::PutPayload) -> object_store::UploadPart {
        let payload = bytes::Bytes::from(payload);
        self.parts_cache.extend_from_slice(&payload);
        // Not enough buffered data for a full chunk yet: nothing to send.
        if self.parts_cache.len() < CHUNK_SIZE as usize {
            return Box::pin(futures::future::ready(Ok(())));
        }

        let mut parts: Vec<object_store::UploadPart> = Vec::new();
        while self.parts_cache.len() >= CHUNK_SIZE as usize {
            let state = self.state.clone();
            // Take exactly one chunk from the front of the cache.
            let mut chunk = self
                .parts_cache
                .drain(..CHUNK_SIZE as usize)
                .collect::<Vec<u8>>();

            if let Some(cipher) = &self.state.client.cipher {
                // The unwraps rely on `aes_nonce`/`aes_tags` having been set when
                // this uploader was created with a cipher (see `put_multipart_opts`).
                let nonce = derive_gcm_nonce(
                    self.opts.aes_nonce.as_ref().as_ref().unwrap(),
                    self.part_idx,
                );
                match encrypt_chunk(cipher, Nonce::from_slice(&nonce), &mut chunk, &state.path) {
                    Ok(tag) => {
                        // Tags are pushed in part order.
                        self.opts.aes_tags.as_mut().unwrap().push(tag);
                    }
                    Err(err) => {
                        // NOTE(review): futures already collected in `parts` are
                        // dropped here without being awaited, although their bytes
                        // have been drained from the cache.
                        return Box::pin(futures::future::ready(Err(err)));
                    }
                }
            }

            // Reserve the index now so the future does not borrow `self`.
            let part_idx = self.part_idx;
            self.part_idx += 1;
            parts.push(Box::pin(async move {
                let _ = state
                    .client
                    .put_part(&state.path, &state.id, part_idx, Bytes::new(&chunk))
                    .await
                    .map_err(from_error)?;
                Ok(())
            }))
        }

        // Upload the prepared parts sequentially, stopping at the first error.
        Box::pin(async move {
            for part in parts {
                part.await?;
            }

            Ok(())
        })
    }
442
    /// Finalizes the multipart upload and returns result
    ///
    /// Any bytes still buffered in `parts_cache` are sent first, in pieces of at
    /// most `CHUNK_SIZE` bytes, encrypted in place if a cipher is configured.
    /// Then `complete_multipart` is called with the accumulated options and its
    /// `e_tag`/`version` are returned.
    async fn complete(&mut self) -> object_store::Result<object_store::PutResult> {
        for part in self.parts_cache.chunks_mut(CHUNK_SIZE as usize) {
            let part_idx = self.part_idx;
            self.part_idx += 1;

            if let Some(cipher) = &self.state.client.cipher {
                // The unwraps rely on `aes_nonce`/`aes_tags` having been set when
                // this uploader was created with a cipher (see `put_multipart_opts`).
                let nonce =
                    derive_gcm_nonce(self.opts.aes_nonce.as_ref().as_ref().unwrap(), part_idx);
                match encrypt_chunk(cipher, Nonce::from_slice(&nonce), part, &self.state.path) {
                    Ok(tag) => {
                        self.opts.aes_tags.as_mut().unwrap().push(tag);
                    }
                    Err(err) => {
                        return Err(err);
                    }
                }
            }

            let _ = self
                .state
                .client
                .put_part(&self.state.path, &self.state.id, part_idx, Bytes::new(part))
                .await
                .map_err(from_error)?;
        }

        // Everything buffered has been sent; on an earlier error return the
        // cache is left as it was (possibly already encrypted in place).
        self.parts_cache.clear();
        let res = self
            .state
            .client
            .complete_multipart(&self.state.path, &self.state.id, &self.opts)
            .await
            .map_err(from_error)?;
        Ok(object_store::PutResult {
            e_tag: res.e_tag,
            version: res.version,
        })
    }
482
483    /// Aborts the multipart upload and cleans up resources
484    async fn abort(&mut self) -> object_store::Result<()> {
485        self.state
486            .client
487            .abort_multipart(&self.state.path, &self.state.id)
488            .await
489            .map_err(from_error)
490    }
491}
492
/// Main client for interacting with the object store
///
/// Wraps a shared `Client` and implements the `ObjectStore` trait on top of it,
/// encrypting and decrypting object data when the client has a cipher.
#[derive(Clone)]
pub struct ObjectStoreClient {
    // Shared canister client; cloned into streams and multipart uploaders.
    client: Arc<Client>,
}
498
499impl ObjectStoreClient {
500    pub fn new(client: Arc<Client>) -> ObjectStoreClient {
501        ObjectStoreClient { client }
502    }
503
504    pub async fn get_state(&self) -> Result<StateInfo, String> {
505        self.client.get_state().await
506    }
507
508    async fn get_opts_inner(
509        &self,
510        path: &Path,
511        opts: object_store::GetOptions,
512    ) -> object_store::Result<object_store::GetResult> {
513        let options = GetOptions {
514            if_match: opts.if_match,
515            if_none_match: opts.if_none_match,
516            if_modified_since: opts.if_modified_since.map(|v| v.timestamp_millis() as u64),
517            if_unmodified_since: opts
518                .if_unmodified_since
519                .map(|v| v.timestamp_millis() as u64),
520            range: opts.range.clone().map(to_get_range),
521            version: opts.version,
522            head: opts.head,
523        };
524
525        let res: GetResult = self
526            .client
527            .get_opts(path, options)
528            .await
529            .map_err(from_error)?;
530
531        // 请求的 range
532        let rr = if let Some(r) = &opts.range {
533            r.as_range(res.meta.size)
534                .map_err(|err| object_store::Error::Generic {
535                    store: STORE_NAME,
536                    source: err.into(),
537                })?
538        } else {
539            0..res.meta.size
540        };
541        // 第一次请求返回的 range
542        let range = res.range.0..res.range.1;
543        let meta = from_object_meta(res.meta);
544        let attributes: object_store::Attributes = res
545            .attributes
546            .into_iter()
547            .map(|(k, v)| (from_attribute(k), v))
548            .collect();
549        let data = bytes::Bytes::from(res.payload.into_vec());
550        if opts.head || rr == range {
551            let stream = futures::stream::once(futures::future::ready(Ok(data)));
552            return Ok(object_store::GetResult {
553                payload: object_store::GetResultPayload::Stream(stream.boxed()),
554                meta,
555                range,
556                attributes,
557            });
558        }
559
560        let stream =
561            create_get_range_stream(self.client.clone(), path.clone(), rr.clone(), range, data);
562        Ok(object_store::GetResult {
563            payload: object_store::GetResultPayload::Stream(stream),
564            meta,
565            range: rr,
566            attributes,
567        })
568    }
569}
570
571impl std::fmt::Display for ObjectStoreClient {
572    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
573        write!(f, "{}:ObjectStoreClient", STORE_NAME)
574    }
575}
576
577impl std::fmt::Debug for ObjectStoreClient {
578    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
579        write!(f, "{}:ObjectStoreClient", STORE_NAME)
580    }
581}
582
583#[async_trait]
584impl ObjectStore for ObjectStoreClient {
585    /// Uploads an object with options
586    async fn put_opts(
587        &self,
588        path: &Path,
589        payload: object_store::PutPayload,
590        opts: object_store::PutOptions,
591    ) -> object_store::Result<object_store::PutResult> {
592        let data = bytes::Bytes::from(payload);
593        let mut opts = to_put_options(&opts);
594        let payload: Vec<u8> = if let Some(cipher) = &self.client.cipher {
595            let base_nonce: [u8; 12] = rand_bytes();
596            let mut data: Vec<u8> = data.into();
597            let mut aes_tags: Vec<ByteArray<16>> = Vec::new();
598            for (i, chunk) in data.chunks_mut(CHUNK_SIZE as usize).enumerate() {
599                let nonce = derive_gcm_nonce(&base_nonce, i as u64);
600                let tag = encrypt_chunk(cipher, Nonce::from_slice(&nonce), chunk, path)?;
601                aes_tags.push(tag);
602            }
603            opts.aes_nonce = Some(base_nonce.into());
604            opts.aes_tags = Some(aes_tags);
605            data
606        } else {
607            data.into()
608        };
609
610        let res = self
611            .client
612            .put_opts(path, Bytes::new(&payload), opts)
613            .await
614            .map_err(from_error)?;
615        Ok(object_store::PutResult {
616            e_tag: res.e_tag,
617            version: res.version,
618        })
619    }
620
621    /// Initiates a multipart upload with options
622    async fn put_multipart_opts(
623        &self,
624        path: &Path,
625        opts: object_store::PutMultipartOptions,
626    ) -> object_store::Result<Box<dyn object_store::MultipartUpload>> {
627        let upload_id = self
628            .client
629            .create_multipart(path)
630            .await
631            .map_err(from_error)?;
632        let mut opts = PutMultipartOptions {
633            tags: opts.tags.encoded().to_string(),
634            attributes: opts
635                .attributes
636                .iter()
637                .map(|(k, v)| (to_attribute(k), v.to_string()))
638                .collect(),
639            ..Default::default()
640        };
641
642        if self.client.cipher.is_some() {
643            opts.aes_nonce = Some(rand_bytes().into());
644            opts.aes_tags = Some(Vec::new());
645        }
646
647        Ok(Box::new(MultipartUploader {
648            part_idx: 0,
649            parts_cache: Vec::new(),
650            opts,
651            state: Arc::new(UploadState {
652                client: self.client.clone(),
653                path: path.clone(),
654                id: upload_id,
655            }),
656        }))
657    }
658
659    async fn get_opts(
660        &self,
661        location: &Path,
662        mut opts: object_store::GetOptions,
663    ) -> object_store::Result<object_store::GetResult> {
664        if let Some(cipher) = self.client.cipher() {
665            let meta = self.client.head(location).await.map_err(from_error)?;
666
667            // 原始 range
668            let range = if let Some(r) = &opts.range {
669                r.as_range(meta.size)
670                    .map_err(|err| object_store::Error::Generic {
671                        store: STORE_NAME,
672                        source: err.into(),
673                    })?
674            } else {
675                0..meta.size
676            };
677
678            // 调整 range,确保读取到包含原始 range 的完整的 chunks,用于解密
679            let rr = (range.start / CHUNK_SIZE) * CHUNK_SIZE
680                ..meta
681                    .size
682                    .min((1 + range.end.saturating_sub(1) / CHUNK_SIZE) * CHUNK_SIZE);
683
684            if rr.end > rr.start {
685                opts.range = Some(object_store::GetRange::Bounded(rr.clone()));
686            }
687
688            let res = self.get_opts_inner(location, opts).await?;
689            let obj = res.meta.clone();
690
691            let attributes = res.attributes.clone();
692            let start_idx = rr.start / CHUNK_SIZE;
693            let start_offset = (range.start - rr.start) as usize;
694            let size = (range.end - range.start) as usize;
695
696            let stream = create_decryption_stream(
697                res,
698                cipher,
699                meta.aes_tags.unwrap(),
700                *meta.aes_nonce.unwrap(),
701                location.clone(),
702                start_idx as usize,
703                start_offset,
704                size,
705            );
706
707            return Ok(object_store::GetResult {
708                payload: object_store::GetResultPayload::Stream(stream),
709                meta: obj,
710                range,
711                attributes,
712            });
713        }
714
715        self.get_opts_inner(location, opts).await
716    }
717
718    /// Retrieves a byte range from an object
719    async fn get_range(
720        &self,
721        path: &Path,
722        range: Range<u64>,
723    ) -> object_store::Result<bytes::Bytes> {
724        #[allow(clippy::single_range_in_vec_init)]
725        let mut res = self.get_ranges(path, &[range.start..range.end]).await?;
726        res.pop().ok_or_else(|| object_store::Error::NotFound {
727            path: path.as_ref().to_string(),
728            source: "get_ranges result should not be empty".into(),
729        })
730    }
731
    /// Retrieves multiple byte ranges from an object
    ///
    /// An empty `ranges` slice returns an empty vector without any call.
    ///
    /// Without a cipher the ranges are forwarded to the canister's
    /// `get_ranges`. With a cipher, every chunk that overlaps a range is
    /// fetched via `get_part`, decrypted with its tag and derived nonce, and
    /// the requested bytes are copied out of the plaintext. The most recently
    /// decrypted chunk is cached, so consecutive reads from the same chunk
    /// fetch it only once.
    ///
    /// # Errors
    /// Besides converted canister errors, returns `Error::Generic` when the
    /// object has no AES tags or nonce, or when the tag for a needed chunk is
    /// missing.
    async fn get_ranges(
        &self,
        location: &Path,
        ranges: &[Range<u64>],
    ) -> object_store::Result<Vec<bytes::Bytes>> {
        if ranges.is_empty() {
            return Ok(Vec::new());
        }

        if let Some(cipher) = self.client.cipher() {
            let meta = self.client.head(location).await.map_err(from_error)?;
            // NOTE(review): presumably rejects empty and out-of-bounds ranges; the
            // `end - 1` arithmetic below relies on `end > 0` — confirm in `ranges_is_valid`.
            ranges_is_valid(ranges, meta.size)?;
            let aes_tags = meta.aes_tags.ok_or_else(|| object_store::Error::Generic {
                store: STORE_NAME,
                source: format!("missing AES256 tags for path {location} for ranges {ranges:?}")
                    .into(),
            })?;
            let base_nonce = meta.aes_nonce.ok_or_else(|| object_store::Error::Generic {
                store: STORE_NAME,
                source: format!("missing AES256 nonce for path {location}").into(),
            })?;

            let mut result: Vec<bytes::Bytes> = Vec::with_capacity(ranges.len());
            let mut chunk_cache: Option<(usize, Vec<u8>)> = None; // cache the last chunk read
            for &Range { start, end } in ranges {
                let mut buf = Vec::with_capacity((end - start) as usize);
                // Calculate the chunk indices we need to read
                let start_chunk = (start / CHUNK_SIZE) as usize;
                // `end` is exclusive, so the last byte needed is `end - 1`.
                let end_chunk = ((end - 1) / CHUNK_SIZE) as usize;

                for idx in start_chunk..=end_chunk {
                    // Calculate the byte range within this chunk
                    let chunk_start = if idx == start_chunk {
                        start % CHUNK_SIZE
                    } else {
                        0
                    };

                    // Exclusive end offset within this chunk.
                    let chunk_end = if idx == end_chunk {
                        (end - 1) % CHUNK_SIZE + 1
                    } else {
                        CHUNK_SIZE
                    };

                    match &chunk_cache {
                        // Reuse the plaintext of the chunk decrypted last.
                        Some((cached_idx, cached_chunk)) if *cached_idx == idx => {
                            buf.extend_from_slice(
                                &cached_chunk[chunk_start as usize..chunk_end as usize],
                            );
                        }
                        // Fetch and decrypt the chunk, then remember it.
                        _ => {
                            let tag =
                                aes_tags
                                    .get(idx)
                                    .ok_or_else(|| object_store::Error::Generic {
                                        store: STORE_NAME,
                                        source: format!(
                                    "missing AES256 tag for chunk {idx} for path {location}"
                                )
                                        .into(),
                                    })?;
                            let chunk = self
                                .client
                                .get_part(location, idx as u64)
                                .await
                                .map_err(from_error)?;
                            let mut chunk = chunk.into_vec();
                            // The nonce is derived from the base nonce and the chunk index.
                            let nonce = derive_gcm_nonce(&base_nonce, idx as u64);
                            decrypt_chunk(
                                &cipher,
                                Nonce::from_slice(&nonce),
                                &mut chunk,
                                tag,
                                location,
                            )?;
                            buf.extend_from_slice(&chunk[chunk_start as usize..chunk_end as usize]);
                            chunk_cache = Some((idx, chunk));
                        }
                    }
                }
                result.push(buf.into());
            }

            return Ok(result);
        }

        // No cipher: let the canister slice the ranges.
        let ranges: Vec<(u64, u64)> = ranges.iter().map(|r| (r.start, r.end)).collect();
        let res = self
            .client
            .get_ranges(location, &ranges)
            .await
            .map_err(from_error)?;

        Ok(res
            .into_iter()
            .map(|v| bytes::Bytes::from(v.into_vec()))
            .collect())
    }
831
832    /// Retrieves object metadata
833    async fn head(&self, location: &Path) -> object_store::Result<object_store::ObjectMeta> {
834        let res = self.client.head(location).await.map_err(from_error)?;
835        Ok(from_object_meta(res))
836    }
837
838    /// Deletes an object
839    async fn delete(&self, location: &Path) -> object_store::Result<()> {
840        self.client.delete(location).await.map_err(from_error)
841    }
842
843    /// Lists objects under a prefix
844    fn list(
845        &self,
846        prefix: Option<&Path>,
847    ) -> BoxStream<'static, object_store::Result<object_store::ObjectMeta>> {
848        let prefix = prefix.cloned();
849        let client = self.client.clone();
850        try_stream! {
851            let res =  client.list(prefix.as_ref()).await.map_err(from_error)?;
852            for object in res {
853                yield from_object_meta(object);
854            }
855        }
856        .boxed()
857    }
858
859    /// Lists objects starting from an offset
860    fn list_with_offset(
861        &self,
862        prefix: Option<&Path>,
863        offset: &Path,
864    ) -> BoxStream<'static, object_store::Result<object_store::ObjectMeta>> {
865        let prefix = prefix.cloned();
866        let offset = offset.clone();
867        let client = self.client.clone();
868        try_stream! {
869            let res = client.list_with_offset(prefix.as_ref(), &offset).await.map_err(from_error)?;
870            for object in res {
871                yield from_object_meta(object);
872            }
873        }
874        .boxed()
875    }
876
877    /// Lists objects with directory delimiter
878    async fn list_with_delimiter(
879        &self,
880        prefix: Option<&Path>,
881    ) -> object_store::Result<object_store::ListResult> {
882        let res = self
883            .client
884            .list_with_delimiter(prefix)
885            .await
886            .map_err(from_error)?;
887
888        Ok(object_store::ListResult {
889            objects: res.objects.into_iter().map(from_object_meta).collect(),
890            common_prefixes: res
891                .common_prefixes
892                .into_iter()
893                .map(|p| Path::parse(p).unwrap())
894                .collect(),
895        })
896    }
897
898    /// Copies an object to a new location
899    async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
900        self.client.copy(from, to).await.map_err(from_error)
901    }
902
903    /// Copies an object only if destination doesn't exist
904    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
905        self.client
906            .copy_if_not_exists(from, to)
907            .await
908            .map_err(from_error)
909    }
910
911    /// Renames an object
912    async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> {
913        self.client.rename(from, to).await.map_err(from_error)
914    }
915
916    /// Renames an object only if destination doesn't exist
917    async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
918        self.client
919            .rename_if_not_exists(from, to)
920            .await
921            .map_err(from_error)
922    }
923}
924
925fn encrypt_chunk(
926    cipher: &Aes256Gcm,
927    nonce: &Nonce<U12>,
928    chunk: &mut [u8],
929    path: &Path,
930) -> Result<ByteArray<16>, object_store::Error> {
931    let tag = cipher
932        .encrypt_in_place_detached(nonce, &[], chunk)
933        .map_err(|err| object_store::Error::Generic {
934            store: STORE_NAME,
935            source: format!("AES256 encrypt failed for path {path}: {err:?}").into(),
936        })?;
937    let tag: [u8; 16] = tag.into();
938    Ok(tag.into())
939}
940
941fn decrypt_chunk(
942    cipher: &Aes256Gcm,
943    nonce: &Nonce<U12>,
944    chunk: &mut [u8],
945    tag: &ByteArray<16>,
946    path: &Path,
947) -> Result<(), object_store::Error> {
948    cipher
949        .decrypt_in_place_detached(nonce, &[], chunk, Tag::from_slice(tag.as_slice()))
950        .map_err(|err| object_store::Error::Generic {
951            store: STORE_NAME,
952            source: format!("AES256 decrypt failed for path {path}: {err:?}").into(),
953        })
954}
955
956#[allow(clippy::too_many_arguments)]
957fn create_get_range_stream(
958    client: Arc<Client>,
959    location: Path,
960    request_range: Range<u64>,
961    first_range: Range<u64>,
962    first_payload: bytes::Bytes,
963) -> BoxStream<'static, object_store::Result<bytes::Bytes>> {
964    try_stream! {
965        yield first_payload;
966
967        // 计算需要请求的剩余范围
968        let mut remaining_ranges = Vec::new();
969        let mut current = first_range.end;
970        while current < request_range.end {
971            let end = (current + CHUNK_SIZE).min(request_range.end);
972            remaining_ranges.push(current..end);
973            current = end;
974        }
975
976        // 批量请求剩余数据
977        for r in remaining_ranges {
978            let res = client.get_ranges(&location, &[(r.start, r.end)]).await.map_err(from_error)?;
979            for data in res {
980                yield bytes::Bytes::from(data.into_vec());
981            }
982        }
983    }
984    .boxed()
985}
986
987#[allow(clippy::too_many_arguments)]
988fn create_decryption_stream(
989    res: object_store::GetResult,
990    cipher: Arc<Aes256Gcm>,
991    aes_tags: Vec<ByteArray<16>>,
992    base_nonce: [u8; 12],
993    location: Path,
994    start_idx: usize,
995    start_offset: usize,
996    size: usize,
997) -> BoxStream<'static, object_store::Result<bytes::Bytes>> {
998    try_stream! {
999        let mut stream = res.into_stream();
1000        // 预分配足够大的缓冲区以减少重新分配次数
1001        let mut buf = Vec::with_capacity(CHUNK_SIZE as usize * 2);
1002        let mut idx = start_idx;
1003        let mut remaining = size;
1004
1005        while let Some(data) = stream.next().await {
1006            let data = data?;
1007            if remaining == 0 {
1008                // 已满足请求大小,提前结束
1009                break;
1010            }
1011            buf.extend_from_slice(&data);
1012
1013            while remaining > 0 && buf.len() >= CHUNK_SIZE as usize {
1014                let mut chunk = buf.drain(..CHUNK_SIZE as usize).collect::<Vec<u8>>();
1015
1016                let tag = aes_tags.get(idx).ok_or_else(|| object_store::Error::Generic {
1017                    store: STORE_NAME,
1018                    source: format!("missing AES256 tag for chunk {idx} for path {location}").into(),
1019                })?;
1020
1021                let nonce = derive_gcm_nonce(&base_nonce, idx as u64);
1022                decrypt_chunk(&cipher, Nonce::from_slice(&nonce), &mut chunk, tag, &location)?;
1023                // 首块去掉起始偏移
1024                if idx == start_idx && start_offset > 0 {
1025                    chunk.drain(..start_offset);
1026                }
1027
1028                if chunk.len() > remaining {
1029                    chunk.truncate(remaining);
1030                }
1031
1032                remaining = remaining.saturating_sub(chunk.len());
1033                yield bytes::Bytes::from(chunk);
1034
1035                idx += 1;
1036                if remaining == 0 {
1037                    // 已满足请求大小,提前结束
1038                    return;
1039                }
1040            }
1041        }
1042
1043        if remaining > 0 && !buf.is_empty() {
1044            let tag = aes_tags.get(idx).ok_or_else(|| object_store::Error::Generic {
1045                store: STORE_NAME,
1046                source: format!("missing AES256 tag for chunk {idx} for path {location}").into(),
1047            })?;
1048            let nonce = derive_gcm_nonce(&base_nonce, idx as u64);
1049            decrypt_chunk(&cipher, Nonce::from_slice(&nonce), &mut buf, tag, &location)?;
1050            if idx == start_idx && start_offset > 0 {
1051                buf.drain(..start_offset);
1052            }
1053
1054            buf.truncate(remaining);
1055            yield bytes::Bytes::from(buf);
1056        }
1057    }.boxed()
1058}
1059
1060/// Converts custom Error type to object_store::Error
1061///
1062/// Maps each error variant to its corresponding object_store error,
1063/// preserving relevant context like path and error message.
1064pub fn from_error(err: Error) -> object_store::Error {
1065    match err {
1066        Error::Generic { error } => object_store::Error::Generic {
1067            store: STORE_NAME,
1068            source: error.into(),
1069        },
1070        Error::NotFound { ref path } => object_store::Error::NotFound {
1071            path: path.clone(),
1072            source: Box::new(err),
1073        },
1074        Error::InvalidPath { path } => object_store::Error::InvalidPath {
1075            source: object_store::path::Error::InvalidPath { path: path.into() },
1076        },
1077        Error::NotSupported { error } => object_store::Error::NotSupported {
1078            source: error.into(),
1079        },
1080        Error::AlreadyExists { ref path } => object_store::Error::AlreadyExists {
1081            path: path.clone(),
1082            source: err.into(),
1083        },
1084        Error::Precondition { path, error } => object_store::Error::Precondition {
1085            path,
1086            source: error.into(),
1087        },
1088        Error::NotModified { path, error } => object_store::Error::NotModified {
1089            path,
1090            source: error.into(),
1091        },
1092        Error::NotImplemented => object_store::Error::NotImplemented,
1093        Error::PermissionDenied { path, error } => object_store::Error::Precondition {
1094            path,
1095            source: error.into(),
1096        },
1097        Error::Unauthenticated { path, error } => object_store::Error::Precondition {
1098            path,
1099            source: error.into(),
1100        },
1101        Error::UnknownConfigurationKey { key } => object_store::Error::UnknownConfigurationKey {
1102            store: STORE_NAME,
1103            key,
1104        },
1105        _ => object_store::Error::Generic {
1106            store: STORE_NAME,
1107            source: Box::new(err),
1108        },
1109    }
1110}
1111
1112/// Converts internal ObjectMeta to object_store::ObjectMeta
1113///
1114/// # Arguments
1115/// * `val` - The source ObjectMeta to convert
1116///
1117/// # Returns
1118/// Converted object_store::ObjectMeta with equivalent fields
1119pub fn from_object_meta(val: ObjectMeta) -> object_store::ObjectMeta {
1120    object_store::ObjectMeta {
1121        location: Path::parse(val.location).unwrap(),
1122        last_modified: DateTime::from_timestamp_millis(val.last_modified as i64)
1123            .expect("invalid timestamp"),
1124        size: val.size,
1125        e_tag: val.e_tag,
1126        version: val.version,
1127    }
1128}
1129
1130/// Converts object_store::GetRange to internal GetRange format
1131///
1132/// # Arguments
1133/// * `val` - The source GetRange to convert
1134///
1135/// # Returns
1136/// Converted GetRange with equivalent range type and values
1137pub fn to_get_range(val: object_store::GetRange) -> GetRange {
1138    match val {
1139        object_store::GetRange::Bounded(v) => GetRange::Bounded(v.start, v.end),
1140        object_store::GetRange::Offset(v) => GetRange::Offset(v),
1141        object_store::GetRange::Suffix(v) => GetRange::Suffix(v),
1142    }
1143}
1144
1145/// Converts internal Attribute to object_store::Attribute
1146///
1147/// Maps each attribute variant to its corresponding object_store attribute,
1148/// handling metadata conversion as well.
1149pub fn from_attribute(val: Attribute) -> object_store::Attribute {
1150    match val {
1151        Attribute::ContentDisposition => object_store::Attribute::ContentDisposition,
1152        Attribute::ContentEncoding => object_store::Attribute::ContentEncoding,
1153        Attribute::ContentLanguage => object_store::Attribute::ContentLanguage,
1154        Attribute::ContentType => object_store::Attribute::ContentType,
1155        Attribute::CacheControl => object_store::Attribute::CacheControl,
1156        Attribute::Metadata(v) => object_store::Attribute::Metadata(v.into()),
1157    }
1158}
1159
1160/// Converts object_store::Attribute to internal Attribute type
1161///
1162/// Maps standard object store attributes to internal representation,
1163/// handling metadata conversion as well.
1164///
1165/// # Panics
1166/// Will panic if an unexpected attribute variant is encountered
1167pub fn to_attribute(val: &object_store::Attribute) -> Attribute {
1168    match val {
1169        object_store::Attribute::ContentDisposition => Attribute::ContentDisposition,
1170        object_store::Attribute::ContentEncoding => Attribute::ContentEncoding,
1171        object_store::Attribute::ContentLanguage => Attribute::ContentLanguage,
1172        object_store::Attribute::ContentType => Attribute::ContentType,
1173        object_store::Attribute::CacheControl => Attribute::CacheControl,
1174        object_store::Attribute::Metadata(v) => Attribute::Metadata(v.to_string()),
1175        _ => panic!("unexpected attribute"),
1176    }
1177}
1178
1179/// Converts object_store::PutOptions to internal PutOptions format
1180///
1181/// Maps standard object store put options to internal representation,
1182/// handling mode, tags, and attributes conversion.
1183pub fn to_put_options(opts: &object_store::PutOptions) -> PutOptions {
1184    let mode: PutMode = match opts.mode {
1185        object_store::PutMode::Overwrite => PutMode::Overwrite,
1186        object_store::PutMode::Create => PutMode::Create,
1187        object_store::PutMode::Update(ref v) => PutMode::Update(UpdateVersion {
1188            e_tag: v.e_tag.clone(),
1189            version: v.version.clone(),
1190        }),
1191    };
1192    PutOptions {
1193        mode,
1194        tags: opts.tags.encoded().to_string(),
1195        attributes: opts
1196            .attributes
1197            .iter()
1198            .map(|(k, v)| (to_attribute(k), v.to_string()))
1199            .collect(),
1200        ..Default::default()
1201    }
1202}
1203
1204fn ranges_is_valid(ranges: &[Range<u64>], len: u64) -> object_store::Result<()> {
1205    for range in ranges {
1206        if range.start >= len {
1207            return Err(object_store::Error::Generic {
1208                store: STORE_NAME,
1209                source: format!("start {} is larger than length {}", range.start, len).into(),
1210            });
1211        }
1212        if range.end <= range.start {
1213            return Err(object_store::Error::Generic {
1214                store: STORE_NAME,
1215                source: format!("end {} is less than start {}", range.end, range.start).into(),
1216            });
1217        }
1218        if range.end > len {
1219            return Err(object_store::Error::Generic {
1220                store: STORE_NAME,
1221                source: format!("end {} is larger than length {}", range.end, len).into(),
1222            });
1223        }
1224    }
1225    Ok(())
1226}
1227
/// Derives a per-chunk GCM nonce from `base`.
///
/// The last 8 bytes of `base` are read as a little-endian counter, `idx` is
/// added to it with wrap-around, and the result is written back; the first
/// 4 bytes are left untouched.
fn derive_gcm_nonce(base: &[u8; 12], idx: u64) -> [u8; 12] {
    let counter = u64::from_le_bytes([
        base[4], base[5], base[6], base[7], base[8], base[9], base[10], base[11],
    ]);
    let bumped = counter.wrapping_add(idx).to_le_bytes();

    let mut derived = *base;
    derived[4..].copy_from_slice(&bumped);
    derived
}
1237
#[cfg(test)]
mod tests {
    use super::*;
    use crate::agent::build_agent;
    use ic_agent::{identity::BasicIdentity, Identity};
    use ic_cose_types::cose::sha3_256;
    use object_store::integration::*;

    /// Object name used to exercise the "not found" path.
    const NON_EXISTENT_NAME: &str = "nonexistentname";

    /// Builds the encrypted client both tests use against the local replica.
    ///
    /// The same 32-byte secret seeds the identity and the AES-256 key.
    async fn local_test_client() -> Arc<Client> {
        let secret = [8u8; 32];
        let canister = Principal::from_text("6at64-oyaaa-aaaap-anvza-cai").unwrap();
        let identity = BasicIdentity::from_raw_key(&secret);
        println!("id: {:?}", identity.sender().unwrap().to_text());
        // jjn6g-sh75l-r3cxb-wxrkl-frqld-6p6qq-d4ato-wske5-op7s5-n566f-bqe

        let agent = build_agent("http://localhost:4943", Arc::new(identity))
            .await
            .unwrap();
        Arc::new(Client::new(Arc::new(agent), canister, Some(secret)))
    }

    #[tokio::test(flavor = "current_thread")]
    #[ignore]
    async fn test_client() {
        let cli = local_test_client().await;
        let oc = ObjectStoreClient::new(cli.clone());

        // Small object: written in one piece, read back through both clients.
        let path = Path::from("test/hello.txt");
        let payload = "Hello Anda!".as_bytes().to_vec();
        let put = oc
            .put_opts(&path, payload.clone().into(), Default::default())
            .await
            .unwrap();
        println!("put result: {:?}", put);

        let fetched = oc.get_opts(&path, Default::default()).await.unwrap();
        assert_eq!(fetched.meta.size as usize, payload.len());
        let body = fetched.bytes().await.unwrap();
        assert_eq!(body.to_vec(), payload);

        // The raw client must return ciphertext plus the AES metadata.
        let raw = cli.get_opts(&path, Default::default()).await.unwrap();
        assert_eq!(raw.meta.size as usize, payload.len());
        assert_ne!(&raw.payload, &payload);
        let aes_nonce = raw.meta.aes_nonce.unwrap();
        assert_eq!(aes_nonce.len(), 12);
        let aes_tags = raw.meta.aes_tags.unwrap();
        assert_eq!(aes_tags.len(), 1);

        // Large object: uploaded in 32-byte parts through the multipart API.
        let now = chrono::Utc::now();
        let path = Path::from(format!("test/{}.bin", now.timestamp_millis()));
        let count = 20000u64;
        let len = count * 32;
        let mut payload = Vec::with_capacity(len as usize);
        {
            let mut uploader = oc
                .put_multipart_opts(&path, Default::default())
                .await
                .unwrap();

            for i in 0..count {
                let part = sha3_256(&i.to_be_bytes()).to_vec();
                payload.extend_from_slice(&part);
                uploader
                    .put_part(object_store::PutPayload::from(part))
                    .await
                    .unwrap();
            }

            uploader.complete().await.unwrap();
        }

        let fetched = oc.get_opts(&path, Default::default()).await.unwrap();
        assert_eq!(fetched.meta.size as usize, payload.len());
        let body = fetched.bytes().await.unwrap();
        assert_eq!(body.to_vec(), payload);

        let raw = cli.get_opts(&path, Default::default()).await.unwrap();
        assert_eq!(raw.meta.size as usize, payload.len());
        assert_ne!(&raw.payload, &payload);
        let aes_nonce = raw.meta.aes_nonce.unwrap();
        assert_eq!(aes_nonce.len(), 12);
        let aes_tags = raw.meta.aes_tags.unwrap();
        assert_eq!(aes_tags.len(), len.div_ceil(CHUNK_SIZE) as usize);

        // Ranged reads: the batch call and the per-range calls must agree.
        let ranges = vec![0u64..1000, 100..100000, len - CHUNK_SIZE - 1..len];

        let rt = oc.get_ranges(&path, &ranges).await.unwrap();
        assert_eq!(rt.len(), ranges.len());

        for (batched, range) in rt.iter().zip(ranges) {
            let (start, end) = (range.start, range.end);
            let opts = object_store::GetOptions {
                range: Some(object_store::GetRange::Bounded(start..end)),
                ..Default::default()
            };
            let res = oc.get_opts(&path, opts).await.unwrap();
            assert_eq!(res.meta.location, path);
            assert_eq!(res.meta.size as usize, payload.len());
            let data = res.bytes().await.unwrap();
            assert_eq!(batched.len(), data.len());
            assert_eq!(&data, &payload[start as usize..end as usize]);
        }
    }

    #[tokio::test]
    #[ignore]
    async fn integration_test() {
        // Should be run in a clean environment
        // dfx canister call ic_object_store_canister admin_clear '()'
        //
        // The test identity (printed by `local_test_client`) must be a manager:
        // jjn6g-sh75l-r3cxb-wxrkl-frqld-6p6qq-d4ato-wske5-op7s5-n566f-bqe
        // # Add managers
        // dfx canister call ic_object_store_canister admin_add_managers "(vec {principal \"jjn6g-sh75l-r3cxb-wxrkl-frqld-6p6qq-d4ato-wske5-op7s5-n566f-bqe\"})"

        // It will take a long time to run this test.
        // test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 2 filtered out; finished in 396.77s

        let storage = ObjectStoreClient::new(local_test_client().await);

        let location = Path::from(NON_EXISTENT_NAME);

        match get_nonexistent_object(&storage, Some(location))
            .await
            .unwrap_err()
        {
            object_store::Error::NotFound { path, .. } => {
                assert!(path.ends_with(NON_EXISTENT_NAME));
            }
            err => panic!("unexpected error type: {err:?}"),
        }

        put_get_delete_list(&storage).await;
        put_get_attributes(&storage).await;
        get_opts(&storage).await;
        put_opts(&storage, true).await;
        list_uses_directories_correctly(&storage).await;
        list_with_delimiter(&storage).await;
        rename_and_copy(&storage).await;
        copy_if_not_exists(&storage).await;
        copy_rename_nonexistent_object(&storage).await;
        // multipart_race_condition(&storage, true).await; // TODO: fix this test?
        multipart_out_of_order(&storage).await;

        // Remove everything left behind before the streaming test runs.
        let leftovers = storage.list(None).collect::<Vec<_>>().await;
        for entry in leftovers {
            let entry = entry.unwrap();
            storage
                .delete(&entry.location)
                .await
                .expect("failed to delete object");
        }
        stream_get(&storage).await;
    }
}