// wasmcloud_fs/lib.rs

1use chunks::Chunks;
2use codec::{
3    capabilities::{CapabilityProvider, Dispatcher, NullDispatcher},
4    capability_provider,
5    core::{OP_BIND_ACTOR, OP_REMOVE_ACTOR},
6    deserialize, serialize,
7};
8use log::{error, info, trace};
9use std::{
10    collections::HashMap,
11    error::Error,
12    fs::OpenOptions,
13    io::Write,
14    path::{Path, PathBuf},
15    sync::{Arc, RwLock},
16};
17use wasmcloud_actor_blobstore::*;
18use wasmcloud_actor_core::CapabilityConfiguration;
19use wasmcloud_provider_core as codec;
20
21mod chunks;
22
// Register this provider as a dynamically loadable capability-provider plugin
// unless it is being statically linked into the host.
#[cfg(not(feature = "static_plugin"))]
capability_provider!(FileSystemProvider, FileSystemProvider::new);

// Capability contract implemented by this provider.
#[allow(unused)]
const CAPABILITY_ID: &str = "wasmcloud:blobstore";
// Actor identity the host uses when sending bind/remove configuration messages.
const SYSTEM_ACTOR: &str = "system";
// Sequence number of the first chunk of an upload.
const FIRST_SEQ_NBR: u64 = 0;

