#[macro_use]
extern crate wasmcloud_provider_core as codec;
use codec::capabilities::{CapabilityProvider, Dispatcher, NullDispatcher};
use codec::core::{OP_BIND_ACTOR, OP_REMOVE_ACTOR};
use codec::{deserialize, serialize};
use log::{error, trace};
use rusoto_s3::S3Client;
use std::error::Error;
use std::{
    collections::HashMap,
    sync::{Arc, RwLock},
};
use wasmcloud_actor_blobstore::*;
use wasmcloud_actor_core::CapabilityConfiguration;
mod s3;
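
// The `capability_provider!` macro emits the entry point a wasmCloud host
// uses to load this provider as a dynamic plugin; static builds link the
// provider directly and skip it.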
#[cfg(not(feature = "static_plugin"))]
capability_provider!(S3Provider, S3Provider::new);

#[allow(unused)]
const CAPABILITY_ID: &str = "wasmcloud:blobstore";
const SYSTEM_ACTOR: &str = "system";
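
/// An in-progress chunked upload, buffered until all expected chunks arrive.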
#[derive(Debug, PartialEq)]
struct FileUpload {
    container: String,
    id: String,
    total_bytes: u64,
    expected_chunks: u64,
    chunks: Vec<FileChunk>,
}

impl FileUpload {
    pub fn is_complete(&self) -> bool {
        self.chunks.len() == self.expected_chunks as usize
    }
}
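
/// S3 implementation of the `wasmcloud:blobstore` capability contract. All
/// state sits behind `Arc<RwLock<...>>`, so cloned handles share the same
/// dispatcher, per-actor clients, and pending uploads across threads.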
#[derive(Clone)]
pub struct S3Provider {
    dispatcher: Arc<RwLock<Box<dyn Dispatcher>>>,
    clients: Arc<RwLock<HashMap<String, Arc<S3Client>>>>,
    uploads: Arc<RwLock<HashMap<String, FileUpload>>>,
}

impl Default for S3Provider {
    fn default() -> Self {
        // The host may already have installed a logger; ignore the error.
        let _ = env_logger::try_init();

        S3Provider {
            dispatcher: Arc::new(RwLock::new(Box::new(NullDispatcher::new()))),
            clients: Arc::new(RwLock::new(HashMap::new())),
            uploads: Arc::new(RwLock::new(HashMap::new())),
        }
    }
}

impl S3Provider {
    pub fn new() -> Self {
        Self::default()
    }
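
    /// Creates an S3 client from the actor's link configuration and caches
    /// it under the actor's module identity.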
    fn configure(
        &self,
        config: CapabilityConfiguration,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        self.clients.write().unwrap().insert(
            config.module.clone(),
            Arc::new(s3::client_for_config(&config)?),
        );

        Ok(vec![])
    }

    fn deconfigure(&self, actor: &str) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        self.clients.write().unwrap().remove(actor);

        Ok(vec![])
    }
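
    /// Creates the bucket backing a container. The rusoto client is async,
    /// so a short-lived Tokio runtime is created to block on the call.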
    fn create_container(
        &self,
        actor: &str,
        args: CreateContainerArgs,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let rt = tokio::runtime::Runtime::new().unwrap();
        rt.block_on(s3::create_bucket(
            &self.clients.read().unwrap()[actor],
            &args.id,
        ))?;

        serialize(Container::new(args.id))
    }
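
    /// Deletes the bucket backing a container, reporting the outcome as a
    /// `BlobstoreResult` rather than an error.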
    fn remove_container(
        &self,
        actor: &str,
        args: RemoveContainerArgs,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let rt = tokio::runtime::Runtime::new().unwrap();
        serialize(
            rt.block_on(s3::remove_bucket(
                &self.clients.read().unwrap()[actor],
                &args.id,
            ))
            .map_or_else(
                |e| BlobstoreResult {
                    success: false,
                    error: Some(e.to_string()),
                },
                |_| BlobstoreResult {
                    success: true,
                    error: None,
                },
            ),
        )
    }
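
    /// Buffers an incoming chunk, then writes the whole blob to S3 and drops
    /// the bookkeeping entry once the final chunk arrives. Assumes
    /// `start_upload` has already registered the upload for this key.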
    fn upload_chunk(
        &self,
        actor: &str,
        chunk: FileChunk,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let key = upload_key(&chunk.container.id, &chunk.id, actor);
        self.uploads
            .write()
            .unwrap()
            .entry(key.clone())
            .and_modify(|u| {
                u.chunks.push(chunk);
            });
        if self.uploads.read().unwrap()[&key].is_complete() {
            let rt = tokio::runtime::Runtime::new().unwrap();
            rt.block_on(s3::complete_upload(
                &self.clients.read().unwrap()[actor],
                &self.uploads.read().unwrap()[&key],
            ))?;
            self.uploads.write().unwrap().remove(&key);
        }
        Ok(vec![])
    }
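
    /// Registers a new upload. The initiating chunk (sequence number 0)
    /// carries only metadata; the payload chunks follow via `upload_chunk`.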
    fn start_upload(
        &self,
        actor: &str,
        chunk: FileChunk,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let key = upload_key(&chunk.container.id, &chunk.id, actor);

        let upload = FileUpload {
            chunks: vec![],
            container: chunk.container.id,
            id: chunk.id.to_string(),
            total_bytes: chunk.total_bytes,
            expected_chunks: expected_chunks(chunk.total_bytes, chunk.chunk_size),
        };

        self.uploads.write().unwrap().insert(key, upload);

        serialize(BlobstoreResult {
            success: true,
            error: None,
        })
    }
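
    /// Deletes a single object, reporting the outcome as a `BlobstoreResult`.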
    fn remove_object(
        &self,
        actor: &str,
        args: RemoveObjectArgs,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let rt = tokio::runtime::Runtime::new().unwrap();
        serialize(
            rt.block_on(s3::remove_object(
                &self.clients.read().unwrap()[actor],
                &args.container_id,
                &args.id,
            ))
            .map_or_else(
                |e| BlobstoreResult {
                    success: false,
                    error: Some(e.to_string()),
                },
                |_| BlobstoreResult {
                    success: true,
                    error: None,
                },
            ),
        )
    }
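
    /// Issues a HEAD request for the blob. A missing object is reported as a
    /// sentinel blob with id "none" and a byte size of zero, not an error.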
    fn get_object_info(
        &self,
        actor: &str,
        args: GetObjectInfoArgs,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let rt = tokio::runtime::Runtime::new().unwrap();
        let info = rt.block_on(s3::head_object(
            &self.clients.read().unwrap()[actor],
            &args.container_id,
            &args.blob_id,
        ));

        serialize(&info.map_or_else(
            |_| Blob {
                id: "none".to_string(),
                container: Container::new("none".to_string()),
                byte_size: 0,
            },
            |ob| Blob {
                id: args.blob_id.to_string(),
                container: Container::new(args.container_id),
                byte_size: ob.content_length.unwrap() as u64,
            },
        ))
    }
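
    /// Lists the objects in a container as a `BlobList`.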
    fn list_objects(
        &self,
        actor: &str,
        args: ListObjectsArgs,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let rt = tokio::runtime::Runtime::new().unwrap();
        let objects = rt.block_on(s3::list_objects(
            &self.clients.read().unwrap()[actor],
            &args.container_id,
        ))?;
        let blobs = if let Some(v) = objects {
            v.iter()
                .map(|ob| Blob {
                    id: ob.key.clone().unwrap(),
                    container: Container::new(args.container_id.clone()),
                    byte_size: ob.size.unwrap() as u64,
                })
                .collect()
        } else {
            vec![]
        };
        serialize(&BlobList { blobs })
    }
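
    /// Begins streaming a blob back to the requesting actor. The object size
    /// is fetched up front, then a background thread reads each byte range
    /// and dispatches it as a `FileChunk`, so this call returns immediately.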
    fn start_download(
        &self,
        actor: &str,
        args: StartDownloadArgs,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let actor = actor.to_string();

        let d = self.dispatcher.clone();
        let c = self.clients.read().unwrap()[&actor].clone();
        let container = args.container_id.to_string();
        let chunk_size = args.chunk_size;
        let id = args.blob_id.to_string();
        let ctx = args.context.clone();

        let byte_size = {
            let rt = tokio::runtime::Runtime::new().unwrap();
            let info = rt.block_on(s3::head_object(&c, &container, &id)).unwrap();
            drop(rt);
            info.content_length.unwrap() as u64
        };

        std::thread::spawn(move || {
            let actor = actor.to_string();

            let chunk_count = expected_chunks(byte_size, chunk_size);
            let rt = tokio::runtime::Runtime::new().unwrap();
            rt.block_on(async {
                for idx in 0..chunk_count {
                    dispatch_chunk(
                        idx,
                        d.clone(),
                        c.clone(),
                        container.to_string(),
                        id.to_string(),
                        chunk_size,
                        byte_size,
                        actor.clone(),
                        ctx.clone(),
                    )
                    .await;
                }
            });
        });

        serialize(BlobstoreResult {
            success: true,
            error: None,
        })
    }
}
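
/// Reads one byte range of the blob from S3 and dispatches it to the actor
/// as a `FileChunk`. Sequence numbers are 1-based to match the upload side.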
async fn dispatch_chunk(
    idx: u64,
    dispatcher: Arc<RwLock<Box<dyn Dispatcher>>>,
    client: Arc<S3Client>,
    container: String,
    id: String,
    chunk_size: u64,
    byte_size: u64,
    actor: String,
    context: Option<String>,
) {
    let start = idx * chunk_size;
    let mut end = start + chunk_size - 1;
    // Clamp the inclusive range end to the last valid byte index.
    if end >= byte_size {
        end = byte_size - 1;
    }

    let bytes = s3::get_blob_range(&client, &container, &id, start, end)
        .await
        .unwrap();

    let fc = FileChunk {
        sequence_no: idx + 1,
        container: Container::new(container),
        id,
        chunk_size,
        total_bytes: byte_size,
        chunk_bytes: bytes,
        context,
    };
    match dispatcher
        .read()
        .unwrap()
        .dispatch(&actor, OP_RECEIVE_CHUNK, &serialize(&fc).unwrap())
    {
        Ok(_) => {}
        Err(_) => error!("Failed to dispatch chunk to actor {}", actor),
    }
}

impl CapabilityProvider for S3Provider {
    fn configure_dispatch(
        &self,
        dispatcher: Box<dyn Dispatcher>,
    ) -> Result<(), Box<dyn Error + Sync + Send>> {
        trace!("Dispatcher received.");
        let mut lock = self.dispatcher.write().unwrap();
        *lock = dispatcher;

        Ok(())
    }
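
    /// Routes host calls to the matching operation handler. Bind and remove
    /// operations are honored only when sent by the system actor.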
    fn handle_call(
        &self,
        actor: &str,
        op: &str,
        msg: &[u8],
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        trace!("Received host call from {}, operation - {}", actor, op);

        match op {
            OP_BIND_ACTOR if actor == SYSTEM_ACTOR => self.configure(deserialize(msg)?),
            // Remove the client for the actor named in the message, not the
            // "system" actor that delivered it.
            OP_REMOVE_ACTOR if actor == SYSTEM_ACTOR => {
                self.deconfigure(&deserialize::<CapabilityConfiguration>(msg)?.module)
            }
            OP_CREATE_CONTAINER => self.create_container(actor, deserialize(msg)?),
            OP_REMOVE_CONTAINER => self.remove_container(actor, deserialize(msg)?),
            OP_REMOVE_OBJECT => self.remove_object(actor, deserialize(msg)?),
            OP_LIST_OBJECTS => self.list_objects(actor, deserialize(msg)?),
            OP_UPLOAD_CHUNK => self.upload_chunk(actor, deserialize(msg)?),
            OP_START_DOWNLOAD => self.start_download(actor, deserialize(msg)?),
            OP_START_UPLOAD => self.start_upload(actor, deserialize(msg)?),
            OP_GET_OBJECT_INFO => self.get_object_info(actor, deserialize(msg)?),

            _ => Err("bad dispatch".into()),
        }
    }
    fn stop(&self) {
        // No background resources to release.
    }
}
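
/// Number of chunks required to cover `total_bytes` at `chunk_size` bytes
/// per chunk, i.e. a ceiling division.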
fn expected_chunks(total_bytes: u64, chunk_size: u64) -> u64 {
    let mut chunks = total_bytes / chunk_size;
    if total_bytes % chunk_size != 0 {
        chunks += 1;
    }
    chunks
}
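
/// Builds the key identifying a pending upload: actor, container, blob id.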
fn upload_key(container: &str, blob_id: &str, actor: &str) -> String {
    format!("{}-{}-{}", actor, container, blob_id)
}

#[cfg(test)]
mod test {
    use super::*;
    use crossbeam_utils::sync::WaitGroup;
    use std::collections::HashMap;
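
    // These tests expect an S3-compatible server such as MinIO listening on
    // localhost:9000 with the default minioadmin credentials (see
    // minio_config below).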
    #[test]
    fn test_create_and_remove_bucket() {
        let provider = S3Provider::new();
        provider.configure(gen_config("testar")).unwrap();
        let container_args = CreateContainerArgs {
            id: "addremovebucket".to_string(),
        };
        let res = provider.handle_call(
            "testar",
            OP_CREATE_CONTAINER,
            &serialize(&container_args).unwrap(),
        );
        assert!(res.is_ok());
        let remove_args = RemoveContainerArgs {
            id: container_args.id,
        };
        let res2 = provider.handle_call(
            "testar",
            OP_REMOVE_CONTAINER,
            &serialize(&remove_args).unwrap(),
        );
        assert!(res2.is_ok());
    }

    #[test]
    fn test_upload_and_download() {
        let provider = S3Provider::new();
        provider.configure(gen_config("testupanddown")).unwrap();
        let wg = WaitGroup::new();
        let dispatcher = Box::new(TestDispatcher::new(wg.clone(), expected_chunks(10427, 100)));
        provider.configure_dispatch(dispatcher).unwrap();

        let container_args = CreateContainerArgs {
            id: "updownbucket".to_string(),
        };
        let _res = provider.handle_call(
            "testupanddown",
            OP_CREATE_CONTAINER,
            &serialize(&container_args).unwrap(),
        );

        let data: Vec<u8> = vec![42; 10427];

        let chunk_list: Vec<FileChunk> = data
            .chunks(100)
            .enumerate()
            .map(|(idx, v)| FileChunk {
                chunk_bytes: v.to_vec(),
                chunk_size: 100,
                container: Container::new("updownbucket".to_string()),
                id: "updowntestfile".to_string(),
                total_bytes: data.len() as u64,
                sequence_no: idx as u64 + 1,
                context: None,
            })
            .collect();

        let first_chunk = FileChunk {
            chunk_bytes: vec![],
            chunk_size: 100,
            container: Container::new("updownbucket".to_string()),
            id: "updowntestfile".to_string(),
            total_bytes: data.len() as u64,
            sequence_no: 0,
            context: None,
        };

        let _ = provider
            .handle_call(
                "testupanddown",
                OP_START_UPLOAD,
                &serialize(&first_chunk).unwrap(),
            )
            .unwrap();

        for chunk in chunk_list {
            let _ = provider
                .handle_call("testupanddown", OP_UPLOAD_CHUNK, &serialize(&chunk).unwrap())
                .unwrap();
        }
        let req = StartDownloadArgs {
            chunk_size: 100,
            container_id: "updownbucket".to_string(),
            blob_id: "updowntestfile".to_string(),
            context: Some("test1".to_string()),
        };
        let _ = provider
            .handle_call(
                "testupanddown",
                OP_START_DOWNLOAD,
                &serialize(&req).unwrap(),
            )
            .unwrap();

        // The dispatcher drops the WaitGroup once every expected chunk has
        // been received, so returning from wait() is the assertion.
        wg.wait();
    }

    #[test]
    fn test_upload() {
        let provider = S3Provider::new();
        provider.configure(gen_config("testupload")).unwrap();

        let container_args = CreateContainerArgs {
            id: "uploadbucket".to_string(),
        };
        let _res = provider.handle_call(
            "testupload",
            OP_CREATE_CONTAINER,
            &serialize(&container_args).unwrap(),
        );

        let data: Vec<u8> = vec![42; 10427];

        let chunk_list: Vec<FileChunk> = data
            .chunks(100)
            .enumerate()
            .map(|(idx, v)| FileChunk {
                chunk_bytes: v.to_vec(),
                chunk_size: 100,
                container: Container::new("uploadbucket".to_string()),
                id: "testfile".to_string(),
                total_bytes: data.len() as u64,
                sequence_no: idx as u64 + 1,
                context: None,
            })
            .collect();

        let first_chunk = FileChunk {
            chunk_bytes: vec![],
            chunk_size: 100,
            container: Container::new("uploadbucket".to_string()),
            id: "testfile".to_string(),
            total_bytes: data.len() as u64,
            sequence_no: 0,
            context: None,
        };

        let _ = provider.handle_call(
            "testupload",
            OP_START_UPLOAD,
            &serialize(&first_chunk).unwrap(),
        );

        for chunk in chunk_list {
            let _ = provider.handle_call("testupload", OP_UPLOAD_CHUNK, &serialize(&chunk).unwrap());
        }

        let list_objects = ListObjectsArgs {
            container_id: container_args.id,
        };

        let list = provider
            .handle_call(
                "testupload",
                OP_LIST_OBJECTS,
                &serialize(&list_objects).unwrap(),
            )
            .unwrap();
        let object_list: BlobList = deserialize(&list).unwrap();
        assert_eq!(1, object_list.blobs.len());
        assert_eq!("testfile", object_list.blobs[0].id);

        let get_info_args = GetObjectInfoArgs {
            container_id: "uploadbucket".to_string(),
            blob_id: "testfile".to_string(),
        };

        let info = provider
            .handle_call(
                "testupload",
                OP_GET_OBJECT_INFO,
                &serialize(&get_info_args).unwrap(),
            )
            .unwrap();
        let objinfo: Blob = deserialize(&info).unwrap();
        assert_eq!(10427, objinfo.byte_size);
        let remove_args = RemoveObjectArgs {
            id: get_info_args.blob_id,
            container_id: get_info_args.container_id,
        };
        let _ = provider
            .handle_call(
                "testupload",
                OP_REMOVE_OBJECT,
                &serialize(&remove_args).unwrap(),
            )
            .unwrap();
        let remove_container_args = RemoveContainerArgs {
            id: remove_args.container_id,
        };
        let _ = provider
            .handle_call(
                "testupload",
                OP_REMOVE_CONTAINER,
                &serialize(&remove_container_args).unwrap(),
            )
            .unwrap();
    }

    fn gen_config(module: &str) -> CapabilityConfiguration {
        CapabilityConfiguration {
            module: module.to_string(),
            values: minio_config(),
        }
    }

    fn minio_config() -> HashMap<String, String> {
        let mut hm = HashMap::new();
        hm.insert("ENDPOINT".to_string(), "http://localhost:9000".to_string());
        hm.insert("REGION".to_string(), "us-east-1".to_string());
        hm.insert("AWS_ACCESS_KEY".to_string(), "minioadmin".to_string());
        hm.insert(
            "AWS_SECRET_ACCESS_KEY".to_string(),
            "minioadmin".to_string(),
        );

        hm
    }
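
    /// Test double that records dispatched chunks and releases its WaitGroup
    /// once the expected number has arrived, unblocking the test thread.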
    struct TestDispatcher {
        chunks: RwLock<Vec<FileChunk>>,
        wg: RwLock<Option<WaitGroup>>,
        expected_chunks: u64,
    }

    impl TestDispatcher {
        fn new(wg: WaitGroup, expected_chunks: u64) -> TestDispatcher {
            TestDispatcher {
                chunks: RwLock::new(vec![]),
                wg: RwLock::new(Some(wg)),
                expected_chunks,
            }
        }
    }

    impl Dispatcher for TestDispatcher {
        fn dispatch(
            &self,
            _actor: &str,
            _op: &str,
            msg: &[u8],
        ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
            let fc: FileChunk = deserialize(msg)?;
            self.chunks.write().unwrap().push(fc);
            // Dropping the WaitGroup releases the test thread blocked in wait().
            if self.chunks.read().unwrap().len() == self.expected_chunks as usize {
                *self.wg.write().unwrap() = None;
            }
            Ok(vec![])
        }
    }
}