// wascc_fs/lib.rs

1#[macro_use]
2extern crate wascc_codec as codec;
3
4#[macro_use]
5extern crate log;
6
7use chunks::Chunks;
8use codec::blobstore::*;
9use codec::capabilities::{
10    CapabilityDescriptor, CapabilityProvider, Dispatcher, NullDispatcher, OperationDirection,
11    OP_GET_CAPABILITY_DESCRIPTOR,
12};
13use codec::core::{OP_BIND_ACTOR, OP_REMOVE_ACTOR};
14use codec::{deserialize, serialize};
15use std::collections::HashMap;
16use std::error::Error;
17use std::io::Write;
18use std::{
19    fs::OpenOptions,
20    path::{Path, PathBuf},
21    sync::{Arc, RwLock},
22};
23use wascc_codec::core::CapabilityConfiguration;
24
// File-chunking iterator used when streaming blobs back to actors.
mod chunks;

// When built as a dynamic plugin, export the FFI entry point the waSCC host
// uses to instantiate this provider; static builds construct it directly.
#[cfg(not(feature = "static_plugin"))]
capability_provider!(FileSystemProvider, FileSystemProvider::new);

// Capability contract identifier implemented by this provider.
const CAPABILITY_ID: &str = "wascc:blobstore";
// Reserved actor name the host uses for configuration/system operations.
const SYSTEM_ACTOR: &str = "system";
// Sequence number assigned to the first chunk of an upload.
const FIRST_SEQ_NBR: u64 = 0;
const VERSION: &str = env!("CARGO_PKG_VERSION");
const REVISION: u32 = 3; // Increment for each crates publish
35
/// A waSCC blob store capability provider backed by the local file system.
///
/// Containers map to directories under a configurable root directory and
/// blobs map to files within those directories.
pub struct FileSystemProvider {
    // Dispatcher used to deliver downloaded chunks back to actors; replaced
    // by the host via `configure_dispatch`.
    dispatcher: Arc<RwLock<Box<dyn Dispatcher>>>,
    // Root directory under which all containers are created (set from the
    // `ROOT` configuration value).
    rootdir: RwLock<PathBuf>,
    // Per-upload bookkeeping keyed by actor + container id + blob id: the
    // next expected sequence number plus any chunks that arrived early.
    upload_chunks: RwLock<HashMap<String, (u64, Vec<FileChunk>)>>,
}
41
impl Default for FileSystemProvider {
    /// Builds an unconfigured provider with a no-op dispatcher, an empty
    /// root path, and no uploads in flight.
    fn default() -> Self {
        // Best-effort logger init; the error is ignored deliberately because
        // a global logger may already be installed by the host process.
        let _ = env_logger::builder().format_module_path(false).try_init();

        FileSystemProvider {
            // Placeholder until the host supplies a real dispatcher via
            // `configure_dispatch`.
            dispatcher: Arc::new(RwLock::new(Box::new(NullDispatcher::new()))),
            rootdir: RwLock::new(PathBuf::new()),
            upload_chunks: RwLock::new(HashMap::new()),
        }
    }
}
53
54impl FileSystemProvider {
55    pub fn new() -> Self {
56        Self::default()
57    }
58
59    fn configure(
60        &self,
61        config: CapabilityConfiguration,
62    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
63        let mut lock = self.rootdir.write().unwrap();
64        let root_dir = config.values["ROOT"].clone();
65        info!("File System Blob Store Container Root: '{}'", root_dir);
66        *lock = PathBuf::from(root_dir);
67
68        Ok(vec![])
69    }
70
71    fn create_container(
72        &self,
73        _actor: &str,
74        container: Container,
75    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
76        let container = sanitize_container(&container);
77        let cdir = self.container_to_path(&container);
78        std::fs::create_dir_all(cdir)?;
79        Ok(serialize(&container)?)
80    }
81
82    fn remove_container(
83        &self,
84        _actor: &str,
85        container: Container,
86    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
87        let container = sanitize_container(&container);
88        let cdir = self.container_to_path(&container);
89        std::fs::remove_dir(cdir)?;
90        Ok(vec![])
91    }
92
93    fn start_upload(
94        &self,
95        _actor: &str,
96        blob: FileChunk,
97    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
98        let blob = Blob {
99            byte_size: 0,
100            id: blob.id,
101            container: blob.container,
102        };
103        let blob = sanitize_blob(&blob);
104        info!("Starting upload: {}/{}", blob.container, blob.id);
105        let bfile = self.blob_to_path(&blob);
106        std::fs::write(bfile, &[])?;
107        Ok(vec![])
108    }
109
110    fn remove_object(
111        &self,
112        _actor: &str,
113        blob: Blob,
114    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
115        let blob = sanitize_blob(&blob);
116        let bfile = self.blob_to_path(&blob);
117        std::fs::remove_file(&bfile)?;
118        Ok(vec![])
119    }
120
121    fn get_object_info(
122        &self,
123        _actor: &str,
124        blob: Blob,
125    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
126        let blob = sanitize_blob(&blob);
127        let bfile = self.blob_to_path(&blob);
128        let blob: Blob = if bfile.exists() {
129            Blob {
130                id: blob.id,
131                container: blob.container,
132                byte_size: bfile.metadata().unwrap().len(),
133            }
134        } else {
135            Blob {
136                id: "none".to_string(),
137                container: "none".to_string(),
138                byte_size: 0,
139            }
140        };
141        Ok(serialize(&blob)?)
142    }
143
144    fn list_objects(
145        &self,
146        _actor: &str,
147        container: Container,
148    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
149        let container = sanitize_container(&container);
150        let cpath = self.container_to_path(&container);
151        let (blobs, _errors): (Vec<_>, Vec<_>) = std::fs::read_dir(&cpath)?
152            .map(|e| {
153                e.map(|e| Blob {
154                    id: e.file_name().into_string().unwrap(),
155                    container: container.id.to_string(),
156                    byte_size: e.metadata().unwrap().len(),
157                })
158            })
159            .partition(Result::is_ok);
160        let blobs = blobs.into_iter().map(Result::unwrap).collect();
161        let bloblist = BlobList { blobs };
162        Ok(serialize(&bloblist)?)
163    }
164
165    fn upload_chunk(
166        &self,
167        actor: &str,
168        chunk: FileChunk,
169    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
170        let mut upload_chunks = self.upload_chunks.write().unwrap();
171        let key = actor.to_string() + &sanitize_id(&chunk.container) + &sanitize_id(&chunk.id);
172        let total_chunk_count = chunk.total_bytes / chunk.chunk_size;
173
174        let (expected_sequence_no, chunks) = upload_chunks
175            .entry(key.clone())
176            .or_insert((FIRST_SEQ_NBR, vec![]));
177        chunks.push(chunk);
178
179        while let Some(i) = chunks
180            .iter()
181            .position(|fc| fc.sequence_no == *expected_sequence_no)
182        {
183            let chunk = chunks.get(i).unwrap();
184            let bpath = Path::join(
185                &Path::join(&self.rootdir.read().unwrap(), sanitize_id(&chunk.container)),
186                sanitize_id(&chunk.id),
187            );
188            let mut file = OpenOptions::new().create(false).append(true).open(bpath)?;
189            info!(
190                "Receiving file chunk: {} for {}/{}",
191                chunk.sequence_no, chunk.container, chunk.id
192            );
193
194            let count = file.write(chunk.chunk_bytes.as_ref())?;
195            if count != chunk.chunk_bytes.len() {
196                let msg = format!(
197                    "Failed to fully write chunk: {} of {} bytes",
198                    count,
199                    chunk.chunk_bytes.len()
200                );
201                error!("{}", &msg);
202                return Err(msg.into());
203            }
204
205            chunks.remove(i);
206            *expected_sequence_no += 1;
207        }
208
209        if *expected_sequence_no == total_chunk_count {
210            upload_chunks.remove(&key);
211        }
212
213        Ok(vec![])
214    }
215
216    fn start_download(
217        &self,
218        actor: &str,
219        request: StreamRequest,
220    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
221        info!("Received request to start download : {:?}", request);
222        let actor = actor.to_string();
223        let bpath = Path::join(
224            &Path::join(
225                &self.rootdir.read().unwrap(),
226                sanitize_id(&request.container),
227            ),
228            sanitize_id(&request.id),
229        );
230        let byte_size = &bpath.metadata()?.len();
231        let bfile = std::fs::File::open(bpath)?;
232        let chunk_size = if request.chunk_size == 0 {
233            chunks::DEFAULT_CHUNK_SIZE
234        } else {
235            request.chunk_size as usize
236        };
237        let xfer = Transfer {
238            blob_id: sanitize_id(&request.id),
239            container: sanitize_id(&request.container),
240            total_size: *byte_size,
241            chunk_size: chunk_size as _,
242            total_chunks: *byte_size / chunk_size as u64,
243            context: request.context,
244        };
245        let iter = Chunks::new(bfile, chunk_size);
246        let d = self.dispatcher.clone();
247        std::thread::spawn(move || {
248            iter.enumerate().for_each(|(i, chunk)| {
249                dispatch_chunk(&xfer, &actor, i, d.clone(), chunk);
250            });
251        });
252
253        Ok(vec![])
254    }
255
256    fn blob_to_path(&self, blob: &Blob) -> PathBuf {
257        let cdir = Path::join(&self.rootdir.read().unwrap(), blob.container.to_string());
258        Path::join(&cdir, blob.id.to_string())
259    }
260
261    fn container_to_path(&self, container: &Container) -> PathBuf {
262        Path::join(&self.rootdir.read().unwrap(), container.id.to_string())
263    }
264
265    fn get_descriptor(&self) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
266        use OperationDirection::{ToActor, ToProvider};
267        Ok(serialize(
268            CapabilityDescriptor::builder()
269                .id(CAPABILITY_ID)
270                .name("waSCC Blob Store Provider (Disk/File System)")
271                .long_description(
272                    "A waSCC blob store capability provider exposing a file system to actors",
273                )
274                .version(VERSION)
275                .revision(REVISION)
276                .with_operation(
277                    OP_CREATE_CONTAINER,
278                    ToProvider,
279                    "Creates a new container/bucket",
280                )
281                .with_operation(
282                    OP_REMOVE_CONTAINER,
283                    ToProvider,
284                    "Removes a container/bucket",
285                )
286                .with_operation(
287                    OP_LIST_OBJECTS,
288                    ToProvider,
289                    "Lists objects within a container",
290                )
291                .with_operation(
292                    OP_UPLOAD_CHUNK,
293                    ToProvider,
294                    "Uploads a chunk of a blob to an item in a container. Must start upload first",
295                )
296                .with_operation(
297                    OP_START_UPLOAD,
298                    ToProvider,
299                    "Starts the chunked upload of a blob",
300                )
301                .with_operation(
302                    OP_START_DOWNLOAD,
303                    ToProvider,
304                    "Starts the chunked download of a blob",
305                )
306                .with_operation(
307                    OP_GET_OBJECT_INFO,
308                    ToProvider,
309                    "Retrieves metadata about a blob",
310                )
311                .with_operation(
312                    OP_RECEIVE_CHUNK,
313                    ToActor,
314                    "Receives a chunk of a blob for download",
315                )
316                .build(),
317        )?)
318    }
319}
320fn sanitize_container(container: &Container) -> Container {
321    Container {
322        id: sanitize_id(&container.id),
323    }
324}
325fn sanitize_blob(blob: &Blob) -> Blob {
326    Blob {
327        id: sanitize_id(&blob.id),
328        byte_size: blob.byte_size,
329        container: sanitize_id(&blob.container),
330    }
331}
332
/// Strips path-traversal tokens from a blob/container id: leading `/` and
/// `.` characters are trimmed, embedded `..` sequences are deleted, and any
/// remaining `/` separators become `_` so the id maps to a single path
/// component under the configured root.
fn sanitize_id(id: &str) -> String {
    let trimmed = id.trim_start_matches(|c| c == '/' || c == '.');
    trimmed.replace("..", "").replace('/', "_")
}
339
340fn dispatch_chunk(
341    xfer: &Transfer,
342    actor: &str,
343    i: usize,
344    d: Arc<RwLock<Box<dyn Dispatcher>>>,
345    chunk: Result<Vec<u8>, std::io::Error>,
346) {
347    if let Ok(chunk) = chunk {
348        let fc = FileChunk {
349            sequence_no: i as u64,
350            container: xfer.container.to_string(),
351            id: xfer.blob_id.to_string(),
352            chunk_bytes: chunk,
353            chunk_size: xfer.chunk_size,
354            total_bytes: xfer.total_size,
355            context: xfer.context.clone(),
356        };
357        let buf = serialize(&fc).unwrap();
358        let _ = d.read().unwrap().dispatch(actor, OP_RECEIVE_CHUNK, &buf);
359    }
360}
361
362impl CapabilityProvider for FileSystemProvider {
363    // Invoked by the runtime host to give this provider plugin the ability to communicate
364    // with actors
365    fn configure_dispatch(
366        &self,
367        dispatcher: Box<dyn Dispatcher>,
368    ) -> Result<(), Box<dyn Error + Sync + Send>> {
369        trace!("Dispatcher received.");
370        let mut lock = self.dispatcher.write().unwrap();
371        *lock = dispatcher;
372
373        Ok(())
374    }
375
376    // Invoked by host runtime to allow an actor to make use of the capability
377    // All providers MUST handle the "configure" message, even if no work will be done
378    fn handle_call(
379        &self,
380        actor: &str,
381        op: &str,
382        msg: &[u8],
383    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
384        trace!("Received host call from {}, operation - {}", actor, op);
385
386        match op {
387            OP_BIND_ACTOR if actor == SYSTEM_ACTOR => self.configure(deserialize(msg)?),
388            OP_REMOVE_ACTOR if actor == SYSTEM_ACTOR => Ok(vec![]),
389            OP_GET_CAPABILITY_DESCRIPTOR if actor == SYSTEM_ACTOR => self.get_descriptor(),
390            OP_CREATE_CONTAINER => self.create_container(actor, deserialize(msg)?),
391            OP_REMOVE_CONTAINER => self.remove_container(actor, deserialize(msg)?),
392            OP_REMOVE_OBJECT => self.remove_object(actor, deserialize(msg)?),
393            OP_LIST_OBJECTS => self.list_objects(actor, deserialize(msg)?),
394            OP_UPLOAD_CHUNK => self.upload_chunk(actor, deserialize(msg)?),
395            OP_START_DOWNLOAD => self.start_download(actor, deserialize(msg)?),
396            OP_START_UPLOAD => self.start_upload(actor, deserialize(msg)?),
397            OP_GET_OBJECT_INFO => self.get_object_info(actor, deserialize(msg)?),
398            _ => Err("bad dispatch".into()),
399        }
400    }
401}
402
#[cfg(test)]
#[allow(unused_imports)]
mod tests {
    use super::{sanitize_blob, sanitize_container};
    use crate::FileSystemProvider;
    use codec::blobstore::{Blob, Container};
    use std::collections::HashMap;
    use std::env::temp_dir;
    use std::fs::File;
    use std::io::{BufReader, Read};
    use std::path::{Path, PathBuf};
    use wascc_codec::blobstore::FileChunk;
    use wascc_codec::core::CapabilityConfiguration;

