Skip to main content

worker/r2/
mod.rs

1use std::{collections::HashMap, convert::TryInto, ops::Deref};
2
3pub use builder::*;
4
5use js_sys::{JsString, Reflect, Uint8Array};
6use wasm_bindgen::{JsCast, JsValue};
7use wasm_bindgen_futures::JsFuture;
8use worker_sys::{
9    FixedLengthStream as EdgeFixedLengthStream, R2Bucket as EdgeR2Bucket, R2Checksums,
10    R2MultipartUpload as EdgeR2MultipartUpload, R2Object as EdgeR2Object,
11    R2ObjectBody as EdgeR2ObjectBody, R2Objects as EdgeR2Objects,
12    R2UploadedPart as EdgeR2UploadedPart,
13};
14
15use crate::{
16    env::EnvBinding, ByteStream, Date, Error, FixedLengthStream, Headers, ResponseBody, Result,
17};
18
19mod builder;
20
21/// An instance of the R2 bucket binding.
22#[derive(Debug, Clone)]
23pub struct Bucket {
24    inner: EdgeR2Bucket,
25}
26
27impl Bucket {
28    /// Retrieves the [Object] for the given key containing only object metadata, if the key exists.
29    pub async fn head(&self, key: impl Into<String>) -> Result<Option<Object>> {
30        let head_promise = self.inner.head(key.into())?;
31        let value = JsFuture::from(head_promise).await?;
32
33        if value.is_null() {
34            return Ok(None);
35        }
36
37        Ok(Some(Object {
38            inner: ObjectInner::NoBody(value.into()),
39        }))
40    }
41
42    /// Retrieves the [Object] for the given key containing object metadata and the object body if
43    /// the key exists. In the event that a precondition specified in options fails, get() returns
44    /// an [Object] with no body.
45    pub fn get(&self, key: impl Into<String>) -> GetOptionsBuilder<'_> {
46        GetOptionsBuilder {
47            edge_bucket: &self.inner,
48            key: key.into(),
49            only_if: None,
50            range: None,
51        }
52    }
53
54    /// Stores the given `value` and metadata under the associated `key`. Once the write succeeds,
55    /// returns an [Object] containing metadata about the stored Object.
56    ///
57    /// R2 writes are strongly consistent. Once the future resolves, all subsequent read operations
58    /// will see this key value pair globally.
59    pub fn put(&self, key: impl Into<String>, value: impl Into<Data>) -> PutOptionsBuilder<'_> {
60        PutOptionsBuilder {
61            edge_bucket: &self.inner,
62            key: key.into(),
63            value: value.into(),
64            http_metadata: None,
65            custom_metadata: None,
66            checksum: None,
67            checksum_algorithm: "md5".into(),
68        }
69    }
70
71    /// Deletes the given value and metadata under the associated key. Once the delete succeeds,
72    /// returns void.
73    ///
74    /// R2 deletes are strongly consistent. Once the Promise resolves, all subsequent read
75    /// operations will no longer see this key value pair globally.
76    pub async fn delete(&self, key: impl Into<String>) -> Result<()> {
77        let delete_promise = self.inner.delete(key.into())?;
78        JsFuture::from(delete_promise).await?;
79        Ok(())
80    }
81
82    /// Deletes the given values and metadata under the associated keys. Once
83    /// the delete succeeds, returns void.
84    ///
85    /// R2 deletes are strongly consistent. Once the Promise resolves, all
86    /// subsequent read operations will no longer see the provided key value
87    /// pairs globally.
88    ///
89    /// Up to 1000 keys may be deleted per call.
90    pub async fn delete_multiple(&self, keys: Vec<impl Deref<Target = str>>) -> Result<()> {
91        let fut: JsFuture = self
92            .inner
93            .delete_multiple(keys.into_iter().map(|key| JsValue::from(&*key)).collect())?
94            .into();
95        fut.await?;
96        Ok(())
97    }
98
99    /// Returns an [Objects] containing a list of [Objects]s contained within the bucket. By
100    /// default, returns the first 1000 entries.
101    pub fn list(&self) -> ListOptionsBuilder<'_> {
102        ListOptionsBuilder {
103            edge_bucket: &self.inner,
104            limit: None,
105            prefix: None,
106            cursor: None,
107            start_after: None,
108            delimiter: None,
109            include: None,
110        }
111    }
112
113    /// Creates a multipart upload.
114    ///
115    /// Returns a [MultipartUpload] value representing the newly created multipart upload.
116    /// Once the multipart upload has been created, the multipart upload can be immediately
117    /// interacted with globally, either through the Workers API, or through the S3 API.
118    pub fn create_multipart_upload(
119        &self,
120        key: impl Into<String>,
121    ) -> CreateMultipartUploadOptionsBuilder<'_> {
122        CreateMultipartUploadOptionsBuilder {
123            edge_bucket: &self.inner,
124            key: key.into(),
125            http_metadata: None,
126            custom_metadata: None,
127        }
128    }
129
130    /// Returns an object representing a multipart upload with the given `key` and `uploadId`.
131    ///
132    /// The operation does not perform any checks to ensure the validity of the `uploadId`,
133    /// nor does it verify the existence of a corresponding active multipart upload.
134    /// This is done to minimize latency before being able to call subsequent operations on the returned object.
135    pub fn resume_multipart_upload(
136        &self,
137        key: impl Into<String>,
138        upload_id: impl Into<String>,
139    ) -> Result<MultipartUpload> {
140        Ok(MultipartUpload {
141            inner: self
142                .inner
143                .resume_multipart_upload(key.into(), upload_id.into())?
144                .into(),
145        })
146    }
147}
148
149impl EnvBinding for Bucket {
150    const TYPE_NAME: &'static str = "R2Bucket";
151}
152
153impl JsCast for Bucket {
154    fn instanceof(val: &JsValue) -> bool {
155        val.is_instance_of::<EdgeR2Bucket>()
156    }
157
158    fn unchecked_from_js(val: JsValue) -> Self {
159        Self { inner: val.into() }
160    }
161
162    fn unchecked_from_js_ref(val: &JsValue) -> &Self {
163        unsafe { &*(val as *const JsValue as *const Self) }
164    }
165}
166
167impl From<Bucket> for JsValue {
168    fn from(bucket: Bucket) -> Self {
169        JsValue::from(bucket.inner)
170    }
171}
172
173impl AsRef<JsValue> for Bucket {
174    fn as_ref(&self) -> &JsValue {
175        &self.inner
176    }
177}
178
179/// [Object] is created when you [put](Bucket::put) an object into a [Bucket]. [Object] represents
180/// the metadata of an object based on the information provided by the uploader. Every object that
181/// you [put](Bucket::put) into a [Bucket] will have an [Object] created.
182#[derive(Debug)]
183pub struct Object {
184    inner: ObjectInner,
185}
186
187impl Object {
188    pub fn key(&self) -> String {
189        match &self.inner {
190            ObjectInner::NoBody(inner) => inner.key().unwrap(),
191            ObjectInner::Body(inner) => inner.key().unwrap(),
192        }
193    }
194
195    pub fn version(&self) -> String {
196        match &self.inner {
197            ObjectInner::NoBody(inner) => inner.version().unwrap(),
198            ObjectInner::Body(inner) => inner.version().unwrap(),
199        }
200    }
201
202    pub fn size(&self) -> u64 {
203        let size = match &self.inner {
204            ObjectInner::NoBody(inner) => inner.size().unwrap(),
205            ObjectInner::Body(inner) => inner.size().unwrap(),
206        };
207        size.round() as u64
208    }
209
210    pub fn etag(&self) -> String {
211        match &self.inner {
212            ObjectInner::NoBody(inner) => inner.etag().unwrap(),
213            ObjectInner::Body(inner) => inner.etag().unwrap(),
214        }
215    }
216
217    pub fn http_etag(&self) -> String {
218        match &self.inner {
219            ObjectInner::NoBody(inner) => inner.http_etag().unwrap(),
220            ObjectInner::Body(inner) => inner.http_etag().unwrap(),
221        }
222    }
223
224    pub fn uploaded(&self) -> Date {
225        match &self.inner {
226            ObjectInner::NoBody(inner) => inner.uploaded().unwrap(),
227            ObjectInner::Body(inner) => inner.uploaded().unwrap(),
228        }
229        .into()
230    }
231
232    pub fn http_metadata(&self) -> HttpMetadata {
233        match &self.inner {
234            ObjectInner::NoBody(inner) => inner.http_metadata().unwrap(),
235            ObjectInner::Body(inner) => inner.http_metadata().unwrap(),
236        }
237        .into()
238    }
239
240    pub fn checksum(&self) -> R2Checksums {
241        match &self.inner {
242            ObjectInner::NoBody(inner) => inner.checksums().unwrap(),
243            ObjectInner::Body(inner) => inner.checksums().unwrap(),
244        }
245        .into()
246    }
247
248    pub fn custom_metadata(&self) -> Result<HashMap<String, String>> {
249        let metadata = match &self.inner {
250            ObjectInner::NoBody(inner) => inner.custom_metadata().unwrap(),
251            ObjectInner::Body(inner) => inner.custom_metadata().unwrap(),
252        };
253
254        let keys = js_sys::Object::keys(&metadata).to_vec();
255        let mut map = HashMap::with_capacity(keys.len());
256
257        for key in keys {
258            let key = key.unchecked_into::<JsString>();
259            let value = Reflect::get(&metadata, &key)?.dyn_into::<JsString>()?;
260            map.insert(key.into(), value.into());
261        }
262
263        Ok(map)
264    }
265
266    pub fn range(&self) -> Result<Range> {
267        match &self.inner {
268            ObjectInner::NoBody(inner) => inner.range().unwrap(),
269            ObjectInner::Body(inner) => inner.range().unwrap(),
270        }
271        .try_into()
272    }
273
274    pub fn body(&self) -> Option<ObjectBody<'_>> {
275        match &self.inner {
276            ObjectInner::NoBody(_) => None,
277            ObjectInner::Body(body) => Some(ObjectBody { inner: body }),
278        }
279    }
280
281    pub fn body_used(&self) -> Option<bool> {
282        match &self.inner {
283            ObjectInner::NoBody(_) => None,
284            ObjectInner::Body(inner) => Some(inner.body_used().unwrap()),
285        }
286    }
287
288    pub fn write_http_metadata(&self, headers: Headers) -> Result<()> {
289        match &self.inner {
290            ObjectInner::NoBody(inner) => inner.write_http_metadata(headers.0)?,
291            ObjectInner::Body(inner) => inner.write_http_metadata(headers.0)?,
292        };
293
294        Ok(())
295    }
296}
297
298/// The data contained within an [Object].
299#[derive(Debug)]
300pub struct ObjectBody<'body> {
301    inner: &'body EdgeR2ObjectBody,
302}
303
304impl ObjectBody<'_> {
305    /// Reads the data in the [Object] via a [ByteStream].
306    pub fn stream(self) -> Result<ByteStream> {
307        if self.inner.body_used()? {
308            return Err(Error::BodyUsed);
309        }
310
311        let stream = self.inner.body()?;
312        let stream = wasm_streams::ReadableStream::from_raw(stream.unchecked_into());
313        Ok(ByteStream {
314            inner: stream.into_stream(),
315        })
316    }
317
318    /// Returns a [ResponseBody] containing the data in the [Object].
319    ///
320    /// This function can be used to hand off the [Object] data to the workers runtime for streaming
321    /// to the client in a [crate::Response]. This ensures that the worker does not consume CPU time
322    /// while the streaming occurs, which can be significant if instead [ObjectBody::stream] is used.
323    pub fn response_body(self) -> Result<ResponseBody> {
324        if self.inner.body_used()? {
325            return Err(Error::BodyUsed);
326        }
327
328        Ok(ResponseBody::Stream(self.inner.body()?))
329    }
330
331    pub async fn bytes(self) -> Result<Vec<u8>> {
332        let js_buffer = JsFuture::from(self.inner.array_buffer()?).await?;
333        let js_buffer = Uint8Array::new(&js_buffer);
334        let mut bytes = vec![0; js_buffer.length() as usize];
335        js_buffer.copy_to(&mut bytes);
336
337        Ok(bytes)
338    }
339
340    pub async fn text(self) -> Result<String> {
341        String::from_utf8(self.bytes().await?).map_err(|e| Error::RustError(e.to_string()))
342    }
343}
344
345/// [UploadedPart] represents a part that has been uploaded.
346/// [UploadedPart] objects are returned from [upload_part](MultipartUpload::upload_part) operations
347/// and must be passed to the [complete](MultipartUpload::complete) operation.
348#[derive(Debug)]
349pub struct UploadedPart {
350    inner: EdgeR2UploadedPart,
351}
352
353impl UploadedPart {
354    pub fn new(part_number: u16, etag: String) -> Self {
355        let obj = js_sys::Object::new();
356        Reflect::set(
357            &obj,
358            &JsValue::from_str("partNumber"),
359            &JsValue::from_f64(part_number as f64),
360        )
361        .unwrap();
362        Reflect::set(&obj, &JsValue::from_str("etag"), &JsValue::from_str(&etag)).unwrap();
363
364        let val: JsValue = obj.into();
365        Self { inner: val.into() }
366    }
367
368    pub fn part_number(&self) -> u16 {
369        self.inner.part_number().unwrap()
370    }
371
372    pub fn etag(&self) -> String {
373        self.inner.etag().unwrap()
374    }
375}
376
377#[derive(Debug)]
378pub struct MultipartUpload {
379    inner: EdgeR2MultipartUpload,
380}
381
382impl MultipartUpload {
383    /// Uploads a single part with the specified part number to this multipart upload.
384    ///
385    /// Returns an [UploadedPart] object containing the etag and part number.
386    /// These [UploadedPart] objects are required when completing the multipart upload.
387    ///
388    /// Getting hold of a value of this type does not guarantee that there is an active
389    /// underlying multipart upload corresponding to that object.
390    ///
391    /// A multipart upload can be completed or aborted at any time, either through the S3 API,
392    /// or by a parallel invocation of your Worker.
393    /// Therefore it is important to add the necessary error handling code around each operation
394    /// on the [MultipartUpload] object in case the underlying multipart upload no longer exists.
395    pub async fn upload_part(
396        &self,
397        part_number: u16,
398        value: impl Into<Data>,
399    ) -> Result<UploadedPart> {
400        let uploaded_part =
401            JsFuture::from(self.inner.upload_part(part_number, value.into().into())?).await?;
402        Ok(UploadedPart {
403            inner: uploaded_part.into(),
404        })
405    }
406
407    /// Request the upload id.
408    pub async fn upload_id(&self) -> String {
409        self.inner.upload_id().unwrap()
410    }
411
412    /// Aborts the multipart upload.
413    pub async fn abort(&self) -> Result<()> {
414        JsFuture::from(self.inner.abort()?).await?;
415        Ok(())
416    }
417
418    /// Completes the multipart upload with the given parts.
419    /// When the future is ready, the object is immediately accessible globally by any subsequent read operation.
420    pub async fn complete(
421        self,
422        uploaded_parts: impl IntoIterator<Item = UploadedPart>,
423    ) -> Result<Object> {
424        let object = JsFuture::from(
425            self.inner.complete(
426                uploaded_parts
427                    .into_iter()
428                    .map(|part| part.inner.into())
429                    .collect(),
430            )?,
431        )
432        .await?;
433        Ok(Object {
434            inner: ObjectInner::Body(object.into()),
435        })
436    }
437}
438
439/// A series of [Object]s returned by [list](Bucket::list).
440#[derive(Debug)]
441pub struct Objects {
442    inner: EdgeR2Objects,
443}
444
445impl Objects {
446    /// An [Vec] of [Object] matching the [list](Bucket::list) request.
447    pub fn objects(&self) -> Vec<Object> {
448        self.inner
449            .objects()
450            .unwrap()
451            .into_iter()
452            .map(|raw| Object {
453                inner: ObjectInner::NoBody(raw),
454            })
455            .collect()
456    }
457
458    /// If true, indicates there are more results to be retrieved for the current
459    /// [list](Bucket::list) request.
460    pub fn truncated(&self) -> bool {
461        self.inner.truncated().unwrap()
462    }
463
464    /// A token that can be passed to future [list](Bucket::list) calls to resume listing from that
465    /// point. Only present if truncated is true.
466    pub fn cursor(&self) -> Option<String> {
467        self.inner.cursor().unwrap()
468    }
469
470    /// If a delimiter has been specified, contains all prefixes between the specified prefix and
471    /// the next occurrence of the delimiter.
472    ///
473    /// For example, if no prefix is provided and the delimiter is '/', `foo/bar/baz` would return
474    /// `foo` as a delimited prefix. If `foo/` was passed as a prefix with the same structure and
475    /// delimiter, `foo/bar` would be returned as a delimited prefix.
476    pub fn delimited_prefixes(&self) -> Vec<String> {
477        self.inner
478            .delimited_prefixes()
479            .unwrap()
480            .into_iter()
481            .map(Into::into)
482            .collect()
483    }
484}
485
486#[derive(Debug, Clone)]
487pub(crate) enum ObjectInner {
488    NoBody(EdgeR2Object),
489    Body(EdgeR2ObjectBody),
490}
491
492#[derive(Debug)]
493pub enum Data {
494    ReadableStream(web_sys::ReadableStream),
495    Stream(FixedLengthStream),
496    Text(String),
497    Bytes(Vec<u8>),
498    Empty,
499}
500
501impl From<web_sys::ReadableStream> for Data {
502    fn from(stream: web_sys::ReadableStream) -> Self {
503        Data::ReadableStream(stream)
504    }
505}
506
507impl From<FixedLengthStream> for Data {
508    fn from(stream: FixedLengthStream) -> Self {
509        Data::Stream(stream)
510    }
511}
512
513impl From<String> for Data {
514    fn from(value: String) -> Self {
515        Data::Text(value)
516    }
517}
518
519impl From<Vec<u8>> for Data {
520    fn from(value: Vec<u8>) -> Self {
521        Data::Bytes(value)
522    }
523}
524
525impl From<Data> for JsValue {
526    fn from(data: Data) -> Self {
527        match data {
528            Data::ReadableStream(stream) => stream.into(),
529            Data::Stream(stream) => {
530                let stream_sys: EdgeFixedLengthStream = stream.into();
531                stream_sys.readable().into()
532            }
533            Data::Text(text) => JsString::from(text).into(),
534            Data::Bytes(bytes) => {
535                let arr = Uint8Array::new_with_length(bytes.len() as u32);
536                arr.copy_from(&bytes);
537                arr.into()
538            }
539            Data::Empty => JsValue::NULL,
540        }
541    }
542}