wascc_s3/
lib.rs

//! # S3 implementation of the waSCC blob store capability provider API
//!
//! Provides an implementation of the wascc:blobstore contract for S3 and
//! S3-compatible (e.g. Minio) products.
//!
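//! At bind time the provider expects the following configuration values,
//! as exercised by the integration tests at the bottom of this file:
//!
//! * `ENDPOINT` - URL of the S3 or S3-compatible endpoint, e.g. `http://localhost:9000`
//! * `REGION` - region name, e.g. `us-east-1`
//! * `AWS_ACCESS_KEY` - access key ID
//! * `AWS_SECRET_ACCESS_KEY` - secret access key
//!
//! A minimal sketch of binding an actor by driving the `CapabilityProvider`
//! trait directly (the crate name `wascc_s3` and the module ID shown here
//! are assumptions for illustration):
//!
//! ```no_run
//! use std::collections::HashMap;
//! use wascc_codec::capabilities::CapabilityProvider;
//! use wascc_codec::core::{CapabilityConfiguration, OP_BIND_ACTOR};
//! use wascc_codec::serialize;
//!
//! let provider = wascc_s3::S3Provider::new();
//! let mut values = HashMap::new();
//! values.insert("ENDPOINT".to_string(), "http://localhost:9000".to_string());
//! values.insert("REGION".to_string(), "us-east-1".to_string());
//! values.insert("AWS_ACCESS_KEY".to_string(), "minioadmin".to_string());
//! values.insert("AWS_SECRET_ACCESS_KEY".to_string(), "minioadmin".to_string());
//! let config = CapabilityConfiguration {
//!     module: "MODULE_ID".to_string(), // hypothetical actor module ID
//!     values,
//! };
//! provider
//!     .handle_call("system", OP_BIND_ACTOR, &serialize(&config).unwrap())
//!     .unwrap();
//! ```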
#[macro_use]
extern crate wascc_codec as codec;

#[macro_use]
extern crate log;

use codec::capabilities::{
    CapabilityDescriptor, CapabilityProvider, Dispatcher, NullDispatcher, OperationDirection,
    OP_GET_CAPABILITY_DESCRIPTOR,
};
use codec::core::{CapabilityConfiguration, OP_BIND_ACTOR, OP_REMOVE_ACTOR};
use codec::deserialize;
use codec::{blobstore::*, serialize};
use rusoto_s3::S3Client;
use std::error::Error;
use std::{
    collections::HashMap,
    sync::{Arc, RwLock},
};

mod s3;

#[cfg(not(feature = "static_plugin"))]
capability_provider!(S3Provider, S3Provider::new);

const CAPABILITY_ID: &str = "wascc:blobstore";
const SYSTEM_ACTOR: &str = "system";
const VERSION: &str = env!("CARGO_PKG_VERSION");
const REVISION: u32 = 2; // Increment for each crates.io publish

/// Tracks an in-progress chunked upload: the chunks received so far and the
/// number expected before the object can be written to S3.
#[derive(Debug, PartialEq)]
struct FileUpload {
    container: String,
    id: String,
    total_bytes: u64,
    expected_chunks: u64,
    chunks: Vec<FileChunk>,
}

impl FileUpload {
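    /// Returns true once every expected chunk has been received.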
    pub fn is_complete(&self) -> bool {
        self.chunks.len() == self.expected_chunks as usize
    }
}

/// AWS S3 implementation of the `wascc:blobstore` specification
pub struct S3Provider {
    dispatcher: Arc<RwLock<Box<dyn Dispatcher>>>,
    clients: RwLock<HashMap<String, Arc<S3Client>>>,
    uploads: RwLock<HashMap<String, FileUpload>>,
}

impl Default for S3Provider {
    fn default() -> Self {
        // Ignore the error if a logger has already been installed
        let _ = env_logger::try_init();

        S3Provider {
            dispatcher: Arc::new(RwLock::new(Box::new(NullDispatcher::new()))),
            clients: RwLock::new(HashMap::new()),
            uploads: RwLock::new(HashMap::new()),
        }
    }
}

impl S3Provider {
    /// Creates a new S3 provider
    pub fn new() -> Self {
        Self::default()
    }

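    /// Handles OP_BIND_ACTOR: builds an S3 client from the actor's
    /// configuration values and caches it, keyed by the actor's module ID.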
    fn configure(
        &self,
        config: CapabilityConfiguration,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        self.clients.write().unwrap().insert(
            config.module.clone(),
            Arc::new(s3::client_for_config(&config)?),
        );

        Ok(vec![])
    }

    /// Handles OP_REMOVE_ACTOR: drops the cached S3 client for the actor.
    fn deconfigure(&self, actor: &str) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        self.clients.write().unwrap().remove(actor);

        Ok(vec![])
    }

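    /// Creates a bucket, blocking on a dedicated Tokio runtime since the
    /// rusoto S3 API is async but this provider's API is synchronous.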
    fn create_container(
        &self,
        actor: &str,
        container: Container,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let mut rt = tokio::runtime::Runtime::new().unwrap();
        rt.block_on(s3::create_bucket(
            &self.clients.read().unwrap()[actor],
            &container.id,
        ))?;

        Ok(vec![])
    }

    fn remove_container(
        &self,
        actor: &str,
        container: Container,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let mut rt = tokio::runtime::Runtime::new().unwrap();
        rt.block_on(s3::remove_bucket(
            &self.clients.read().unwrap()[actor],
            &container.id,
        ))?;

        Ok(vec![])
    }

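    /// Appends a received chunk to an upload in progress and, once all
    /// expected chunks have arrived, writes the assembled object to S3.
    /// An upload must have been started via OP_START_UPLOAD first.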
    fn upload_chunk(
        &self,
        actor: &str,
        chunk: FileChunk,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let key = upload_key(&chunk.container, &chunk.id, actor);
        // Record the chunk and check for completion under a single write
        // lock; fail rather than panic if no upload was started for this key.
        let complete = {
            let mut uploads = self.uploads.write().unwrap();
            match uploads.get_mut(&key) {
                Some(upload) => {
                    upload.chunks.push(chunk);
                    upload.is_complete()
                }
                None => {
                    return Err("no upload in progress - OP_START_UPLOAD must be sent first".into())
                }
            }
        };
        if complete {
            let mut rt = tokio::runtime::Runtime::new().unwrap();
            rt.block_on(s3::complete_upload(
                &self.clients.read().unwrap()[actor],
                &self.uploads.read().unwrap()[&key],
            ))?;
            self.uploads.write().unwrap().remove(&key);
        }
        Ok(vec![])
    }

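    /// Handles OP_START_UPLOAD: registers a new chunked upload keyed by
    /// actor, container, and blob ID. The initial chunk message carries only
    /// metadata (total size and chunk size), not payload bytes.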
    fn start_upload(
        &self,
        actor: &str,
        chunk: FileChunk,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let key = upload_key(&chunk.container, &chunk.id, actor);

        let upload = FileUpload {
            chunks: vec![],
            container: chunk.container.to_string(),
            id: chunk.id.to_string(),
            total_bytes: chunk.total_bytes,
            expected_chunks: expected_chunks(chunk.total_bytes, chunk.chunk_size),
        };

        self.uploads.write().unwrap().insert(key, upload);

        Ok(vec![])
    }

    fn remove_object(
        &self,
        actor: &str,
        blob: Blob,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let mut rt = tokio::runtime::Runtime::new().unwrap();
        rt.block_on(s3::remove_object(
            &self.clients.read().unwrap()[actor],
            &blob.container,
            &blob.id,
        ))?;

        Ok(vec![])
    }

    fn get_object_info(
        &self,
        actor: &str,
        blob: Blob,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let mut rt = tokio::runtime::Runtime::new().unwrap();
        let info = rt.block_on(s3::head_object(
            &self.clients.read().unwrap()[actor],
            &blob.container,
            &blob.id,
        ));

        // A missing object is reported as a zero-length blob with the id and
        // container both set to "none" rather than as an error.
        let blob = if let Ok(ob) = info {
            Blob {
                id: blob.id.to_string(),
                container: blob.container.to_string(),
                byte_size: ob.content_length.unwrap() as u64,
            }
        } else {
            Blob {
                id: "none".to_string(),
                container: "none".to_string(),
                byte_size: 0,
            }
        };

        Ok(serialize(&blob)?)
    }

    fn list_objects(
        &self,
        actor: &str,
        container: Container,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let mut rt = tokio::runtime::Runtime::new().unwrap();
        let objects = rt.block_on(s3::list_objects(
            &self.clients.read().unwrap()[actor],
            &container.id,
        ))?;
        let blobs = if let Some(v) = objects {
            v.iter()
                .map(|ob| Blob {
                    id: ob.key.clone().unwrap(),
                    container: container.id.to_string(),
                    byte_size: ob.size.unwrap() as u64,
                })
                .collect()
        } else {
            vec![]
        };
        let bloblist = BlobList { blobs };
        Ok(serialize(&bloblist)?)
    }

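    /// Handles OP_START_DOWNLOAD: looks up the object's size, then spawns a
    /// background thread that reads the object in ranged chunks and sends
    /// each one back to the requesting actor as OP_RECEIVE_CHUNK.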
    fn start_download(
        &self,
        actor: &str,
        request: StreamRequest,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let actor = actor.to_string();

        let d = self.dispatcher.clone();
        let c = self.clients.read().unwrap()[&actor].clone();
        let container = request.container.to_string();
        let chunk_size = request.chunk_size;
        let id = request.id.to_string();
        let ctx = request.context.clone();

        let byte_size = {
            let mut rt = tokio::runtime::Runtime::new().unwrap();
            let info = rt.block_on(s3::head_object(&c, &container, &id)).unwrap();
            drop(rt);
            info.content_length.unwrap() as u64
        };

        std::thread::spawn(move || {
            let chunk_count = expected_chunks(byte_size, chunk_size);
            let mut rt = tokio::runtime::Runtime::new().unwrap();
            rt.block_on(async {
                for idx in 0..chunk_count {
                    dispatch_chunk(
                        idx,
                        d.clone(),
                        c.clone(),
                        container.to_string(),
                        id.to_string(),
                        chunk_size,
                        byte_size,
                        actor.clone(),
                        ctx.clone(),
                    )
                    .await;
                }
            });
        });

        Ok(vec![])
    }

    fn get_descriptor(&self) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        use OperationDirection::{ToActor, ToProvider};
        Ok(serialize(
            CapabilityDescriptor::builder()
                .id(CAPABILITY_ID)
                .name("waSCC Blob Store Provider (S3)")
                .long_description(
                    "A waSCC blob store capability provider exposing an S3 client to actors",
                )
                .version(VERSION)
                .revision(REVISION)
                .with_operation(
                    OP_CREATE_CONTAINER,
                    ToProvider,
                    "Creates a new container/bucket",
                )
                .with_operation(
                    OP_REMOVE_CONTAINER,
                    ToProvider,
                    "Removes a container/bucket",
                )
                .with_operation(
                    OP_REMOVE_OBJECT,
                    ToProvider,
                    "Removes an object from a container",
                )
                .with_operation(
                    OP_LIST_OBJECTS,
                    ToProvider,
                    "Lists objects within a container",
                )
                .with_operation(
                    OP_UPLOAD_CHUNK,
                    ToProvider,
                    "Uploads a chunk of a blob to an item in a container. Must start upload first",
                )
                .with_operation(
                    OP_START_UPLOAD,
                    ToProvider,
                    "Starts the chunked upload of a blob",
                )
                .with_operation(
                    OP_START_DOWNLOAD,
                    ToProvider,
                    "Starts the chunked download of a blob",
                )
                .with_operation(
                    OP_GET_OBJECT_INFO,
                    ToProvider,
                    "Retrieves metadata about a blob",
                )
                .with_operation(
                    OP_RECEIVE_CHUNK,
                    ToActor,
                    "Receives a chunk of a blob for download",
                )
                .build(),
        )?)
    }
}

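/// Fetches one ranged chunk of a blob from S3 and dispatches it to the actor
/// as an OP_RECEIVE_CHUNK message. Sequence numbers delivered to the actor
/// are 1-based.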
async fn dispatch_chunk(
    idx: u64,
    dispatcher: Arc<RwLock<Box<dyn Dispatcher>>>,
    client: Arc<S3Client>,
    container: String,
    id: String,
    chunk_size: u64,
    byte_size: u64,
    actor: String,
    context: Option<String>,
) {
    // range header spec: https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
    // tl;dr - ranges are _inclusive_ and start at 0, so the last valid offset
    // is byte_size - 1. idx 0 covers offsets 0 through chunk_size - 1.
    let start = idx * chunk_size;
    let mut end = start + chunk_size - 1;
    if end >= byte_size {
        end = byte_size - 1;
    }
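    // Worked example (figures taken from the tests below): a 10,427-byte
    // object with a 100-byte chunk size yields 105 chunks; the final chunk
    // (idx 104) starts at offset 10,400 and is clamped to end at 10,426.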

    let bytes = s3::get_blob_range(&client, &container, &id, start, end)
        .await
        .unwrap();

    let fc = FileChunk {
        sequence_no: idx + 1,
        container,
        id,
        chunk_size,
        total_bytes: byte_size,
        chunk_bytes: bytes,
        context,
    };
    if dispatcher
        .read()
        .unwrap()
        .dispatch(&actor, OP_RECEIVE_CHUNK, &serialize(&fc).unwrap())
        .is_err()
    {
        error!("Failed to dispatch chunk to actor {}", actor);
    }
}

impl CapabilityProvider for S3Provider {
    // Invoked by the runtime host to give this provider plugin the ability to communicate
    // with actors
    fn configure_dispatch(
        &self,
        dispatcher: Box<dyn Dispatcher>,
    ) -> Result<(), Box<dyn Error + Sync + Send>> {
        trace!("Dispatcher received.");
        let mut lock = self.dispatcher.write().unwrap();
        *lock = dispatcher;

        Ok(())
    }

    // Invoked by the host runtime to allow an actor to make use of the capability.
    // All providers MUST handle the "configure" message, even if no work will be done
    fn handle_call(
        &self,
        actor: &str,
        op: &str,
        msg: &[u8],
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        trace!("Received host call from {}, operation - {}", actor, op);

        match op {
            OP_BIND_ACTOR if actor == SYSTEM_ACTOR => self.configure(deserialize(msg)?),
            // The removal payload identifies the module being unbound; remove
            // that module's client rather than the system actor's.
            OP_REMOVE_ACTOR if actor == SYSTEM_ACTOR => {
                let config: CapabilityConfiguration = deserialize(msg)?;
                self.deconfigure(&config.module)
            }
            OP_GET_CAPABILITY_DESCRIPTOR if actor == SYSTEM_ACTOR => self.get_descriptor(),
            OP_CREATE_CONTAINER => self.create_container(actor, deserialize(msg)?),
            OP_REMOVE_CONTAINER => self.remove_container(actor, deserialize(msg)?),
            OP_REMOVE_OBJECT => self.remove_object(actor, deserialize(msg)?),
            OP_LIST_OBJECTS => self.list_objects(actor, deserialize(msg)?),
            OP_UPLOAD_CHUNK => self.upload_chunk(actor, deserialize(msg)?),
            OP_START_DOWNLOAD => self.start_download(actor, deserialize(msg)?),
            OP_START_UPLOAD => self.start_upload(actor, deserialize(msg)?),
            OP_GET_OBJECT_INFO => self.get_object_info(actor, deserialize(msg)?),

            _ => Err(format!("bad dispatch: unsupported operation {}", op).into()),
        }
    }
}

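/// Ceiling division: the number of chunks needed to cover `total_bytes` bytes
/// in `chunk_size`-byte chunks, e.g. `expected_chunks(10427, 100) == 105`.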
fn expected_chunks(total_bytes: u64, chunk_size: u64) -> u64 {
    let mut chunks = total_bytes / chunk_size;
    if total_bytes % chunk_size != 0 {
        chunks += 1
    }
    chunks
}

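/// Builds the key used to track an upload in progress; scoped by actor so two
/// actors uploading the same blob ID to the same container do not collide.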
fn upload_key(container: &str, blob_id: &str, actor: &str) -> String {
    format!("{}-{}-{}", actor, container, blob_id)
}

#[cfg(test)]
mod test {
    use super::*;
    use crossbeam_utils::sync::WaitGroup;
    use std::collections::HashMap;

    // NOTE: these tests MUST be run against a live Minio (or other
    // S3-compatible) server. The easiest option is to run the default Minio
    // Docker image as a service.
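    // A minimal invocation (assuming a local Docker daemon; the default
    // credentials are minioadmin/minioadmin, matching minio_config below):
    //
    //     docker run -p 9000:9000 minio/minio server /data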

    #[test]
    fn test_create_and_remove_bucket() {
        let provider = S3Provider::new();
        provider.configure(gen_config("testar")).unwrap();
        let container = Container {
            id: "addremovebucket".to_string(),
        };
        let res = provider.handle_call(
            "testar",
            OP_CREATE_CONTAINER,
            &serialize(&container).unwrap(),
        );
        assert!(res.is_ok());
        let res2 = provider.handle_call(
            "testar",
            OP_REMOVE_CONTAINER,
            &serialize(&container).unwrap(),
        );
        assert!(res2.is_ok());
    }

    #[test]
    fn test_upload_and_download() {
        let provider = S3Provider::new();
        provider.configure(gen_config("testupanddown")).unwrap();
        let wg = WaitGroup::new();
        let dispatcher = Box::new(TestDispatcher::new(wg.clone(), expected_chunks(10427, 100)));
        provider.configure_dispatch(dispatcher).unwrap();

        let container = Container {
            id: "updownbucket".to_string(),
        };
        let _res = provider.handle_call(
            "testupanddown",
            OP_CREATE_CONTAINER,
            &serialize(&container).unwrap(),
        );

        let mut data: Vec<u8> = Vec::new();
        for _ in 0..10427 {
            data.push(42);
        }

        let chunk_list: Vec<FileChunk> = data
            .chunks(100)
            .enumerate()
            .map(|(idx, v)| FileChunk {
                chunk_bytes: v.to_vec(),
                chunk_size: 100,
                container: "updownbucket".to_string(),
                id: "updowntestfile".to_string(),
                total_bytes: data.len() as u64,
                sequence_no: idx as u64 + 1,
                context: None,
            })
            .collect();

        let first_chunk = FileChunk {
            chunk_bytes: vec![],
            chunk_size: 100,
            container: "updownbucket".to_string(),
            id: "updowntestfile".to_string(),
            total_bytes: data.len() as u64,
            sequence_no: 0,
            context: None,
        };

        let _ = provider
            .handle_call(
                "testupanddown",
                OP_START_UPLOAD,
                &serialize(&first_chunk).unwrap(),
            )
            .unwrap();

        for chunk in chunk_list {
            let _ = provider
                .handle_call("testupanddown", OP_UPLOAD_CHUNK, &serialize(&chunk).unwrap())
                .unwrap();
        }
        let req = StreamRequest {
            chunk_size: 100,
            container: "updownbucket".to_string(),
            id: "updowntestfile".to_string(),
            context: Some("test1".to_string()),
        };
        let _ = provider
            .handle_call(
                "testupanddown",
                OP_START_DOWNLOAD,
                &serialize(&req).unwrap(),
            )
            .unwrap();

        // The test dispatcher drops the wait group once every expected chunk
        // has arrived, so a successful wait is the assertion here.
        wg.wait();
    }

    #[test]
    fn test_upload() {
        let provider = S3Provider::new();
        provider.configure(gen_config("testupload")).unwrap();

        let container = Container {
            id: "uploadbucket".to_string(),
        };
        let _res = provider.handle_call(
            "testupload",
            OP_CREATE_CONTAINER,
            &serialize(&container).unwrap(),
        );

        let mut data: Vec<u8> = Vec::new();
        for _ in 0..10427 {
            data.push(42);
        }

        let chunk_list: Vec<FileChunk> = data
            .chunks(100)
            .enumerate()
            .map(|(idx, v)| FileChunk {
                chunk_bytes: v.to_vec(),
                chunk_size: 100,
                container: "uploadbucket".to_string(),
                id: "testfile".to_string(),
                total_bytes: data.len() as u64,
                sequence_no: idx as u64 + 1,
                context: None,
            })
            .collect();

        let first_chunk = FileChunk {
            chunk_bytes: vec![],
            chunk_size: 100,
            container: "uploadbucket".to_string(),
            id: "testfile".to_string(),
            total_bytes: data.len() as u64,
            sequence_no: 0,
            context: None,
        };

        let _ = provider.handle_call(
            "testupload",
            OP_START_UPLOAD,
            &serialize(&first_chunk).unwrap(),
        );

        for chunk in chunk_list {
            let _ = provider.handle_call("testupload", OP_UPLOAD_CHUNK, &serialize(&chunk).unwrap());
        }

        let list = provider
            .handle_call(
                "testupload",
                OP_LIST_OBJECTS,
                &serialize(&container).unwrap(),
            )
            .unwrap();
        let object_list: BlobList = deserialize(&list).unwrap();
        assert_eq!(1, object_list.blobs.len());
        assert_eq!("testfile", object_list.blobs[0].id);

        let blob = Blob {
            container: "uploadbucket".to_string(),
            id: "testfile".to_string(),
            byte_size: 0,
        };

        let info = provider
            .handle_call("testupload", OP_GET_OBJECT_INFO, &serialize(&blob).unwrap())
            .unwrap();
        let objinfo: Blob = deserialize(&info).unwrap();
        assert_eq!(10427, objinfo.byte_size);
        let _ = provider
            .handle_call("testupload", OP_REMOVE_OBJECT, &serialize(&blob).unwrap())
            .unwrap();
        let _ = provider
            .handle_call(
                "testupload",
                OP_REMOVE_CONTAINER,
                &serialize(&container).unwrap(),
            )
            .unwrap();
    }

    fn gen_config(module: &str) -> CapabilityConfiguration {
        CapabilityConfiguration {
            module: module.to_string(),
            values: minio_config(),
        }
    }

    fn minio_config() -> HashMap<String, String> {
        let mut hm = HashMap::new();
        hm.insert("ENDPOINT".to_string(), "http://localhost:9000".to_string());
        hm.insert("REGION".to_string(), "us-east-1".to_string());
        hm.insert("AWS_ACCESS_KEY".to_string(), "minioadmin".to_string());
        hm.insert(
            "AWS_SECRET_ACCESS_KEY".to_string(),
            "minioadmin".to_string(),
        );

        hm
    }

    struct TestDispatcher {
        chunks: RwLock<Vec<FileChunk>>,
        wg: RwLock<Option<WaitGroup>>,
        expected_chunks: u64,
    }

    impl TestDispatcher {
        fn new(wg: WaitGroup, expected_chunks: u64) -> TestDispatcher {
            TestDispatcher {
                chunks: RwLock::new(vec![]),
                wg: RwLock::new(Some(wg)),
                expected_chunks,
            }
        }
    }

    impl Dispatcher for TestDispatcher {
        fn dispatch(
            &self,
            _actor: &str,
            _op: &str,
            msg: &[u8],
        ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
            let fc: FileChunk = deserialize(msg)?;
            self.chunks.write().unwrap().push(fc);
            // Dropping the held WaitGroup clone unblocks the test's wg.wait()
            // once all expected chunks have arrived.
            if self.chunks.read().unwrap().len() == self.expected_chunks as usize {
                *self.wg.write().unwrap() = None;
            }
            Ok(vec![])
        }
    }
}
685}