#[macro_use]
extern crate wascc_codec as codec;

#[macro_use]
extern crate log;

use codec::capabilities::{
    CapabilityDescriptor, CapabilityProvider, Dispatcher, NullDispatcher, OperationDirection,
    OP_GET_CAPABILITY_DESCRIPTOR,
};
use codec::core::{CapabilityConfiguration, OP_BIND_ACTOR, OP_REMOVE_ACTOR};
use codec::deserialize;
use codec::{blobstore::*, serialize};
use rusoto_s3::S3Client;
use std::error::Error;
use std::{
    collections::HashMap,
    sync::{Arc, RwLock},
};

mod s3;

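// When not built as a static plugin, emit the FFI entry point the waSCC host
// uses to load this provider as a dynamic library.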
#[cfg(not(feature = "static_plugin"))]
capability_provider!(S3Provider, S3Provider::new);

const CAPABILITY_ID: &str = "wascc:blobstore";
const SYSTEM_ACTOR: &str = "system";
const VERSION: &str = env!("CARGO_PKG_VERSION");
const REVISION: u32 = 2;
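
/// An in-progress chunked upload: identifying metadata plus the chunks
/// received so far.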
#[derive(Debug, PartialEq)]
struct FileUpload {
    container: String,
    id: String,
    total_bytes: u64,
    expected_chunks: u64,
    chunks: Vec<FileChunk>,
}

impl FileUpload {
    pub fn is_complete(&self) -> bool {
        self.chunks.len() == self.expected_chunks as usize
    }
}

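/// waSCC blob store capability provider backed by the S3 API. Holds one S3
/// client per bound actor, plus any uploads currently in flight.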
pub struct S3Provider {
    dispatcher: Arc<RwLock<Box<dyn Dispatcher>>>,
    clients: RwLock<HashMap<String, Arc<S3Client>>>,
    uploads: RwLock<HashMap<String, FileUpload>>,
}

impl Default for S3Provider {
    fn default() -> Self {
        // The host process may have already initialized a logger; ignore failure.
        let _ = env_logger::try_init();

        S3Provider {
            dispatcher: Arc::new(RwLock::new(Box::new(NullDispatcher::new()))),
            clients: RwLock::new(HashMap::new()),
            uploads: RwLock::new(HashMap::new()),
        }
    }
}

impl S3Provider {
    pub fn new() -> Self {
        Self::default()
    }

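    /// Creates and stores an S3 client for the actor described by `config`,
    /// using the connection settings supplied at bind time.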
    fn configure(
        &self,
        config: CapabilityConfiguration,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        self.clients.write().unwrap().insert(
            config.module.clone(),
            Arc::new(s3::client_for_config(&config)?),
        );

        Ok(vec![])
    }

    /// Discards the S3 client for an actor that is being unbound.
    fn deconfigure(&self, actor: &str) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        self.clients.write().unwrap().remove(actor);

        Ok(vec![])
    }

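    /// Creates the S3 bucket backing the requested container.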
    fn create_container(
        &self,
        actor: &str,
        container: Container,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let mut rt = tokio::runtime::Runtime::new().unwrap();
        rt.block_on(s3::create_bucket(
            &self.clients.read().unwrap()[actor],
            &container.id,
        ))?;

        Ok(vec![])
    }

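    /// Removes the S3 bucket backing the container.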
    fn remove_container(
        &self,
        actor: &str,
        container: Container,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let mut rt = tokio::runtime::Runtime::new().unwrap();
        rt.block_on(s3::remove_bucket(
            &self.clients.read().unwrap()[actor],
            &container.id,
        ))?;

        Ok(vec![])
    }

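    /// Appends a chunk to an upload previously registered with `start_upload`,
    /// writing the object to S3 once every expected chunk has arrived. Indexing
    /// the uploads map panics if no upload was started for this key.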
    fn upload_chunk(
        &self,
        actor: &str,
        chunk: FileChunk,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let key = upload_key(&chunk.container, &chunk.id, actor);
        self.uploads
            .write()
            .unwrap()
            .entry(key.clone())
            .and_modify(|u| {
                u.chunks.push(chunk);
            });
        let complete = self.uploads.read().unwrap()[&key].is_complete();
        if complete {
            let mut rt = tokio::runtime::Runtime::new().unwrap();
            rt.block_on(s3::complete_upload(
                &self.clients.read().unwrap()[actor],
                &self.uploads.read().unwrap()[&key],
            ))?;
            self.uploads.write().unwrap().remove(&key);
        }
        Ok(vec![])
    }

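    /// Registers a new chunked upload. The initial `FileChunk` carries only
    /// metadata (total size and chunk size); its payload is ignored.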
    fn start_upload(
        &self,
        actor: &str,
        chunk: FileChunk,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let key = upload_key(&chunk.container, &chunk.id, actor);

        let upload = FileUpload {
            chunks: vec![],
            container: chunk.container.to_string(),
            id: chunk.id.to_string(),
            total_bytes: chunk.total_bytes,
            expected_chunks: expected_chunks(chunk.total_bytes, chunk.chunk_size),
        };

        self.uploads.write().unwrap().insert(key, upload);

        Ok(vec![])
    }

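    /// Deletes a blob from its container's bucket.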
    fn remove_object(
        &self,
        actor: &str,
        blob: Blob,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let mut rt = tokio::runtime::Runtime::new().unwrap();
        rt.block_on(s3::remove_object(
            &self.clients.read().unwrap()[actor],
            &blob.container,
            &blob.id,
        ))?;

        Ok(vec![])
    }

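    /// Fetches blob metadata via a HEAD request. If the object cannot be
    /// found, returns a sentinel `Blob` with id and container set to "none".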
    fn get_object_info(
        &self,
        actor: &str,
        blob: Blob,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let mut rt = tokio::runtime::Runtime::new().unwrap();
        let info = rt.block_on(s3::head_object(
            &self.clients.read().unwrap()[actor],
            &blob.container,
            &blob.id,
        ));

        let blob = if let Ok(ob) = info {
            Blob {
                id: blob.id.to_string(),
                container: blob.container.to_string(),
                byte_size: ob.content_length.unwrap() as u64,
            }
        } else {
            Blob {
                id: "none".to_string(),
                container: "none".to_string(),
                byte_size: 0,
            }
        };

        Ok(serialize(&blob)?)
    }

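    /// Lists the contents of a container as a serialized `BlobList`.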
    fn list_objects(
        &self,
        actor: &str,
        container: Container,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let mut rt = tokio::runtime::Runtime::new().unwrap();
        let objects = rt.block_on(s3::list_objects(
            &self.clients.read().unwrap()[actor],
            &container.id,
        ))?;
        let blobs = if let Some(v) = objects {
            v.iter()
                .map(|ob| Blob {
                    id: ob.key.clone().unwrap(),
                    container: container.id.to_string(),
                    byte_size: ob.size.unwrap() as u64,
                })
                .collect()
        } else {
            vec![]
        };
        let bloblist = BlobList { blobs };
        Ok(serialize(&bloblist)?)
    }

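    /// Begins a chunked download: looks up the object's size, then spawns a
    /// background thread that fetches byte ranges and dispatches each one to
    /// the requesting actor as an `OP_RECEIVE_CHUNK` message.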
    fn start_download(
        &self,
        actor: &str,
        request: StreamRequest,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let actor = actor.to_string();

        let d = self.dispatcher.clone();
        let c = self.clients.read().unwrap()[&actor].clone();
        let container = request.container.to_string();
        let chunk_size = request.chunk_size;
        let id = request.id.to_string();
        let ctx = request.context.clone();

        let byte_size = {
            let mut rt = tokio::runtime::Runtime::new().unwrap();
            let info = rt.block_on(s3::head_object(&c, &container, &id)).unwrap();
            drop(rt);
            info.content_length.unwrap() as u64
        };

        std::thread::spawn(move || {
            let actor = actor.to_string();

            let chunk_count = expected_chunks(byte_size, chunk_size);
            let mut rt = tokio::runtime::Runtime::new().unwrap();
            rt.block_on(async {
                for idx in 0..chunk_count {
                    dispatch_chunk(
                        idx,
                        d.clone(),
                        c.clone(),
                        container.to_string(),
                        id.to_string(),
                        chunk_size,
                        byte_size,
                        actor.clone(),
                        ctx.clone(),
                    )
                    .await;
                }
            });
        });

        Ok(vec![])
    }

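    /// Builds the capability descriptor that advertises this provider and its
    /// supported operations to the host.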
    fn get_descriptor(&self) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        use OperationDirection::{ToActor, ToProvider};
        Ok(serialize(
            CapabilityDescriptor::builder()
                .id(CAPABILITY_ID)
                .name("waSCC Blob Store Provider (S3)")
                .long_description(
                    "A waSCC blob store capability provider exposing an S3 client to actors",
                )
                .version(VERSION)
                .revision(REVISION)
                .with_operation(
                    OP_CREATE_CONTAINER,
                    ToProvider,
                    "Creates a new container/bucket",
                )
                .with_operation(
                    OP_REMOVE_CONTAINER,
                    ToProvider,
                    "Removes a container/bucket",
                )
                .with_operation(
                    OP_REMOVE_OBJECT,
                    ToProvider,
                    "Removes an object from a container",
                )
                .with_operation(
                    OP_LIST_OBJECTS,
                    ToProvider,
                    "Lists objects within a container",
                )
                .with_operation(
                    OP_UPLOAD_CHUNK,
                    ToProvider,
                    "Uploads a chunk of a blob to an item in a container. Must start upload first",
                )
                .with_operation(
                    OP_START_UPLOAD,
                    ToProvider,
                    "Starts the chunked upload of a blob",
                )
                .with_operation(
                    OP_START_DOWNLOAD,
                    ToProvider,
                    "Starts the chunked download of a blob",
                )
                .with_operation(
                    OP_GET_OBJECT_INFO,
                    ToProvider,
                    "Retrieves metadata about a blob",
                )
                .with_operation(
                    OP_RECEIVE_CHUNK,
                    ToActor,
                    "Receives a chunk of a blob for download",
                )
                .build(),
        )?)
    }
}

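/// Downloads one byte range of an object and dispatches it to the actor as a
/// `FileChunk` with a 1-based sequence number.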
async fn dispatch_chunk(
    idx: u64,
    dispatcher: Arc<RwLock<Box<dyn Dispatcher>>>,
    client: Arc<S3Client>,
    container: String,
    id: String,
    chunk_size: u64,
    byte_size: u64,
    actor: String,
    context: Option<String>,
) {
    // Byte ranges are inclusive; clamp the final chunk to the end of the object.
    let start = idx * chunk_size;
    let mut end = start + chunk_size - 1;
    if end >= byte_size {
        end = byte_size - 1;
    }

    let bytes = s3::get_blob_range(&client, &container, &id, start, end)
        .await
        .unwrap();

    let fc = FileChunk {
        sequence_no: idx + 1,
        container,
        id,
        chunk_size,
        total_bytes: byte_size,
        chunk_bytes: bytes,
        context,
    };
    match dispatcher
        .read()
        .unwrap()
        .dispatch(&actor, OP_RECEIVE_CHUNK, &serialize(&fc).unwrap())
    {
        Ok(_) => {}
        Err(_) => error!("Failed to dispatch chunk to actor {}", actor),
    }
}

impl CapabilityProvider for S3Provider {
    /// Receives the dispatcher the host uses to deliver messages to actors.
    fn configure_dispatch(
        &self,
        dispatcher: Box<dyn Dispatcher>,
    ) -> Result<(), Box<dyn Error + Sync + Send>> {
        trace!("Dispatcher received.");
        let mut lock = self.dispatcher.write().unwrap();
        *lock = dispatcher;

        Ok(())
    }

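    /// Routes an incoming host call to the matching operation handler. The
    /// bind/unbind/descriptor lifecycle operations are honored only when they
    /// originate from the system actor.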
    fn handle_call(
        &self,
        actor: &str,
        op: &str,
        msg: &[u8],
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        trace!("Received host call from {}, operation - {}", actor, op);

        match op {
            OP_BIND_ACTOR if actor == SYSTEM_ACTOR => self.configure(deserialize(msg)?),
            OP_REMOVE_ACTOR if actor == SYSTEM_ACTOR => {
                // The message payload, not `actor` (always the system actor
                // here), identifies the module being unbound.
                let config: CapabilityConfiguration = deserialize(msg)?;
                self.deconfigure(&config.module)
            }
            OP_GET_CAPABILITY_DESCRIPTOR if actor == SYSTEM_ACTOR => self.get_descriptor(),
            OP_CREATE_CONTAINER => self.create_container(actor, deserialize(msg)?),
            OP_REMOVE_CONTAINER => self.remove_container(actor, deserialize(msg)?),
            OP_REMOVE_OBJECT => self.remove_object(actor, deserialize(msg)?),
            OP_LIST_OBJECTS => self.list_objects(actor, deserialize(msg)?),
            OP_UPLOAD_CHUNK => self.upload_chunk(actor, deserialize(msg)?),
            OP_START_DOWNLOAD => self.start_download(actor, deserialize(msg)?),
            OP_START_UPLOAD => self.start_upload(actor, deserialize(msg)?),
            OP_GET_OBJECT_INFO => self.get_object_info(actor, deserialize(msg)?),

            _ => Err("bad dispatch".into()),
        }
    }
}

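/// Number of chunks required to cover `total_bytes`, rounding up when the last
/// chunk is partial.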
fn expected_chunks(total_bytes: u64, chunk_size: u64) -> u64 {
    let mut chunks = total_bytes / chunk_size;
    if total_bytes % chunk_size != 0 {
        chunks += 1
    }
    chunks
}

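/// Key identifying an in-flight upload, scoped by actor so that different
/// actors can upload blobs with the same id concurrently.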
fn upload_key(container: &str, blob_id: &str, actor: &str) -> String {
    format!("{}-{}-{}", actor, container, blob_id)
}

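// These tests expect a running S3-compatible server (e.g. Minio with its
// default credentials) listening on localhost:9000; see minio_config below.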
#[cfg(test)]
mod test {
    use super::*;
    use crossbeam_utils::sync::WaitGroup;
    use std::collections::HashMap;

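    // Exercises bucket creation and removal against the live endpoint.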
    #[test]
    fn test_create_and_remove_bucket() {
        let provider = S3Provider::new();
        provider.configure(gen_config("testar")).unwrap();
        let container = Container {
            id: "addremovebucket".to_string(),
        };
        let res = provider.handle_call(
            "testar",
            OP_CREATE_CONTAINER,
            &serialize(&container).unwrap(),
        );
        assert!(res.is_ok());
        let res2 = provider.handle_call(
            "testar",
            OP_REMOVE_CONTAINER,
            &serialize(&container).unwrap(),
        );
        assert!(res2.is_ok());
    }

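    // Round-trips a 10,427-byte blob: uploads it in 100-byte chunks, then
    // streams it back and waits for the TestDispatcher to observe every chunk.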
    #[test]
    fn test_upload_and_download() {
        let provider = S3Provider::new();
        provider.configure(gen_config("testupanddown")).unwrap();
        let wg = WaitGroup::new();
        let dispatcher = Box::new(TestDispatcher::new(wg.clone(), expected_chunks(10427, 100)));
        provider.configure_dispatch(dispatcher).unwrap();

        let container = Container {
            id: "updownbucket".to_string(),
        };
        let _res = provider.handle_call(
            "testupanddown",
            OP_CREATE_CONTAINER,
            &serialize(&container).unwrap(),
        );

        let mut data: Vec<u8> = Vec::new();
        for _ in 0..10427 {
            data.push(42);
        }

        let chunk_list: Vec<FileChunk> = data
            .chunks(100)
            .enumerate()
            .map(|(idx, v)| FileChunk {
                chunk_bytes: v.to_vec(),
                chunk_size: 100,
                container: "updownbucket".to_string(),
                id: "updowntestfile".to_string(),
                total_bytes: data.len() as u64,
                sequence_no: idx as u64 + 1,
                context: None,
            })
            .collect();

        // Chunk 0 carries only the upload metadata.
        let first_chunk = FileChunk {
            chunk_bytes: vec![],
            chunk_size: 100,
            container: "updownbucket".to_string(),
            id: "updowntestfile".to_string(),
            total_bytes: data.len() as u64,
            sequence_no: 0,
            context: None,
        };

        let _ = provider
            .handle_call(
                "testupanddown",
                OP_START_UPLOAD,
                &serialize(&first_chunk).unwrap(),
            )
            .unwrap();

        for chunk in chunk_list {
            let _ = provider
                .handle_call("testupanddown", OP_UPLOAD_CHUNK, &serialize(&chunk).unwrap())
                .unwrap();
        }
        let req = StreamRequest {
            chunk_size: 100,
            container: "updownbucket".to_string(),
            id: "updowntestfile".to_string(),
            context: Some("test1".to_string()),
        };
        let _ = provider
            .handle_call(
                "testupanddown",
                OP_START_DOWNLOAD,
                &serialize(&req).unwrap(),
            )
            .unwrap();

        // Blocks until the TestDispatcher has received every expected chunk
        // and dropped its WaitGroup handle.
        wg.wait();
    }

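    // Uploads a blob, verifies it via OP_LIST_OBJECTS and OP_GET_OBJECT_INFO,
    // then removes the object and its bucket.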
    #[test]
    fn test_upload() {
        let provider = S3Provider::new();
        provider.configure(gen_config("testupload")).unwrap();

        let container = Container {
            id: "uploadbucket".to_string(),
        };
        let _res = provider.handle_call(
            "testupload",
            OP_CREATE_CONTAINER,
            &serialize(&container).unwrap(),
        );

        let mut data: Vec<u8> = Vec::new();
        for _ in 0..10427 {
            data.push(42);
        }

        let chunk_list: Vec<FileChunk> = data
            .chunks(100)
            .enumerate()
            .map(|(idx, v)| FileChunk {
                chunk_bytes: v.to_vec(),
                chunk_size: 100,
                container: "uploadbucket".to_string(),
                id: "testfile".to_string(),
                total_bytes: data.len() as u64,
                sequence_no: idx as u64 + 1,
                context: None,
            })
            .collect();

        let first_chunk = FileChunk {
            chunk_bytes: vec![],
            chunk_size: 100,
            container: "uploadbucket".to_string(),
            id: "testfile".to_string(),
            total_bytes: data.len() as u64,
            sequence_no: 0,
            context: None,
        };

        let _ = provider.handle_call(
            "testupload",
            OP_START_UPLOAD,
            &serialize(&first_chunk).unwrap(),
        );

        for chunk in chunk_list {
            let _ = provider.handle_call("testupload", OP_UPLOAD_CHUNK, &serialize(&chunk).unwrap());
        }

        let list = provider
            .handle_call(
                "testupload",
                OP_LIST_OBJECTS,
                &serialize(&container).unwrap(),
            )
            .unwrap();
        let object_list: BlobList = deserialize(&list).unwrap();
        assert_eq!(1, object_list.blobs.len());
        assert_eq!("testfile", object_list.blobs[0].id);

        let blob = Blob {
            container: "uploadbucket".to_string(),
            id: "testfile".to_string(),
            byte_size: 0,
        };

        let info = provider
            .handle_call("testupload", OP_GET_OBJECT_INFO, &serialize(&blob).unwrap())
            .unwrap();
        let objinfo: Blob = deserialize(&info).unwrap();
        assert_eq!(10427, objinfo.byte_size);
        let _ = provider
            .handle_call("testupload", OP_REMOVE_OBJECT, &serialize(&blob).unwrap())
            .unwrap();
        let _ = provider
            .handle_call(
                "testupload",
                OP_REMOVE_CONTAINER,
                &serialize(&container).unwrap(),
            )
            .unwrap();
    }

    fn gen_config(module: &str) -> CapabilityConfiguration {
        CapabilityConfiguration {
            module: module.to_string(),
            values: minio_config(),
        }
    }

    fn minio_config() -> HashMap<String, String> {
        let mut hm = HashMap::new();
        hm.insert("ENDPOINT".to_string(), "http://localhost:9000".to_string());
        hm.insert("REGION".to_string(), "us-east-1".to_string());
        hm.insert("AWS_ACCESS_KEY".to_string(), "minioadmin".to_string());
        hm.insert(
            "AWS_SECRET_ACCESS_KEY".to_string(),
            "minioadmin".to_string(),
        );

        hm
    }

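    /// Test double that records dispatched chunks and releases the WaitGroup
    /// once the expected number has arrived.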
    struct TestDispatcher {
        chunks: RwLock<Vec<FileChunk>>,
        wg: RwLock<Option<WaitGroup>>,
        expected_chunks: u64,
    }

    impl TestDispatcher {
        fn new(wg: WaitGroup, expected_chunks: u64) -> TestDispatcher {
            TestDispatcher {
                chunks: RwLock::new(vec![]),
                wg: RwLock::new(Some(wg)),
                expected_chunks,
            }
        }
    }

    impl Dispatcher for TestDispatcher {
        fn dispatch(
            &self,
            _actor: &str,
            _op: &str,
            msg: &[u8],
        ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
            let fc: FileChunk = deserialize(msg)?;
            self.chunks.write().unwrap().push(fc);
            if self.chunks.read().unwrap().len() == self.expected_chunks as usize {
                *self.wg.write().unwrap() = None;
            }
            Ok(vec![])
        }
    }
}