    // Verifies that path-traversal attempts in container/blob ids are
    // neutralized before any path is built.
    #[test]
    fn no_hacky_hacky() {
        let container = Container {
            id: "/etc/h4x0rd".to_string(),
        };
        let blob = Blob {
            byte_size: 0,
            id: "../passwd".to_string(),
            container: "/etc/h4x0rd".to_string(),
        };
        let c = sanitize_container(&container);
        let b = sanitize_blob(&blob);

        // the resulting tricksy blob should end up in ${ROOT}/etc_h4x0rd/passwd and
        // thereby not expose anything sensitive
        assert_eq!(c.id, "etc_h4x0rd");
        assert_eq!(b.id, "passwd");
        assert_eq!(b.container, "etc_h4x0rd");
    }

    // End-to-end chunked upload: four chunks (including an out-of-band
    // duplicate-looking seq 3) are appended and the on-disk file must contain
    // the bytes in sequence order.
    #[test]
    fn test_start_upload() {
        let actor = "actor1";
        let container = "container".to_string();
        let id = "blob".to_string();

        let fs = FileSystemProvider::new();
        let root_dir = setup_test_start_upload(&fs);
        let upload_dir = Path::join(&root_dir, &container);
        let bpath = create_dir(&upload_dir, &id);

        // 6 total bytes at chunk size 2 => the provider considers the upload
        // complete after sequence numbers 0..=2 have been written.
        let total_bytes = 6;
        let chunk_size = 2;

        let chunk1 = FileChunk {
            sequence_no: 0,
            container: container.clone(),
            id: id.clone(),
            total_bytes,
            chunk_size,
            chunk_bytes: vec![1, 1],
            context: None,
        };
        let chunk2 = FileChunk {
            sequence_no: 1,
            container: container.clone(),
            id: id.clone(),
            total_bytes,
            chunk_size,
            chunk_bytes: vec![2, 2],
            context: None,
        };
        let chunk3 = FileChunk {
            sequence_no: 2,
            container: container.clone(),
            id: id.clone(),
            total_bytes,
            chunk_size,
            chunk_bytes: vec![3],
            context: None,
        };
        // Arrives after the upload is already complete, so it re-creates a
        // bookkeeping entry that is never drained (asserted at the end).
        let chunk4 = FileChunk {
            sequence_no: 3,
            container: container.clone(),
            id: id.clone(),
            total_bytes,
            chunk_size,
            chunk_bytes: vec![3],
            context: None,
        };

        assert!(fs.upload_chunk(actor, chunk1).is_ok());
        assert!(fs.upload_chunk(actor, chunk2).is_ok());
        assert!(fs.upload_chunk(actor, chunk3).is_ok());
        assert!(fs.upload_chunk(actor, chunk4).is_ok());

        // check file contents
        let mut reader = BufReader::new(File::open(&bpath).unwrap());
        let mut buffer = [0; 5];

        // Teardown happens before the asserts so temp files are removed even
        // when an assertion below fails (the reader keeps the file handle).
        teardown_test_start_upload(&bpath, &upload_dir);

        assert!(reader.read(&mut buffer).is_ok());
        assert_eq!(vec![1, 1, 2, 2, 3], buffer);
        // the last duplicate is not cleaned up because it can't tell the
        // difference between a late duplicate chunk and an early out of order chunk
        assert_eq!(1, fs.upload_chunks.read().unwrap().len());
    }

    // Points the provider's ROOT at the OS temp directory and returns it.
    #[allow(dead_code)]
    fn setup_test_start_upload(fs: &FileSystemProvider) -> PathBuf {
        let mut config = HashMap::new();
        let root_dir = temp_dir();

        config.insert("ROOT".to_string(), String::from(root_dir.to_str().unwrap()));
        fs.configure(CapabilityConfiguration {
            module: "test_start_upload-module".to_string(),
            values: config,
        })
        .unwrap();

        root_dir
    }

    // Removes the uploaded blob file and its container directory.
    #[allow(dead_code)]
    fn teardown_test_start_upload(file: &PathBuf, upload_dir: &PathBuf) {
        std::fs::remove_file(file).unwrap();
        std::fs::remove_dir_all(upload_dir).unwrap();
    }

    // Creates the container directory (if needed) plus an empty blob file,
    // mirroring what `start_upload` would do, and returns the blob path.
    #[allow(dead_code)]
    fn create_dir(dir: &PathBuf, id: &String) -> PathBuf {
        let bpath = Path::join(&dir, &id);
        let _res = std::fs::create_dir(&dir);
        drop(File::create(&bpath).unwrap());
        bpath
    }
}