wasmcloud_s3/
lib.rs

//! # S3 implementation of the wasmCloud blob store capability provider API
//!
//! Provides an implementation of the wasmcloud:blobstore contract for S3 and
//! S3-compatible (e.g. MinIO) products.
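//!
//! The integration tests at the bottom of this file exercise the configuration
//! values `ENDPOINT`, `REGION`, `AWS_ACCESS_KEY`, and `AWS_SECRET_ACCESS_KEY`,
//! supplied to the provider at actor-bind time.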

#[macro_use]
extern crate wasmcloud_provider_core as codec;
use codec::capabilities::{CapabilityProvider, Dispatcher, NullDispatcher};
use codec::core::{OP_BIND_ACTOR, OP_REMOVE_ACTOR};
use codec::{deserialize, serialize};
use log::{error, trace};
use rusoto_s3::S3Client;
use std::error::Error;
use std::{
    collections::HashMap,
    sync::{Arc, RwLock},
};
use wasmcloud_actor_blobstore::*;
use wasmcloud_actor_core::CapabilityConfiguration;
mod s3;

#[cfg(not(feature = "static_plugin"))]
capability_provider!(S3Provider, S3Provider::new);

#[allow(unused)]
const CAPABILITY_ID: &str = "wasmcloud:blobstore";
const SYSTEM_ACTOR: &str = "system";

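/// Bookkeeping for an in-progress chunked upload: the chunks received so far
/// and the number expected, derived from the blob's total size and chunk size.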
#[derive(Debug, PartialEq)]
struct FileUpload {
    container: String,
    id: String,
    total_bytes: u64,
    expected_chunks: u64,
    chunks: Vec<FileChunk>,
}

impl FileUpload {
    pub fn is_complete(&self) -> bool {
        self.chunks.len() == self.expected_chunks as usize
    }
}

/// AWS S3 implementation of the `wasmcloud:blobstore` specification
#[derive(Clone)]
pub struct S3Provider {
    dispatcher: Arc<RwLock<Box<dyn Dispatcher>>>,
    clients: Arc<RwLock<HashMap<String, Arc<S3Client>>>>,
    uploads: Arc<RwLock<HashMap<String, FileUpload>>>,
}

impl Default for S3Provider {
    fn default() -> Self {
        // The host may have already initialized a logger; ignore the result.
        let _ = env_logger::try_init();

        S3Provider {
            dispatcher: Arc::new(RwLock::new(Box::new(NullDispatcher::new()))),
            clients: Arc::new(RwLock::new(HashMap::new())),
            uploads: Arc::new(RwLock::new(HashMap::new())),
        }
    }
}

impl S3Provider {
    /// Creates a new S3 provider
    pub fn new() -> Self {
        Self::default()
    }

    fn configure(
        &self,
        config: CapabilityConfiguration,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        self.clients.write().unwrap().insert(
            config.module.clone(),
            Arc::new(s3::client_for_config(&config)?),
        );

        Ok(vec![])
    }

    fn deconfigure(&self, actor: &str) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        self.clients.write().unwrap().remove(actor);

        Ok(vec![])
    }

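    // Each synchronous capability call below creates a short-lived Tokio
    // runtime to block on the async rusoto S3 operations.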
    fn create_container(
        &self,
        actor: &str,
        args: CreateContainerArgs,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let rt = tokio::runtime::Runtime::new().unwrap();
        rt.block_on(s3::create_bucket(
            &self.clients.read().unwrap()[actor],
            &args.id,
        ))?;

        serialize(Container::new(args.id))
    }

    fn remove_container(
        &self,
        actor: &str,
        args: RemoveContainerArgs,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let rt = tokio::runtime::Runtime::new().unwrap();
        serialize(
            rt.block_on(s3::remove_bucket(
                &self.clients.read().unwrap()[actor],
                &args.id,
            ))
            .map_or_else(
                |e| BlobstoreResult {
                    success: false,
                    error: Some(e.to_string()),
                },
                |_| BlobstoreResult {
                    success: true,
                    error: None,
                },
            ),
        )
    }

    fn upload_chunk(
        &self,
        actor: &str,
        chunk: FileChunk,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
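        // `start_upload` must have registered this key first: `and_modify`
        // silently drops chunks for unknown uploads, and the map indexing
        // below panics if the key is absent.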
        let key = upload_key(&chunk.container.id, &chunk.id, &actor);
        self.uploads
            .write()
            .unwrap()
            .entry(key.clone())
            .and_modify(|u| {
                u.chunks.push(chunk);
            });
        if self.uploads.read().unwrap()[&key].is_complete() {
            let rt = tokio::runtime::Runtime::new().unwrap();
            rt.block_on(s3::complete_upload(
                &self.clients.read().unwrap()[actor],
                &self.uploads.read().unwrap()[&key],
            ))?;
            self.uploads.write().unwrap().remove(&key);
        }
        Ok(vec![])
    }

    fn start_upload(
        &self,
        actor: &str,
        chunk: FileChunk,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
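        // The initiating chunk (sequence_no 0, no bytes) only declares the
        // upload: it creates the bookkeeping entry that `upload_chunk`
        // appends received chunks to.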
        let key = upload_key(&chunk.container.id, &chunk.id, &actor);

        let upload = FileUpload {
            chunks: vec![],
            container: chunk.container.id,
            id: chunk.id.to_string(),
            total_bytes: chunk.total_bytes,
            expected_chunks: expected_chunks(chunk.total_bytes, chunk.chunk_size),
        };

        self.uploads.write().unwrap().insert(key, upload);

        serialize(BlobstoreResult {
            success: true,
            error: None,
        })
    }

    fn remove_object(
        &self,
        actor: &str,
        args: RemoveObjectArgs,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let rt = tokio::runtime::Runtime::new().unwrap();
        serialize(
            rt.block_on(s3::remove_object(
                &self.clients.read().unwrap()[actor],
                &args.container_id,
                &args.id,
            ))
            .map_or_else(
                |e| BlobstoreResult {
                    success: false,
                    error: Some(e.to_string()),
                },
                |_| BlobstoreResult {
                    success: true,
                    error: None,
                },
            ),
        )
    }

    fn get_object_info(
        &self,
        actor: &str,
        args: GetObjectInfoArgs,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
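        // A blob that cannot be found is reported as a sentinel `Blob` with
        // id and container "none" and a zero byte size, rather than an error.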
        let rt = tokio::runtime::Runtime::new().unwrap();
        let info = rt.block_on(s3::head_object(
            &self.clients.read().unwrap()[actor],
            &args.container_id,
            &args.blob_id,
        ));

        serialize(&info.map_or_else(
            |_| Blob {
                id: "none".to_string(),
                container: Container::new("none".to_string()),
                byte_size: 0,
            },
            |ob| Blob {
                id: args.blob_id.to_string(),
                container: Container::new(args.container_id),
                byte_size: ob.content_length.unwrap() as u64,
            },
        ))
    }

    fn list_objects(
        &self,
        actor: &str,
        args: ListObjectsArgs,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let rt = tokio::runtime::Runtime::new().unwrap();
        let objects = rt.block_on(s3::list_objects(
            &self.clients.read().unwrap()[actor],
            &args.container_id,
        ))?;
        let blobs = if let Some(v) = objects {
            v.iter()
                .map(|ob| Blob {
                    id: ob.key.clone().unwrap(),
                    container: Container::new(args.container_id.clone()),
                    byte_size: ob.size.unwrap() as u64,
                })
                .collect()
        } else {
            vec![]
        };
        serialize(&BlobList { blobs })
    }

    fn start_download(
        &self,
        actor: &str,
        args: StartDownloadArgs,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
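        // Look up the blob's size synchronously, then stream it from a
        // background thread, dispatching one OP_RECEIVE_CHUNK message to the
        // actor per byte range; this call itself returns success immediately.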
        let actor = actor.to_string();

        let d = self.dispatcher.clone();
        let c = self.clients.read().unwrap()[&actor].clone();
        let container = args.container_id.to_string();
        let chunk_size = args.chunk_size;
        let id = args.blob_id.to_string();
        let ctx = args.context.clone();

        let byte_size = {
            let rt = tokio::runtime::Runtime::new().unwrap();
            let info = rt.block_on(s3::head_object(&c, &container, &id)).unwrap();
            drop(rt);
            info.content_length.unwrap() as u64
        };

        std::thread::spawn(move || {
            let actor = actor.to_string();

            let chunk_count = expected_chunks(byte_size, chunk_size);
            let rt = tokio::runtime::Runtime::new().unwrap();
            rt.block_on(async {
                for idx in 0..chunk_count {
                    dispatch_chunk(
                        idx,
                        d.clone(),
                        c.clone(),
                        container.to_string(),
                        id.to_string(),
                        chunk_size,
                        byte_size,
                        actor.clone(),
                        ctx.clone(),
                    )
                    .await;
                }
            });
        });

        serialize(BlobstoreResult {
            success: true,
            error: None,
        })
    }
}

async fn dispatch_chunk(
    idx: u64,
    dispatcher: Arc<RwLock<Box<dyn Dispatcher>>>,
    client: Arc<S3Client>,
    container: String,
    id: String,
    chunk_size: u64,
    byte_size: u64,
    actor: String,
    context: Option<String>,
) {
    // range header spec: https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
    // tl;dr - ranges are _inclusive_ and start at 0:
    // idx 0 covers start 0 through end chunk_size - 1.
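    // Example, chunk_size = 100: idx 0 covers bytes 0..=99, idx 1 covers
    // bytes 100..=199; with byte_size = 150 the second range is clamped to
    // 100..=149.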
    let start = idx * chunk_size;
    let mut end = start + chunk_size - 1;
    if end >= byte_size {
        end = byte_size - 1;
    }

    let bytes = s3::get_blob_range(&client, &container, &id, start, end)
        .await
        .unwrap();

    let fc = FileChunk {
        sequence_no: idx + 1,
        container: Container::new(container),
        id,
        chunk_size,
        total_bytes: byte_size,
        chunk_bytes: bytes,
        context,
    };
    match dispatcher
        .read()
        .unwrap()
        .dispatch(&actor, OP_RECEIVE_CHUNK, &serialize(&fc).unwrap())
    {
        Ok(_) => {}
        Err(_) => error!("Failed to dispatch chunk to actor {}", actor),
    }
}

impl CapabilityProvider for S3Provider {
    // Invoked by the runtime host to give this provider plugin the ability to communicate
    // with actors
    fn configure_dispatch(
        &self,
        dispatcher: Box<dyn Dispatcher>,
    ) -> Result<(), Box<dyn Error + Sync + Send>> {
        trace!("Dispatcher received.");
        let mut lock = self.dispatcher.write().unwrap();
        *lock = dispatcher;

        Ok(())
    }

    // Invoked by the host runtime to allow an actor to make use of the capability.
    // All providers MUST handle the "configure" message, even if no work will be done.
    fn handle_call(
        &self,
        actor: &str,
        op: &str,
        msg: &[u8],
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        trace!("Received host call from {}, operation - {}", actor, op);

        match op {
            OP_BIND_ACTOR if actor == SYSTEM_ACTOR => self.configure(deserialize(msg)?),
            OP_REMOVE_ACTOR if actor == SYSTEM_ACTOR => {
                // Remove the client for the unbound actor (the module named in
                // the payload), not for the "system" actor making the call.
                self.deconfigure(&deserialize::<CapabilityConfiguration>(msg)?.module)
            }
            OP_CREATE_CONTAINER => self.create_container(actor, deserialize(msg)?),
            OP_REMOVE_CONTAINER => self.remove_container(actor, deserialize(msg)?),
            OP_REMOVE_OBJECT => self.remove_object(actor, deserialize(msg)?),
            OP_LIST_OBJECTS => self.list_objects(actor, deserialize(msg)?),
            OP_UPLOAD_CHUNK => self.upload_chunk(actor, deserialize(msg)?),
            OP_START_DOWNLOAD => self.start_download(actor, deserialize(msg)?),
            OP_START_UPLOAD => self.start_upload(actor, deserialize(msg)?),
            OP_GET_OBJECT_INFO => self.get_object_info(actor, deserialize(msg)?),

            _ => Err(format!("bad dispatch: unsupported operation {}", op).into()),
        }
    }

    /// No cleanup needed
    fn stop(&self) {}
}

fn expected_chunks(total_bytes: u64, chunk_size: u64) -> u64 {
    let mut chunks = total_bytes / chunk_size;
    if total_bytes % chunk_size != 0 {
        chunks += 1
    }
    chunks
}

fn upload_key(container: &str, blob_id: &str, actor: &str) -> String {
    format!("{}-{}-{}", actor, container, blob_id)
}
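
// Minimal sanity checks for the pure helpers above; unlike the integration
// tests below, these need no MinIO server.
#[cfg(test)]
mod chunk_math_test {
    use super::*;

    #[test]
    fn test_expected_chunks_and_upload_key() {
        // 10427 bytes in 100-byte chunks: 104 full chunks plus one partial.
        assert_eq!(105, expected_chunks(10427, 100));
        // An exact multiple needs no trailing partial chunk.
        assert_eq!(104, expected_chunks(10400, 100));
        assert_eq!("actor-bucket-blob", upload_key("bucket", "blob", "actor"));
    }
}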

#[cfg(test)]
mod test {
    use super::*;
    use crossbeam_utils::sync::WaitGroup;
    use std::collections::HashMap;

    // ***! These tests MUST be run in the presence of a MinIO server.
    // The easiest option is just to run the default MinIO docker image as a
    // service. The following command will get you up and running for these tests.
    //
    // (Don't use this setup for production)
    // docker run -p 9000:9000 --name minio \
    //   --env MINIO_ACCESS_KEY="minioadmin" \
    //   --env MINIO_SECRET_KEY="minioadmin" bitnami/minio:latest

    #[test]
    fn test_create_and_remove_bucket() {
        let provider = S3Provider::new();
        provider.configure(gen_config("testar")).unwrap();
        let container_args = CreateContainerArgs {
            id: "addremovebucket".to_string(),
        };
        let res = provider.handle_call(
            "testar",
            OP_CREATE_CONTAINER,
            &serialize(&container_args).unwrap(),
        );
        assert!(res.is_ok());
        let remove_args = RemoveContainerArgs {
            id: container_args.id,
        };
        let res2 = provider.handle_call(
            "testar",
            OP_REMOVE_CONTAINER,
            &serialize(remove_args).unwrap(),
        );
        assert!(res2.is_ok());
    }

    #[test]
    fn test_upload_and_download() {
        let provider = S3Provider::new();
        provider.configure(gen_config("testupanddown")).unwrap();
        let wg = WaitGroup::new();
        let dispatcher = Box::new(TestDispatcher::new(wg.clone(), expected_chunks(10427, 100)));
        provider.configure_dispatch(dispatcher).unwrap();

        let container_args = CreateContainerArgs {
            id: "updownbucket".to_string(),
        };
        let _res = provider.handle_call(
            "testupanddown",
            OP_CREATE_CONTAINER,
            &serialize(&container_args).unwrap(),
        );

        let mut data: Vec<u8> = Vec::new();
        for _ in 0..10427 {
            data.push(42);
        }

        let chunk_list: Vec<FileChunk> = data
            .chunks(100)
            .enumerate()
            .map(|(idx, v)| FileChunk {
                chunk_bytes: v.to_vec(),
                chunk_size: 100,
                container: Container::new("updownbucket".to_string()),
                id: "updowntestfile".to_string(),
                total_bytes: data.len() as u64,
                sequence_no: idx as u64 + 1,
                context: None,
            })
            .collect();

        let first_chunk = FileChunk {
            chunk_bytes: vec![],
            chunk_size: 100,
            container: Container::new("updownbucket".to_string()),
            id: "updowntestfile".to_string(),
            total_bytes: data.len() as u64,
            sequence_no: 0,
            context: None,
        };

        let _ = provider
            .handle_call(
                "testupanddown",
                OP_START_UPLOAD,
                &serialize(&first_chunk).unwrap(),
            )
            .unwrap();

        for chunk in chunk_list {
            let _ = provider
                .handle_call("testupanddown", OP_UPLOAD_CHUNK, &serialize(chunk).unwrap())
                .unwrap();
        }
        let req = StartDownloadArgs {
            chunk_size: 100,
            container_id: "updownbucket".to_string(),
            blob_id: "updowntestfile".to_string(),
            context: Some("test1".to_string()),
        };
        let _ = provider
            .handle_call(
                "testupanddown",
                OP_START_DOWNLOAD,
                &serialize(&req).unwrap(),
            )
            .unwrap();

        // Blocks until TestDispatcher has received every expected chunk.
        wg.wait();
    }

    #[test]
    fn test_upload() {
        let provider = S3Provider::new();
        provider.configure(gen_config("testupload")).unwrap();

        let container_args = CreateContainerArgs {
            id: "uploadbucket".to_string(),
        };
        let _res = provider.handle_call(
            "testupload",
            OP_CREATE_CONTAINER,
            &serialize(&container_args).unwrap(),
        );

        let mut data: Vec<u8> = Vec::new();
        for _ in 0..10427 {
            data.push(42);
        }

        let chunk_list: Vec<FileChunk> = data
            .chunks(100)
            .enumerate()
            .map(|(idx, v)| FileChunk {
                chunk_bytes: v.to_vec(),
                chunk_size: 100,
                container: Container::new("uploadbucket".to_string()),
                id: "testfile".to_string(),
                total_bytes: data.len() as u64,
                sequence_no: idx as u64 + 1,
                context: None,
            })
            .collect();

        let first_chunk = FileChunk {
            chunk_bytes: vec![],
            chunk_size: 100,
            container: Container::new("uploadbucket".to_string()),
            id: "testfile".to_string(),
            total_bytes: data.len() as u64,
            sequence_no: 0,
            context: None,
        };

        let _ = provider.handle_call(
            "testupload",
            OP_START_UPLOAD,
            &serialize(&first_chunk).unwrap(),
        );

        for chunk in chunk_list {
            let _ = provider.handle_call("testupload", OP_UPLOAD_CHUNK, &serialize(chunk).unwrap());
        }

        let list_objects = ListObjectsArgs {
            container_id: container_args.id,
        };

        let list = provider
            .handle_call(
                "testupload",
                OP_LIST_OBJECTS,
                &serialize(&list_objects).unwrap(),
            )
            .unwrap();
        let object_list: BlobList = deserialize(&list).unwrap();
        assert_eq!(1, object_list.blobs.len());
        assert_eq!("testfile", object_list.blobs[0].id);

        let get_info_args = GetObjectInfoArgs {
            container_id: "uploadbucket".to_string(),
            blob_id: "testfile".to_string(),
        };

        let info = provider
            .handle_call(
                "testupload",
                OP_GET_OBJECT_INFO,
                &serialize(&get_info_args).unwrap(),
            )
            .unwrap();
        let objinfo: Blob = deserialize(&info).unwrap();
        assert_eq!(10427, objinfo.byte_size);
        let remove_args = RemoveObjectArgs {
            id: get_info_args.blob_id,
            container_id: get_info_args.container_id,
        };
        let _ = provider
            .handle_call(
                "testupload",
                OP_REMOVE_OBJECT,
                &serialize(&remove_args).unwrap(),
            )
            .unwrap();
        let remove_container_args = RemoveContainerArgs {
            id: remove_args.container_id,
        };
        let _ = provider
            .handle_call(
                "testupload",
                OP_REMOVE_CONTAINER,
                &serialize(&remove_container_args).unwrap(),
            )
            .unwrap();
    }


    fn gen_config(module: &str) -> CapabilityConfiguration {
        CapabilityConfiguration {
            module: module.to_string(),
            values: minio_config(),
        }
    }

    fn minio_config() -> HashMap<String, String> {
        let mut hm = HashMap::new();
        hm.insert("ENDPOINT".to_string(), "http://localhost:9000".to_string());
        hm.insert("REGION".to_string(), "us-east-1".to_string());
        hm.insert("AWS_ACCESS_KEY".to_string(), "minioadmin".to_string());
        hm.insert(
            "AWS_SECRET_ACCESS_KEY".to_string(),
            "minioadmin".to_string(),
        );

        hm
    }

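    // Records every dispatched chunk; once the expected count has arrived it
    // drops its WaitGroup clone, releasing the `wg.wait()` in the test thread.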
    struct TestDispatcher {
        chunks: RwLock<Vec<FileChunk>>,
        wg: RwLock<Option<WaitGroup>>,
        expected_chunks: u64,
    }

    impl TestDispatcher {
        fn new(wg: WaitGroup, expected_chunks: u64) -> TestDispatcher {
            TestDispatcher {
                chunks: RwLock::new(vec![]),
                wg: RwLock::new(Some(wg)),
                expected_chunks,
            }
        }
    }

    impl Dispatcher for TestDispatcher {
        fn dispatch(
            &self,
            _actor: &str,
            _op: &str,
            msg: &[u8],
        ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
            let fc: FileChunk = deserialize(msg)?;
            self.chunks.write().unwrap().push(fc);
            if self.chunks.read().unwrap().len() == self.expected_chunks as usize {
                *self.wg.write().unwrap() = None;
            }
            Ok(vec![])
        }
    }
}