/// Tuple of (expected sequence number, chunks)
type SequencedChunk = (u64, Vec<FileChunk>);
33
/// File-system-backed implementation of the `wasmcloud:blobstore` capability:
/// containers map to directories under `rootdir` and blobs map to files.
#[derive(Clone)]
pub struct FileSystemProvider {
    // Channel back to the host, used to deliver download chunks to actors.
    dispatcher: Arc<RwLock<Box<dyn Dispatcher>>>,
    // Base directory for all containers; set by the bind-actor `configure` call.
    rootdir: Arc<RwLock<PathBuf>>,
    // In-flight uploads keyed by actor + container id + blob id, buffering
    // out-of-order chunks until they can be appended sequentially.
    upload_chunks: Arc<RwLock<HashMap<String, SequencedChunk>>>,
}
40
41impl Default for FileSystemProvider {
42    fn default() -> Self {
43        let _ = env_logger::builder().format_module_path(false).try_init();
44
45        FileSystemProvider {
46            dispatcher: Arc::new(RwLock::new(Box::new(NullDispatcher::new()))),
47            rootdir: Arc::new(RwLock::new(PathBuf::new())),
48            upload_chunks: Arc::new(RwLock::new(HashMap::new())),
49        }
50    }
51}
52
53impl FileSystemProvider {
54    pub fn new() -> Self {
55        Self::default()
56    }
57
58    fn configure(
59        &self,
60        config: CapabilityConfiguration,
61    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
62        let mut lock = self.rootdir.write().unwrap();
63        let root_dir = config.values["ROOT"].clone();
64        info!("File System Blob Store Container Root: '{}'", root_dir);
65        *lock = PathBuf::from(root_dir);
66
67        Ok(vec![])
68    }
69
70    fn create_container(
71        &self,
72        _actor: &str,
73        args: CreateContainerArgs,
74    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
75        let container = Container::new(sanitize_id(&args.id));
76        let cdir = self.container_to_path(&container);
77        std::fs::create_dir_all(cdir)?;
78        serialize(&container)
79    }
80
81    fn remove_container(
82        &self,
83        _actor: &str,
84        args: RemoveContainerArgs,
85    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
86        let container = Container::new(sanitize_id(&args.id));
87        let cdir = self.container_to_path(&container);
88        std::fs::remove_dir(cdir)?;
89        serialize(BlobstoreResult {
90            success: true,
91            error: None,
92        })
93    }
94
95    fn start_upload(
96        &self,
97        _actor: &str,
98        blob: FileChunk,
99    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
100        let blob = sanitize_blob(&Blob {
101            id: blob.id,
102            container: blob.container,
103            byte_size: 0,
104        });
105        let bfile = self.blob_to_path(&blob);
106        serialize(std::fs::write(bfile, &[]).map_or_else(
107            |e| BlobstoreResult {
108                success: false,
109                error: Some(e.to_string()),
110            },
111            |_| BlobstoreResult {
112                success: true,
113                error: None,
114            },
115        ))
116    }
117
118    fn remove_object(
119        &self,
120        _actor: &str,
121        args: RemoveObjectArgs,
122    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
123        let blob = sanitize_blob(&Blob {
124            id: args.id,
125            container: Container::new(args.container_id),
126            byte_size: 0,
127        });
128        let bfile = self.blob_to_path(&blob);
129        serialize(std::fs::remove_file(&bfile).map_or_else(
130            |e| BlobstoreResult {
131                success: false,
132                error: Some(e.to_string()),
133            },
134            |_| BlobstoreResult {
135                success: true,
136                error: None,
137            },
138        ))
139    }
140
141    fn get_object_info(
142        &self,
143        _actor: &str,
144        args: GetObjectInfoArgs,
145    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
146        let blob = sanitize_blob(&Blob {
147            id: args.blob_id,
148            container: Container::new(&args.container_id),
149            byte_size: 0,
150        });
151        let bfile = self.blob_to_path(&blob);
152        serialize(if bfile.exists() {
153            Blob {
154                id: blob.id,
155                container: blob.container,
156                byte_size: bfile.metadata().unwrap().len(),
157            }
158        } else {
159            Blob {
160                id: "none".to_string(),
161                container: Container::new("none"),
162                byte_size: 0,
163            }
164        })
165    }
166
167    fn list_objects(
168        &self,
169        _actor: &str,
170        args: ListObjectsArgs,
171    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
172        let container = sanitize_container(&Container::new(args.container_id));
173        let cpath = self.container_to_path(&container);
174        let (blobs, _errors): (Vec<_>, Vec<_>) = std::fs::read_dir(&cpath)?
175            .map(|e| {
176                e.map(|e| Blob {
177                    id: e.file_name().into_string().unwrap(),
178                    container: container.clone(),
179                    byte_size: e.metadata().unwrap().len(),
180                })
181            })
182            .partition(Result::is_ok);
183        let blobs = blobs.into_iter().map(Result::unwrap).collect();
184        serialize(&BlobList { blobs })
185    }
186
187    fn upload_chunk(
188        &self,
189        actor: &str,
190        chunk: FileChunk,
191    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
192        let mut upload_chunks = self.upload_chunks.write().unwrap();
193        let key = actor.to_string() + &sanitize_id(&chunk.container.id) + &sanitize_id(&chunk.id);
194        let total_chunk_count = chunk.total_bytes / chunk.chunk_size;
195
196        let (expected_sequence_no, chunks) = upload_chunks
197            .entry(key.clone())
198            .or_insert((FIRST_SEQ_NBR, vec![]));
199        chunks.push(chunk);
200
201        while let Some(i) = chunks
202            .iter()
203            .position(|fc| fc.sequence_no == *expected_sequence_no)
204        {
205            let chunk = chunks.get(i).unwrap();
206            let bpath = Path::join(
207                &Path::join(
208                    &self.rootdir.read().unwrap(),
209                    sanitize_id(&chunk.container.id),
210                ),
211                sanitize_id(&chunk.id),
212            );
213            let mut file = OpenOptions::new().create(false).append(true).open(bpath)?;
214            info!(
215                "Receiving file chunk: {} for {}/{}",
216                chunk.sequence_no, chunk.container.id, chunk.id
217            );
218
219            let count = file.write(chunk.chunk_bytes.as_ref())?;
220            if count != chunk.chunk_bytes.len() {
221                let msg = format!(
222                    "Failed to fully write chunk: {} of {} bytes",
223                    count,
224                    chunk.chunk_bytes.len()
225                );
226                error!("{}", &msg);
227                return Err(msg.into());
228            }
229
230            chunks.remove(i);
231            *expected_sequence_no += 1;
232        }
233
234        if *expected_sequence_no == total_chunk_count {
235            upload_chunks.remove(&key);
236        }
237
238        serialize(BlobstoreResult {
239            success: true,
240            error: None,
241        })
242    }
243
244    fn start_download(
245        &self,
246        actor: &str,
247        args: StartDownloadArgs,
248    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
249        info!("Received request to start download : {:?}", args);
250        let actor = actor.to_string();
251        let bpath = Path::join(
252            &Path::join(
253                &self.rootdir.read().unwrap(),
254                sanitize_id(&args.container_id),
255            ),
256            sanitize_id(&args.blob_id),
257        );
258        let byte_size = &bpath.metadata()?.len();
259        let bfile = std::fs::File::open(bpath)?;
260        let chunk_size = if args.chunk_size == 0 {
261            chunks::DEFAULT_CHUNK_SIZE
262        } else {
263            args.chunk_size as usize
264        };
265        let xfer = Transfer {
266            blob_id: sanitize_id(&args.blob_id),
267            container: Container::new(sanitize_id(&args.container_id)),
268            total_size: *byte_size,
269            chunk_size: chunk_size as _,
270            total_chunks: *byte_size / chunk_size as u64,
271            context: args.context,
272        };
273        let iter = Chunks::new(bfile, chunk_size);
274        let d = self.dispatcher.clone();
275        std::thread::spawn(move || {
276            iter.enumerate().for_each(|(i, chunk)| {
277                dispatch_chunk(&xfer, &actor, i, d.clone(), chunk);
278            });
279        });
280
281        serialize(BlobstoreResult {
282            success: true,
283            error: None,
284        })
285    }
286
287    fn blob_to_path(&self, blob: &Blob) -> PathBuf {
288        let cdir = Path::join(&self.rootdir.read().unwrap(), blob.container.id.to_string());
289        Path::join(&cdir, blob.id.to_string())
290    }
291
292    fn container_to_path(&self, container: &Container) -> PathBuf {
293        Path::join(&self.rootdir.read().unwrap(), container.id.to_string())
294    }
295}
296
297fn sanitize_container(container: &Container) -> Container {
298    Container {
299        id: sanitize_id(&container.id),
300    }
301}
302
303fn sanitize_blob(blob: &Blob) -> Blob {
304    Blob {
305        id: sanitize_id(&blob.id),
306        byte_size: blob.byte_size,
307        container: Container::new(sanitize_id(&blob.container.id)),
308    }
309}
310
/// Normalizes a caller-supplied id so it cannot escape the root directory:
/// leading '/' and '.' characters are stripped, ".." sequences are removed,
/// and any remaining '/' separators are replaced with '_'.
fn sanitize_id(id: &str) -> String {
    id.trim_start_matches(&['/', '.'][..])
        .replace("..", "")
        .replace('/', "_")
}
317
318fn dispatch_chunk(
319    xfer: &Transfer,
320    actor: &str,
321    i: usize,
322    d: Arc<RwLock<Box<dyn Dispatcher>>>,
323    chunk: Result<Vec<u8>, std::io::Error>,
324) {
325    if let Ok(chunk) = chunk {
326        let fc = FileChunk {
327            sequence_no: i as u64,
328            container: Container::new(xfer.container.id.clone()),
329            id: xfer.blob_id.to_string(),
330            chunk_bytes: chunk,
331            chunk_size: xfer.chunk_size,
332            total_bytes: xfer.total_size,
333            context: xfer.context.clone(),
334        };
335        let buf = serialize(&fc).unwrap();
336        let _ = d.read().unwrap().dispatch(actor, OP_RECEIVE_CHUNK, &buf);
337    }
338}
339
impl CapabilityProvider for FileSystemProvider {
    // Invoked by the runtime host to give this provider plugin the ability to communicate
    // with actors
    fn configure_dispatch(
        &self,
        dispatcher: Box<dyn Dispatcher>,
    ) -> Result<(), Box<dyn Error + Sync + Send>> {
        trace!("Dispatcher received.");
        // Replace the NullDispatcher installed by `default()` with the real one.
        let mut lock = self.dispatcher.write().unwrap();
        *lock = dispatcher;

        Ok(())
    }

