use chunks::Chunks;
use codec::{
    capabilities::{CapabilityProvider, Dispatcher, NullDispatcher},
    capability_provider,
    core::{OP_BIND_ACTOR, OP_REMOVE_ACTOR},
    deserialize, serialize,
};
use log::{error, info, trace};
use std::{
    collections::HashMap,
    error::Error,
    fs::OpenOptions,
    io::Write,
    path::{Path, PathBuf},
    sync::{Arc, RwLock},
};
use wasmcloud_actor_blobstore::*;
use wasmcloud_actor_core::CapabilityConfiguration;
use wasmcloud_provider_core as codec;

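// `chunks` supplies the file-chunking iterator used when streaming blobs back to actors.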
mod chunks;

#[cfg(not(feature = "static_plugin"))]
capability_provider!(FileSystemProvider, FileSystemProvider::new);

#[allow(unused)]
const CAPABILITY_ID: &str = "wasmcloud:blobstore";
const SYSTEM_ACTOR: &str = "system";
const FIRST_SEQ_NBR: u64 = 0;

type SequencedChunk = (u64, Vec<FileChunk>);

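/// A capability provider that exposes the `wasmcloud:blobstore` contract on top of
/// the local file system. Containers map to directories beneath a configured root
/// path, and blobs map to files within those directories.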
#[derive(Clone)]
pub struct FileSystemProvider {
    dispatcher: Arc<RwLock<Box<dyn Dispatcher>>>,
    rootdir: Arc<RwLock<PathBuf>>,
    upload_chunks: Arc<RwLock<HashMap<String, SequencedChunk>>>,
}

impl Default for FileSystemProvider {
    fn default() -> Self {
        let _ = env_logger::builder().format_module_path(false).try_init();

        FileSystemProvider {
            dispatcher: Arc::new(RwLock::new(Box::new(NullDispatcher::new()))),
            rootdir: Arc::new(RwLock::new(PathBuf::new())),
            upload_chunks: Arc::new(RwLock::new(HashMap::new())),
        }
    }
}

impl FileSystemProvider {
    pub fn new() -> Self {
        Self::default()
    }

    fn configure(
        &self,
        config: CapabilityConfiguration,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let mut lock = self.rootdir.write().unwrap();
        let root_dir = config.values["ROOT"].clone();
        info!("File System Blob Store Container Root: '{}'", root_dir);
        *lock = PathBuf::from(root_dir);

        Ok(vec![])
    }

    fn create_container(
        &self,
        _actor: &str,
        args: CreateContainerArgs,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let container = Container::new(sanitize_id(&args.id));
        let cdir = self.container_to_path(&container);
        std::fs::create_dir_all(cdir)?;
        serialize(&container)
    }

    fn remove_container(
        &self,
        _actor: &str,
        args: RemoveContainerArgs,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let container = Container::new(sanitize_id(&args.id));
        let cdir = self.container_to_path(&container);
        std::fs::remove_dir(cdir)?;
        serialize(BlobstoreResult {
            success: true,
            error: None,
        })
    }

    fn start_upload(
        &self,
        _actor: &str,
        blob: FileChunk,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let blob = sanitize_blob(&Blob {
            id: blob.id,
            container: blob.container,
            byte_size: 0,
        });
        let bfile = self.blob_to_path(&blob);
        serialize(std::fs::write(bfile, &[]).map_or_else(
            |e| BlobstoreResult {
                success: false,
                error: Some(e.to_string()),
            },
            |_| BlobstoreResult {
                success: true,
                error: None,
            },
        ))
    }

    fn remove_object(
        &self,
        _actor: &str,
        args: RemoveObjectArgs,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let blob = sanitize_blob(&Blob {
            id: args.id,
            container: Container::new(args.container_id),
            byte_size: 0,
        });
        let bfile = self.blob_to_path(&blob);
        serialize(std::fs::remove_file(&bfile).map_or_else(
            |e| BlobstoreResult {
                success: false,
                error: Some(e.to_string()),
            },
            |_| BlobstoreResult {
                success: true,
                error: None,
            },
        ))
    }

    fn get_object_info(
        &self,
        _actor: &str,
        args: GetObjectInfoArgs,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let blob = sanitize_blob(&Blob {
            id: args.blob_id,
            container: Container::new(&args.container_id),
            byte_size: 0,
        });
        let bfile = self.blob_to_path(&blob);
        serialize(if bfile.exists() {
            Blob {
                id: blob.id,
                container: blob.container,
                byte_size: bfile.metadata().unwrap().len(),
            }
        } else {
            Blob {
                id: "none".to_string(),
                container: Container::new("none"),
                byte_size: 0,
            }
        })
    }

    fn list_objects(
        &self,
        _actor: &str,
        args: ListObjectsArgs,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let container = sanitize_container(&Container::new(args.container_id));
        let cpath = self.container_to_path(&container);
        let (blobs, _errors): (Vec<_>, Vec<_>) = std::fs::read_dir(&cpath)?
            .map(|e| {
                e.map(|e| Blob {
                    id: e.file_name().into_string().unwrap(),
                    container: container.clone(),
                    byte_size: e.metadata().unwrap().len(),
                })
            })
            .partition(Result::is_ok);
        let blobs = blobs.into_iter().map(Result::unwrap).collect();
        serialize(&BlobList { blobs })
    }

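    /// Appends an uploaded chunk to the blob's backing file. Chunks may arrive out of
    /// order, so they are buffered per (actor, container, blob) key and flushed to disk
    /// in sequence order; the bookkeeping entry is dropped once `total_bytes / chunk_size`
    /// chunks have been written.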
    fn upload_chunk(
        &self,
        actor: &str,
        chunk: FileChunk,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let mut upload_chunks = self.upload_chunks.write().unwrap();
        let key = actor.to_string() + &sanitize_id(&chunk.container.id) + &sanitize_id(&chunk.id);
        let total_chunk_count = chunk.total_bytes / chunk.chunk_size;

        let (expected_sequence_no, chunks) = upload_chunks
            .entry(key.clone())
            .or_insert((FIRST_SEQ_NBR, vec![]));
        chunks.push(chunk);

        while let Some(i) = chunks
            .iter()
            .position(|fc| fc.sequence_no == *expected_sequence_no)
        {
            let chunk = chunks.get(i).unwrap();
            let bpath = Path::join(
                &Path::join(
                    &self.rootdir.read().unwrap(),
                    sanitize_id(&chunk.container.id),
                ),
                sanitize_id(&chunk.id),
            );
            let mut file = OpenOptions::new().create(false).append(true).open(bpath)?;
            info!(
                "Receiving file chunk: {} for {}/{}",
                chunk.sequence_no, chunk.container.id, chunk.id
            );

            let count = file.write(chunk.chunk_bytes.as_ref())?;
            if count != chunk.chunk_bytes.len() {
                let msg = format!(
                    "Failed to fully write chunk: {} of {} bytes",
                    count,
                    chunk.chunk_bytes.len()
                );
                error!("{}", &msg);
                return Err(msg.into());
            }

            chunks.remove(i);
            *expected_sequence_no += 1;
        }

        if *expected_sequence_no == total_chunk_count {
            upload_chunks.remove(&key);
        }

        serialize(BlobstoreResult {
            success: true,
            error: None,
        })
    }

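    // Streams a blob back to the requesting actor: the file is split into chunks on a
    // background thread and each chunk is dispatched to the actor as an OP_RECEIVE_CHUNK
    // invocation.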
    fn start_download(
        &self,
        actor: &str,
        args: StartDownloadArgs,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        info!("Received request to start download: {:?}", args);
        let actor = actor.to_string();
        let bpath = Path::join(
            &Path::join(
                &self.rootdir.read().unwrap(),
                sanitize_id(&args.container_id),
            ),
            sanitize_id(&args.blob_id),
        );
        let byte_size = bpath.metadata()?.len();
        let bfile = std::fs::File::open(bpath)?;
        let chunk_size = if args.chunk_size == 0 {
            chunks::DEFAULT_CHUNK_SIZE
        } else {
            args.chunk_size as usize
        };
        let xfer = Transfer {
            blob_id: sanitize_id(&args.blob_id),
            container: Container::new(sanitize_id(&args.container_id)),
            total_size: byte_size,
            chunk_size: chunk_size as _,
            total_chunks: byte_size / chunk_size as u64,
            context: args.context,
        };
        let iter = Chunks::new(bfile, chunk_size);
        let d = self.dispatcher.clone();
        std::thread::spawn(move || {
            iter.enumerate().for_each(|(i, chunk)| {
                dispatch_chunk(&xfer, &actor, i, d.clone(), chunk);
            });
        });

        serialize(BlobstoreResult {
            success: true,
            error: None,
        })
    }

    fn blob_to_path(&self, blob: &Blob) -> PathBuf {
        let cdir = Path::join(&self.rootdir.read().unwrap(), blob.container.id.to_string());
        Path::join(&cdir, blob.id.to_string())
    }

    fn container_to_path(&self, container: &Container) -> PathBuf {
        Path::join(&self.rootdir.read().unwrap(), container.id.to_string())
    }
}

fn sanitize_container(container: &Container) -> Container {
    Container {
        id: sanitize_id(&container.id),
    }
}

fn sanitize_blob(blob: &Blob) -> Blob {
    Blob {
        id: sanitize_id(&blob.id),
        byte_size: blob.byte_size,
        container: Container::new(sanitize_id(&blob.container.id)),
    }
}

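/// Normalizes a container or blob id so it cannot traverse outside the configured root:
/// leading `/` and `.` characters are trimmed, any remaining `..` is removed, and `/` is
/// replaced with `_`. For example, `sanitize_id("../../etc/passwd")` yields `"etc_passwd"`.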
fn sanitize_id(id: &str) -> String {
    let bad_prefixes: &[_] = &['/', '.'];
    let s = id.trim_start_matches(bad_prefixes);
    let s = s.replace("..", "");
    s.replace("/", "_")
}

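// Serializes a single file chunk and dispatches it to the actor as an OP_RECEIVE_CHUNK
// invocation. Chunks that failed to read from disk are silently skipped.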
fn dispatch_chunk(
    xfer: &Transfer,
    actor: &str,
    i: usize,
    d: Arc<RwLock<Box<dyn Dispatcher>>>,
    chunk: Result<Vec<u8>, std::io::Error>,
) {
    if let Ok(chunk) = chunk {
        let fc = FileChunk {
            sequence_no: i as u64,
            container: Container::new(xfer.container.id.clone()),
            id: xfer.blob_id.to_string(),
            chunk_bytes: chunk,
            chunk_size: xfer.chunk_size,
            total_bytes: xfer.total_size,
            context: xfer.context.clone(),
        };
        let buf = serialize(&fc).unwrap();
        let _ = d.read().unwrap().dispatch(actor, OP_RECEIVE_CHUNK, &buf);
    }
}

impl CapabilityProvider for FileSystemProvider {
    // Invoked by the host runtime to give this provider a dispatcher for communicating
    // with actors.
    fn configure_dispatch(
        &self,
        dispatcher: Box<dyn Dispatcher>,
    ) -> Result<(), Box<dyn Error + Sync + Send>> {
        trace!("Dispatcher received.");
        let mut lock = self.dispatcher.write().unwrap();
        *lock = dispatcher;

        Ok(())
    }

    // Invoked by the host runtime to deliver an operation from an actor (or from the
    // host itself, for bind/remove) to this provider.
    fn handle_call(
        &self,
        actor: &str,
        op: &str,
        msg: &[u8],
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        trace!("Received host call from {}, operation - {}", actor, op);

        match op {
            OP_BIND_ACTOR if actor == SYSTEM_ACTOR => self.configure(deserialize(msg)?),
            OP_REMOVE_ACTOR if actor == SYSTEM_ACTOR => Ok(vec![]),
            OP_CREATE_CONTAINER => self.create_container(actor, deserialize(msg)?),
            OP_REMOVE_CONTAINER => self.remove_container(actor, deserialize(msg)?),
            OP_REMOVE_OBJECT => self.remove_object(actor, deserialize(msg)?),
            OP_LIST_OBJECTS => self.list_objects(actor, deserialize(msg)?),
            OP_UPLOAD_CHUNK => self.upload_chunk(actor, deserialize(msg)?),
            OP_START_DOWNLOAD => self.start_download(actor, deserialize(msg)?),
            OP_START_UPLOAD => self.start_upload(actor, deserialize(msg)?),
            OP_GET_OBJECT_INFO => self.get_object_info(actor, deserialize(msg)?),
            _ => Err("bad dispatch".into()),
        }
    }

    // No cleanup required when the provider is stopped.
    fn stop(&self) {}
}

#[cfg(test)]
#[allow(unused_imports)]
mod tests {
    use super::{sanitize_blob, sanitize_container};
    use crate::FileSystemProvider;
    use std::collections::HashMap;
    use std::env::temp_dir;
    use std::fs::File;
    use std::io::{BufReader, Read};
    use std::path::{Path, PathBuf};
    use wasmcloud_actor_blobstore::{Blob, Container, FileChunk};
    use wasmcloud_actor_core::CapabilityConfiguration;

    #[test]
    fn no_hacky_hacky() {
        let container = Container {
            id: "/etc/h4x0rd".to_string(),
        };
        let blob = Blob {
            byte_size: 0,
            id: "../passwd".to_string(),
            container: Container::new("/etc/h4x0rd"),
        };
        let c = sanitize_container(&container);
        let b = sanitize_blob(&blob);

        assert_eq!(c.id, "etc_h4x0rd");
        assert_eq!(b.id, "passwd");
        assert_eq!(b.container.id, "etc_h4x0rd");
    }

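    // Extra illustrative check: nested traversal attempts collapse to a flat, safe id.
    #[test]
    fn sanitize_nested_traversal() {
        assert_eq!(crate::sanitize_id("../../etc/passwd"), "etc_passwd");
    }
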
    #[test]
    fn test_start_upload() {
        let actor = "actor1";
        let container = Container::new("container");
        let id = "blob".to_string();

        let fs = FileSystemProvider::new();
        let root_dir = setup_test_start_upload(&fs);
        let upload_dir = Path::join(&root_dir, &container.id);
        let bpath = create_dir(&upload_dir, &id);

        let total_bytes = 6;
        let chunk_size = 2;

        let chunk1 = FileChunk {
            sequence_no: 0,
            container: container.clone(),
            id: id.clone(),
            total_bytes,
            chunk_size,
            chunk_bytes: vec![1, 1],
            context: None,
        };
        let chunk2 = FileChunk {
            sequence_no: 1,
            container: container.clone(),
            id: id.clone(),
            total_bytes,
            chunk_size,
            chunk_bytes: vec![2, 2],
            context: None,
        };
        let chunk3 = FileChunk {
            sequence_no: 2,
            container: container.clone(),
            id: id.clone(),
            total_bytes,
            chunk_size,
            chunk_bytes: vec![3],
            context: None,
        };
        let chunk4 = FileChunk {
            sequence_no: 3,
            container: container.clone(),
            id: id.clone(),
            total_bytes,
            chunk_size,
            chunk_bytes: vec![3],
            context: None,
        };

        assert!(fs.upload_chunk(actor, chunk1).is_ok());
        assert!(fs.upload_chunk(actor, chunk2).is_ok());
        assert!(fs.upload_chunk(actor, chunk3).is_ok());
        assert!(fs.upload_chunk(actor, chunk4).is_ok());

        let mut reader = BufReader::new(File::open(&bpath).unwrap());
        let mut buffer = [0; 5];

        teardown_test_start_upload(&bpath, &upload_dir);

        assert!(reader.read(&mut buffer).is_ok());
        assert_eq!(vec![1, 1, 2, 2, 3], buffer);
        // chunk4 (sequence_no 3) arrives after the expected 3 chunks have been flushed,
        // so it re-creates a bookkeeping entry that is never completed.
        assert_eq!(1, fs.upload_chunks.read().unwrap().len());
    }

    #[allow(dead_code)]
    fn setup_test_start_upload(fs: &FileSystemProvider) -> PathBuf {
        let mut config = HashMap::new();
        let root_dir = temp_dir();

        config.insert("ROOT".to_string(), String::from(root_dir.to_str().unwrap()));
        fs.configure(CapabilityConfiguration {
            module: "test_start_upload-module".to_string(),
            values: config,
        })
        .unwrap();

        root_dir
    }

    #[allow(dead_code)]
    fn teardown_test_start_upload(file: &PathBuf, upload_dir: &PathBuf) {
        std::fs::remove_file(file).unwrap();
        std::fs::remove_dir_all(upload_dir).unwrap();
    }

    #[allow(dead_code)]
    fn create_dir(dir: &PathBuf, id: &String) -> PathBuf {
        let bpath = Path::join(&dir, &id);
        let _res = std::fs::create_dir(&dir);
        drop(File::create(&bpath).unwrap());
        bpath
    }
}