#[macro_use]
extern crate wascc_codec as codec;

#[macro_use]
extern crate log;

use chunks::Chunks;
use codec::blobstore::*;
use codec::capabilities::{
    CapabilityDescriptor, CapabilityProvider, Dispatcher, NullDispatcher, OperationDirection,
    OP_GET_CAPABILITY_DESCRIPTOR,
};
use codec::core::{OP_BIND_ACTOR, OP_REMOVE_ACTOR};
use codec::{deserialize, serialize};
use std::collections::HashMap;
use std::error::Error;
use std::io::Write;
use std::{
    fs::OpenOptions,
    path::{Path, PathBuf},
    sync::{Arc, RwLock},
};
use wascc_codec::core::CapabilityConfiguration;

mod chunks;

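// When the "static_plugin" feature is disabled, export the provider through the codec's
// `capability_provider!` macro so it can be loaded as a dynamically linked capability plugin.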
#[cfg(not(feature = "static_plugin"))]
capability_provider!(FileSystemProvider, FileSystemProvider::new);

const CAPABILITY_ID: &str = "wascc:blobstore";
const SYSTEM_ACTOR: &str = "system";
const FIRST_SEQ_NBR: u64 = 0;
const VERSION: &str = env!("CARGO_PKG_VERSION");
const REVISION: u32 = 3;

pub struct FileSystemProvider {
    dispatcher: Arc<RwLock<Box<dyn Dispatcher>>>,
    rootdir: RwLock<PathBuf>,
    upload_chunks: RwLock<HashMap<String, (u64, Vec<FileChunk>)>>,
}

impl Default for FileSystemProvider {
    fn default() -> Self {
        let _ = env_logger::builder().format_module_path(false).try_init();

        FileSystemProvider {
            dispatcher: Arc::new(RwLock::new(Box::new(NullDispatcher::new()))),
            rootdir: RwLock::new(PathBuf::new()),
            upload_chunks: RwLock::new(HashMap::new()),
        }
    }
}

impl FileSystemProvider {
    pub fn new() -> Self {
        Self::default()
    }

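    /// Handles the host's `OP_BIND_ACTOR` configuration message: reads the `ROOT` value
    /// and stores it as the root directory for all containers and blobs.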
    fn configure(
        &self,
        config: CapabilityConfiguration,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let mut lock = self.rootdir.write().unwrap();
        let root_dir = config.values["ROOT"].clone();
        info!("File System Blob Store Container Root: '{}'", root_dir);
        *lock = PathBuf::from(root_dir);

        Ok(vec![])
    }

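    /// Creates a directory for the (sanitized) container under the configured root.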
    fn create_container(
        &self,
        _actor: &str,
        container: Container,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let container = sanitize_container(&container);
        let cdir = self.container_to_path(&container);
        std::fs::create_dir_all(cdir)?;
        Ok(serialize(&container)?)
    }

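    /// Removes the directory backing the (sanitized) container; fails if it is not empty.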
    fn remove_container(
        &self,
        _actor: &str,
        container: Container,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let container = sanitize_container(&container);
        let cdir = self.container_to_path(&container);
        std::fs::remove_dir(cdir)?;
        Ok(vec![])
    }

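    /// Begins a chunked upload by creating (or truncating) an empty file for the blob.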
    fn start_upload(
        &self,
        _actor: &str,
        blob: FileChunk,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let blob = Blob {
            byte_size: 0,
            id: blob.id,
            container: blob.container,
        };
        let blob = sanitize_blob(&blob);
        info!("Starting upload: {}/{}", blob.container, blob.id);
        let bfile = self.blob_to_path(&blob);
        std::fs::write(bfile, &[])?;
        Ok(vec![])
    }

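    /// Deletes the file backing the (sanitized) blob.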
    fn remove_object(
        &self,
        _actor: &str,
        blob: Blob,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let blob = sanitize_blob(&blob);
        let bfile = self.blob_to_path(&blob);
        std::fs::remove_file(&bfile)?;
        Ok(vec![])
    }

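    /// Returns metadata for the blob; if the file does not exist, returns a placeholder
    /// blob whose id and container are "none" and whose size is zero.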
    fn get_object_info(
        &self,
        _actor: &str,
        blob: Blob,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let blob = sanitize_blob(&blob);
        let bfile = self.blob_to_path(&blob);
        let blob: Blob = if bfile.exists() {
            Blob {
                id: blob.id,
                container: blob.container,
                byte_size: bfile.metadata().unwrap().len(),
            }
        } else {
            Blob {
                id: "none".to_string(),
                container: "none".to_string(),
                byte_size: 0,
            }
        };
        Ok(serialize(&blob)?)
    }

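    /// Lists the blobs in a container by reading its directory; entries that fail to read
    /// are silently skipped.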
    fn list_objects(
        &self,
        _actor: &str,
        container: Container,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let container = sanitize_container(&container);
        let cpath = self.container_to_path(&container);
        let (blobs, _errors): (Vec<_>, Vec<_>) = std::fs::read_dir(&cpath)?
            .map(|e| {
                e.map(|e| Blob {
                    id: e.file_name().into_string().unwrap(),
                    container: container.id.to_string(),
                    byte_size: e.metadata().unwrap().len(),
                })
            })
            .partition(Result::is_ok);
        let blobs = blobs.into_iter().map(Result::unwrap).collect();
        let bloblist = BlobList { blobs };
        Ok(serialize(&bloblist)?)
    }

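    /// Appends an uploaded chunk to the blob's file. Chunks may arrive out of order:
    /// they are buffered per (actor, container, blob) key and written only once the next
    /// expected sequence number is available. When every chunk has been written, the
    /// buffer entry is removed.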
    fn upload_chunk(
        &self,
        actor: &str,
        chunk: FileChunk,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let mut upload_chunks = self.upload_chunks.write().unwrap();
        let key = actor.to_string() + &sanitize_id(&chunk.container) + &sanitize_id(&chunk.id);
        let total_chunk_count = chunk.total_bytes / chunk.chunk_size;

        let (expected_sequence_no, chunks) = upload_chunks
            .entry(key.clone())
            .or_insert((FIRST_SEQ_NBR, vec![]));
        chunks.push(chunk);

        while let Some(i) = chunks
            .iter()
            .position(|fc| fc.sequence_no == *expected_sequence_no)
        {
            let chunk = chunks.get(i).unwrap();
            let bpath = Path::join(
                &Path::join(&self.rootdir.read().unwrap(), sanitize_id(&chunk.container)),
                sanitize_id(&chunk.id),
            );
            let mut file = OpenOptions::new().create(false).append(true).open(bpath)?;
            info!(
                "Receiving file chunk: {} for {}/{}",
                chunk.sequence_no, chunk.container, chunk.id
            );

            let count = file.write(chunk.chunk_bytes.as_ref())?;
            if count != chunk.chunk_bytes.len() {
                let msg = format!(
                    "Failed to fully write chunk: {} of {} bytes",
                    count,
                    chunk.chunk_bytes.len()
                );
                error!("{}", &msg);
                return Err(msg.into());
            }

            chunks.remove(i);
            *expected_sequence_no += 1;
        }

        if *expected_sequence_no == total_chunk_count {
            upload_chunks.remove(&key);
        }

        Ok(vec![])
    }

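    /// Starts a chunked download: computes the transfer metadata, then spawns a thread
    /// that reads the file in chunks and dispatches each one back to the requesting actor
    /// via `OP_RECEIVE_CHUNK`.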
    fn start_download(
        &self,
        actor: &str,
        request: StreamRequest,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        info!("Received request to start download : {:?}", request);
        let actor = actor.to_string();
        let bpath = Path::join(
            &Path::join(
                &self.rootdir.read().unwrap(),
                sanitize_id(&request.container),
            ),
            sanitize_id(&request.id),
        );
        let byte_size = &bpath.metadata()?.len();
        let bfile = std::fs::File::open(bpath)?;
        let chunk_size = if request.chunk_size == 0 {
            chunks::DEFAULT_CHUNK_SIZE
        } else {
            request.chunk_size as usize
        };
        let xfer = Transfer {
            blob_id: sanitize_id(&request.id),
            container: sanitize_id(&request.container),
            total_size: *byte_size,
            chunk_size: chunk_size as _,
            total_chunks: *byte_size / chunk_size as u64,
            context: request.context,
        };
        let iter = Chunks::new(bfile, chunk_size);
        let d = self.dispatcher.clone();
        std::thread::spawn(move || {
            iter.enumerate().for_each(|(i, chunk)| {
                dispatch_chunk(&xfer, &actor, i, d.clone(), chunk);
            });
        });

        Ok(vec![])
    }

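    /// Resolves a blob to its on-disk path: `<root>/<container id>/<blob id>`.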
    fn blob_to_path(&self, blob: &Blob) -> PathBuf {
        let cdir = Path::join(&self.rootdir.read().unwrap(), blob.container.to_string());
        Path::join(&cdir, blob.id.to_string())
    }

    fn container_to_path(&self, container: &Container) -> PathBuf {
        Path::join(&self.rootdir.read().unwrap(), container.id.to_string())
    }

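    /// Builds the capability descriptor that advertises this provider's identity, version,
    /// and supported operations to the host.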
    fn get_descriptor(&self) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        use OperationDirection::{ToActor, ToProvider};
        Ok(serialize(
            CapabilityDescriptor::builder()
                .id(CAPABILITY_ID)
                .name("waSCC Blob Store Provider (Disk/File System)")
                .long_description(
                    "A waSCC blob store capability provider exposing a file system to actors",
                )
                .version(VERSION)
                .revision(REVISION)
                .with_operation(
                    OP_CREATE_CONTAINER,
                    ToProvider,
                    "Creates a new container/bucket",
                )
                .with_operation(
                    OP_REMOVE_CONTAINER,
                    ToProvider,
                    "Removes a container/bucket",
                )
                .with_operation(
                    OP_LIST_OBJECTS,
                    ToProvider,
                    "Lists objects within a container",
                )
                .with_operation(
                    OP_UPLOAD_CHUNK,
                    ToProvider,
                    "Uploads a chunk of a blob to an item in a container. Must start upload first",
                )
                .with_operation(
                    OP_START_UPLOAD,
                    ToProvider,
                    "Starts the chunked upload of a blob",
                )
                .with_operation(
                    OP_START_DOWNLOAD,
                    ToProvider,
                    "Starts the chunked download of a blob",
                )
                .with_operation(
                    OP_GET_OBJECT_INFO,
                    ToProvider,
                    "Retrieves metadata about a blob",
                )
                .with_operation(
                    OP_RECEIVE_CHUNK,
                    ToActor,
                    "Receives a chunk of a blob for download",
                )
                .build(),
        )?)
    }
}

fn sanitize_container(container: &Container) -> Container {
    Container {
        id: sanitize_id(&container.id),
    }
}

fn sanitize_blob(blob: &Blob) -> Blob {
    Blob {
        id: sanitize_id(&blob.id),
        byte_size: blob.byte_size,
        container: sanitize_id(&blob.container),
    }
}

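/// Normalizes a container or blob id so it cannot escape the configured root directory:
/// leading '/' and '.' characters are trimmed, ".." sequences are removed, and remaining
/// '/' separators are replaced with '_'. For example, "../passwd" becomes "passwd" and
/// "/etc/h4x0rd" becomes "etc_h4x0rd".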
fn sanitize_id(id: &str) -> String {
    let bad_prefixes: &[_] = &['/', '.'];
    let s = id.trim_start_matches(bad_prefixes);
    let s = s.replace("..", "");
    s.replace("/", "_")
}

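/// Wraps a chunk read from disk in a `FileChunk` and dispatches it to the actor with
/// `OP_RECEIVE_CHUNK`. Read errors and dispatch errors are ignored.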
fn dispatch_chunk(
    xfer: &Transfer,
    actor: &str,
    i: usize,
    d: Arc<RwLock<Box<dyn Dispatcher>>>,
    chunk: Result<Vec<u8>, std::io::Error>,
) {
    if let Ok(chunk) = chunk {
        let fc = FileChunk {
            sequence_no: i as u64,
            container: xfer.container.to_string(),
            id: xfer.blob_id.to_string(),
            chunk_bytes: chunk,
            chunk_size: xfer.chunk_size,
            total_bytes: xfer.total_size,
            context: xfer.context.clone(),
        };
        let buf = serialize(&fc).unwrap();
        let _ = d.read().unwrap().dispatch(actor, OP_RECEIVE_CHUNK, &buf);
    }
}

impl CapabilityProvider for FileSystemProvider {
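    // Stores the dispatcher used to send messages (such as download chunks) back to actors.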
    fn configure_dispatch(
        &self,
        dispatcher: Box<dyn Dispatcher>,
    ) -> Result<(), Box<dyn Error + Sync + Send>> {
        trace!("Dispatcher received.");
        let mut lock = self.dispatcher.write().unwrap();
        *lock = dispatcher;

        Ok(())
    }

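    // Routes an incoming operation to the matching handler. Binding, removal, and
    // descriptor requests are only honored when they come from the system actor.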
    fn handle_call(
        &self,
        actor: &str,
        op: &str,
        msg: &[u8],
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        trace!("Received host call from {}, operation - {}", actor, op);

        match op {
            OP_BIND_ACTOR if actor == SYSTEM_ACTOR => self.configure(deserialize(msg)?),
            OP_REMOVE_ACTOR if actor == SYSTEM_ACTOR => Ok(vec![]),
            OP_GET_CAPABILITY_DESCRIPTOR if actor == SYSTEM_ACTOR => self.get_descriptor(),
            OP_CREATE_CONTAINER => self.create_container(actor, deserialize(msg)?),
            OP_REMOVE_CONTAINER => self.remove_container(actor, deserialize(msg)?),
            OP_REMOVE_OBJECT => self.remove_object(actor, deserialize(msg)?),
            OP_LIST_OBJECTS => self.list_objects(actor, deserialize(msg)?),
            OP_UPLOAD_CHUNK => self.upload_chunk(actor, deserialize(msg)?),
            OP_START_DOWNLOAD => self.start_download(actor, deserialize(msg)?),
            OP_START_UPLOAD => self.start_upload(actor, deserialize(msg)?),
            OP_GET_OBJECT_INFO => self.get_object_info(actor, deserialize(msg)?),
            _ => Err("bad dispatch".into()),
        }
    }
}

#[cfg(test)]
#[allow(unused_imports)]
mod tests {
    use super::{sanitize_blob, sanitize_container};
    use crate::FileSystemProvider;
    use codec::blobstore::{Blob, Container};
    use std::collections::HashMap;
    use std::env::temp_dir;
    use std::fs::File;
    use std::io::{BufReader, Read};
    use std::path::{Path, PathBuf};
    use wascc_codec::blobstore::FileChunk;
    use wascc_codec::core::CapabilityConfiguration;

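    // Path-traversal style container and blob ids must be neutralized by the sanitizers.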
    #[test]
    fn no_hacky_hacky() {
        let container = Container {
            id: "/etc/h4x0rd".to_string(),
        };
        let blob = Blob {
            byte_size: 0,
            id: "../passwd".to_string(),
            container: "/etc/h4x0rd".to_string(),
        };
        let c = sanitize_container(&container);
        let b = sanitize_blob(&blob);

        assert_eq!(c.id, "etc_h4x0rd");
        assert_eq!(b.id, "passwd");
        assert_eq!(b.container, "etc_h4x0rd");
    }

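    // Uploads a small blob in four chunks and checks the bytes written to disk; the fourth
    // chunk arrives after the expected chunk count has been reached, so it stays buffered.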
    #[test]
    fn test_start_upload() {
        let actor = "actor1";
        let container = "container".to_string();
        let id = "blob".to_string();

        let fs = FileSystemProvider::new();
        let root_dir = setup_test_start_upload(&fs);
        let upload_dir = Path::join(&root_dir, &container);
        let bpath = create_dir(&upload_dir, &id);

        let total_bytes = 6;
        let chunk_size = 2;

        let chunk1 = FileChunk {
            sequence_no: 0,
            container: container.clone(),
            id: id.clone(),
            total_bytes,
            chunk_size,
            chunk_bytes: vec![1, 1],
            context: None,
        };
        let chunk2 = FileChunk {
            sequence_no: 1,
            container: container.clone(),
            id: id.clone(),
            total_bytes,
            chunk_size,
            chunk_bytes: vec![2, 2],
            context: None,
        };
        let chunk3 = FileChunk {
            sequence_no: 2,
            container: container.clone(),
            id: id.clone(),
            total_bytes,
            chunk_size,
            chunk_bytes: vec![3],
            context: None,
        };
        let chunk4 = FileChunk {
            sequence_no: 3,
            container: container.clone(),
            id: id.clone(),
            total_bytes,
            chunk_size,
            chunk_bytes: vec![3],
            context: None,
        };

        assert!(fs.upload_chunk(actor, chunk1).is_ok());
        assert!(fs.upload_chunk(actor, chunk2).is_ok());
        assert!(fs.upload_chunk(actor, chunk3).is_ok());
        assert!(fs.upload_chunk(actor, chunk4).is_ok());

        let mut reader = BufReader::new(File::open(&bpath).unwrap());
        let mut buffer = [0; 5];

        teardown_test_start_upload(&bpath, &upload_dir);

        assert!(reader.read(&mut buffer).is_ok());
        assert_eq!(vec![1, 1, 2, 2, 3], buffer);
        assert_eq!(1, fs.upload_chunks.read().unwrap().len());
    }

    #[allow(dead_code)]
    fn setup_test_start_upload(fs: &FileSystemProvider) -> PathBuf {
        let mut config = HashMap::new();
        let root_dir = temp_dir();

        config.insert("ROOT".to_string(), String::from(root_dir.to_str().unwrap()));
        fs.configure(CapabilityConfiguration {
            module: "test_start_upload-module".to_string(),
            values: config,
        })
        .unwrap();

        root_dir
    }

    #[allow(dead_code)]
    fn teardown_test_start_upload(file: &PathBuf, upload_dir: &PathBuf) {
        std::fs::remove_file(file).unwrap();
        std::fs::remove_dir_all(upload_dir).unwrap();
    }

    #[allow(dead_code)]
    fn create_dir(dir: &PathBuf, id: &String) -> PathBuf {
        let bpath = Path::join(&dir, &id);
        let _res = std::fs::create_dir(&dir);
        drop(File::create(&bpath).unwrap());
        bpath
    }
}