    // Invoked by host runtime to allow an actor to make use of the capability
    // All providers MUST handle the "configure" message, even if no work will be done
    fn handle_call(
        &self,
        actor: &str,
        op: &str,
        msg: &[u8],
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        trace!("Received host call from {}, operation - {}", actor, op);

        // Bind/remove are only honored when sent by the host itself (the
        // "system" actor); every other operation is a blobstore call made on
        // behalf of the named actor. Unknown operations are an error.
        match op {
            OP_BIND_ACTOR if actor == SYSTEM_ACTOR => self.configure(deserialize(msg)?),
            OP_REMOVE_ACTOR if actor == SYSTEM_ACTOR => Ok(vec![]),
            OP_CREATE_CONTAINER => self.create_container(actor, deserialize(msg)?),
            OP_REMOVE_CONTAINER => self.remove_container(actor, deserialize(msg)?),
            OP_REMOVE_OBJECT => self.remove_object(actor, deserialize(msg)?),
            OP_LIST_OBJECTS => self.list_objects(actor, deserialize(msg)?),
            OP_UPLOAD_CHUNK => self.upload_chunk(actor, deserialize(msg)?),
            OP_START_DOWNLOAD => self.start_download(actor, deserialize(msg)?),
            OP_START_UPLOAD => self.start_upload(actor, deserialize(msg)?),
            OP_GET_OBJECT_INFO => self.get_object_info(actor, deserialize(msg)?),
            _ => Err("bad dispatch".into()),
        }
    }

    fn stop(&self) {
        // No cleanup needed on stop at the moment
    }
}
383
#[cfg(test)]
#[allow(unused_imports)]
mod tests {
    use super::{sanitize_blob, sanitize_container};
    use crate::FileSystemProvider;
    use std::collections::HashMap;
    use std::env::temp_dir;
    use std::fs::File;
    use std::io::{BufReader, Read};
    use std::path::{Path, PathBuf};
    use wasmcloud_actor_blobstore::{Blob, Container, FileChunk};
    use wasmcloud_actor_core::CapabilityConfiguration;

    // Verifies that path-traversal attempts embedded in container/blob ids
    // are neutralized before any filesystem path is built from them.
    #[test]
    fn no_hacky_hacky() {
        let container = Container {
            id: "/etc/h4x0rd".to_string(),
        };
        let blob = Blob {
            byte_size: 0,
            id: "../passwd".to_string(),
            container: Container::new("/etc/h4x0rd"),
        };
        let c = sanitize_container(&container);
        let b = sanitize_blob(&blob);

        // the resulting tricksy blob should end up in ${ROOT}/etc_h4x0rd/passwd and
        // thereby not expose anything sensitive
        assert_eq!(c.id, "etc_h4x0rd");
        assert_eq!(b.id, "passwd");
        assert_eq!(b.container.id, "etc_h4x0rd");
    }

    // Uploads a blob in four chunks: sequence numbers 0-2 complete the blob,
    // and the final duplicate (sequence 3 arriving after the upload entry was
    // removed) starts a fresh bookkeeping entry that is never written out.
    #[test]
    fn test_start_upload() {
        let actor = "actor1";
        let container = Container::new("container");
        let id = "blob".to_string();

        let fs = FileSystemProvider::new();
        let root_dir = setup_test_start_upload(&fs);
        let upload_dir = Path::join(&root_dir, &container.id);
        let bpath = create_dir(&upload_dir, &id);

        let total_bytes = 6;
        let chunk_size = 2;

        let chunk1 = FileChunk {
            sequence_no: 0,
            container: container.clone(),
            id: id.clone(),
            total_bytes,
            chunk_size,
            chunk_bytes: vec![1, 1],
            context: None,
        };
        let chunk2 = FileChunk {
            sequence_no: 1,
            container: container.clone(),
            id: id.clone(),
            total_bytes,
            chunk_size,
            chunk_bytes: vec![2, 2],
            context: None,
        };
        let chunk3 = FileChunk {
            sequence_no: 2,
            container: container.clone(),
            id: id.clone(),
            total_bytes,
            chunk_size,
            chunk_bytes: vec![3],
            context: None,
        };
        let chunk4 = FileChunk {
            sequence_no: 3,
            container: container.clone(),
            id: id.clone(),
            total_bytes,
            chunk_size,
            chunk_bytes: vec![3],
            context: None,
        };

        assert!(fs.upload_chunk(actor, chunk1).is_ok());
        assert!(fs.upload_chunk(actor, chunk2).is_ok());
        assert!(fs.upload_chunk(actor, chunk3).is_ok());
        assert!(fs.upload_chunk(actor, chunk4).is_ok());

        // check file contents
        let mut reader = BufReader::new(File::open(&bpath).unwrap());
        let mut buffer = [0; 5];

        // Clean up the temp files before asserting so a failed assertion
        // does not leave files behind in the temp directory.
        teardown_test_start_upload(&bpath, &upload_dir);

        assert!(reader.read(&mut buffer).is_ok());
        assert_eq!(vec![1, 1, 2, 2, 3], buffer);
        // the last duplicate is not cleaned up because it can't tell the
        // difference between a late duplicate chunk and an early out of order chunk
        assert_eq!(1, fs.upload_chunks.read().unwrap().len());
    }

    // Configures the provider with the OS temp directory as its blobstore
    // root and returns that root path.
    #[allow(dead_code)]
    fn setup_test_start_upload(fs: &FileSystemProvider) -> PathBuf {
        let mut config = HashMap::new();
        let root_dir = temp_dir();

        config.insert("ROOT".to_string(), String::from(root_dir.to_str().unwrap()));
        fs.configure(CapabilityConfiguration {
            module: "test_start_upload-module".to_string(),
            values: config,
        })
        .unwrap();

        root_dir
    }

    // Removes the blob file and its containing upload directory.
    #[allow(dead_code)]
    fn teardown_test_start_upload(file: &PathBuf, upload_dir: &PathBuf) {
        std::fs::remove_file(file).unwrap();
        std::fs::remove_dir_all(upload_dir).unwrap();
    }

    // Creates the upload directory (best-effort) and an empty blob file in
    // it, returning the blob's path.
    #[allow(dead_code)]
    fn create_dir(dir: &PathBuf, id: &String) -> PathBuf {
        let bpath = Path::join(&dir, &id);
        let _res = std::fs::create_dir(&dir);
        drop(File::create(&bpath).unwrap());
        bpath
    }
}