starlane_core/
file_access.rs

1use std::collections::HashMap;
2use std::{fs, thread};
3
4use std::fs::{DirBuilder, File};
5
6use std::io::{Read, Write};
7use std::path::PathBuf;
8use std::str::FromStr;
9use std::sync::{mpsc, Arc};
10
11use notify::{raw_watcher, Op, RawEvent, RecursiveMode, Watcher};
12use tokio::io::AsyncReadExt;
13use tokio::time::Duration;
14use walkdir::WalkDir;
15
16use crate::error::Error;
17use crate::resource::FileKind;
18use crate::resource::Path;
19
20use crate::util;
21use std::convert::TryFrom;
22use std::convert::TryInto;
23use tokio::fs::ReadDir;
24
25pub enum FileCommand {
26    List {
27        path: Path,
28        tx: tokio::sync::oneshot::Sender<Result<Vec<Path>, Error>>,
29    },
30    Read {
31        path: Path,
32        tx: tokio::sync::oneshot::Sender<Result<Arc<Vec<u8>>, Error>>,
33    },
34    Write {
35        path: Path,
36        data: Arc<Vec<u8>>,
37        tx: tokio::sync::oneshot::Sender<Result<(), Error>>,
38    },
39    //    WriteStream{ path: Path, stream: Box<dyn AsyncRead>, tx: tokio::sync::oneshot::Sender<Result<(),Error>> },
40    MkDir {
41        path: Path,
42        tx: tokio::sync::oneshot::Sender<Result<(), Error>>,
43    },
44    Watch {
45        tx: tokio::sync::oneshot::Sender<Result<tokio::sync::mpsc::Receiver<FileEvent>, Error>>,
46    },
47    Walk {
48        tx: tokio::sync::oneshot::Sender<Result<tokio::sync::mpsc::Receiver<FileEvent>, Error>>,
49    },
50    UnZip {
51        source: String,
52        target: String,
53        tx: tokio::sync::oneshot::Sender<Result<(), Error>>,
54    },
55    Shutdown,
56}
57
58#[derive(Clone)]
59pub struct FileAccess {
60    path: String,
61    tx: tokio::sync::mpsc::Sender<FileCommand>,
62}
63
64impl FileAccess {
65    pub fn path(&self) -> String {
66        self.path.clone()
67    }
68
69    pub fn new(path: String) -> Result<Self, Error> {
70        let tx = LocalFileAccess::new(path.clone())?;
71        let path = fs::canonicalize(&path)?
72            .to_str()
73            .ok_or("turning path to string")?
74            .to_string();
75        Ok(FileAccess { path: path, tx: tx })
76    }
77
78    pub async fn list(&self, path: &Path) -> Result<Vec<Path>, Error> {
79        let (tx, rx) = tokio::sync::oneshot::channel();
80        self.tx
81            .send(FileCommand::List {
82                path: path.clone(),
83                tx: tx,
84            })
85            .await?;
86        Ok(util::wait_for_it(rx).await?)
87    }
88
89    pub async fn read(&self, path: &Path) -> Result<Arc<Vec<u8>>, Error> {
90        let (tx, rx) = tokio::sync::oneshot::channel();
91        self.tx
92            .send(FileCommand::Read {
93                path: path.clone(),
94                tx: tx,
95            })
96            .await?;
97        Ok(util::wait_for_it(rx).await?)
98    }
99
100    pub async fn write(&self, path: &Path, data: Arc<Vec<u8>>) -> Result<(), Error> {
101        let (tx, rx) = tokio::sync::oneshot::channel();
102        self.tx
103            .send(FileCommand::Write {
104                path: path.clone(),
105                data,
106                tx,
107            })
108            .await?;
109        Ok(util::wait_for_it(rx).await?)
110    }
111
112    /*
113    pub async fn write_stream( &mut self, path: &Path, stream: Box<dyn AsyncReadExt> )->Result<(),Error> {
114        let (tx,mut rx) = tokio::sync::oneshot::channel();
115        self.tx.send( FileCommand::WriteStream{path:path.clone(),stream,tx}).await?;
116        Ok(util::wait_for_it_for(rx, Duration::from_secs(60*15)).await?)
117    }
118
119     */
120
121    pub fn with_path(&self, path: String) -> Result<FileAccess, Error> {
122        let path = format!("{}/{}", self.path, path);
123        Ok(FileAccess::new(path)?)
124    }
125
126    pub async fn unzip(&self, source: String, target: String) -> Result<(), Error> {
127        let (tx, rx) = tokio::sync::oneshot::channel();
128        self.tx
129            .send(FileCommand::UnZip { source, target, tx })
130            .await?;
131        Ok(util::wait_for_it_for(rx, Duration::from_secs(60 * 2)).await?)
132    }
133
134    pub async fn mkdir(&self, path: &Path) -> Result<FileAccess, Error> {
135        let (tx, rx) = tokio::sync::oneshot::channel();
136        self.tx
137            .send(FileCommand::MkDir {
138                path: path.clone(),
139                tx,
140            })
141            .await?;
142        util::wait_for_it(rx).await?;
143        self.with_path(path.to_relative())
144    }
145
146    pub async fn watch(&self) -> Result<tokio::sync::mpsc::Receiver<FileEvent>, Error> {
147        let (tx, rx) = tokio::sync::oneshot::channel();
148        self.tx.send(FileCommand::Watch { tx }).await?;
149        Ok(util::wait_for_it(rx).await?)
150    }
151
152    pub async fn walk(&self) -> Result<tokio::sync::mpsc::Receiver<FileEvent>, Error> {
153        let (tx, rx) = tokio::sync::oneshot::channel();
154        self.tx.send(FileCommand::Walk { tx }).await?;
155        Ok(util::wait_for_it(rx).await?)
156    }
157
158    pub fn close(&self) {
159        self.tx.try_send(FileCommand::Shutdown);
160    }
161}
162
/// What happened to the file or directory a [`FileEvent`] refers to.
///
/// Derives `PartialEq`/`Eq` so that consumers can compare kinds directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FileEventKind {
    /// The entry was found while walking the directory tree.
    Discovered,
    /// The entry was created.
    Create,
    /// The entry was written to.
    Update,
    /// The entry was removed.
    Delete,
}
170
171#[derive(Debug, Clone)]
172pub struct FileEvent {
173    pub path: String,
174    pub event_kind: FileEventKind,
175    pub file_kind: FileKind,
176}
177
178#[derive(Clone)]
179pub struct MemoryFileAccess {
180    map: HashMap<Path, Arc<Vec<u8>>>,
181}
182
183impl MemoryFileAccess {
184    pub fn new() -> Self {
185        MemoryFileAccess {
186            map: HashMap::new(),
187        }
188    }
189}
190
191pub struct LocalFileAccess {
192    base_dir: String,
193    rx: tokio::sync::mpsc::Receiver<FileCommand>,
194}
195
196impl LocalFileAccess {
197    pub fn new(base_dir: String) -> Result<tokio::sync::mpsc::Sender<FileCommand>, Error> {
198        let mut builder = DirBuilder::new();
199        builder.recursive(true);
200        builder.create(base_dir.clone())?;
201
202        let (tx, rx) = tokio::sync::mpsc::channel(128);
203
204        tokio::spawn(async move {
205            Self {
206                base_dir: base_dir,
207                rx: rx,
208            }
209            .run()
210            .await;
211        });
212
213        Ok(tx)
214    }
215
216    async fn run(mut self) {
217        tokio::spawn(async move {
218            while let Option::Some(command) = self.rx.recv().await {
219                if let FileCommand::Shutdown = command {
220                    break;
221                }
222
223                match self.process(command).await {
224                    Ok(_) => {}
225                    Err(error) => {
226                        eprintln!("Error in LocalFileAccess: {}", error)
227                    }
228                }
229            }
230        });
231    }
232
233    async fn process(&mut self, command: FileCommand) -> Result<(), Error> {
234        match command {
235            FileCommand::List { path, tx } => {
236                tx.send(self.list(path).await).unwrap_or_default();
237            }
238            FileCommand::Read { path, tx } => {
239                tx.send(self.read(&path)).unwrap_or_default();
240            }
241            FileCommand::Write { path, data, tx } => {
242                tx.send(self.write(&path, data)).unwrap_or_default();
243            }
244            /*            FileCommand::WriteStream { path: path, stream, tx } => {
245                tx.send(self.write_sream(&path,stream).await);
246            }*/
247            FileCommand::MkDir { path, tx } => {
248                tx.send(self.mkdir(&path)).unwrap_or_default();
249            }
250            FileCommand::Watch { tx } => {
251                tx.send(self.watch()).unwrap_or_default();
252            }
253            FileCommand::Walk { tx } => {
254                tx.send(self.walk()).unwrap_or_default();
255            }
256            FileCommand::UnZip { source, target, tx } => {
257                tx.send(self.unzip(source, target)).unwrap_or_default();
258            }
259            FileCommand::Shutdown => {
260                // do nothing
261            }
262        }
263        Ok(())
264    }
265
266    async fn list(&self, dir_path: Path) -> Result<Vec<Path>, Error> {
267        let path = self.cat_path(dir_path.to_relative().as_str())?;
268        let mut read_dir = tokio::fs::read_dir(path).await?;
269
270        let mut rtn = vec![];
271        while let Result::Ok(Option::Some(entry)) = read_dir.next_entry().await {
272            let entry = Path::make_absolute(
273                entry
274                    .file_name()
275                    .to_str()
276                    .ok_or("expected os str to be able to change to str")?,
277            )?;
278            let entry = dir_path.cat(&entry)?;
279            rtn.push(entry);
280        }
281        Ok(rtn)
282    }
283
284    pub fn cat_path(&self, path: &str) -> Result<String, Error> {
285        if path.len() < 1 {
286            return Err("path cannot be empty".into());
287        }
288
289        let mut path_str = path.to_string();
290        if path_str.starts_with("/") {
291            path_str.remove(0);
292        }
293        let mut path_buf = PathBuf::new();
294        path_buf.push(self.base_dir.clone());
295        path_buf.push(path_str);
296        let path = path_buf.as_path().clone();
297        let path = path.to_str().ok_or("path error")?.to_string();
298
299        Ok(path)
300    }
301}
302
303impl LocalFileAccess {
304    pub fn unzip(&mut self, source: String, target: String) -> Result<(), Error> {
305        let source = format!("{}/{}", self.base_dir, source);
306        let source = File::open(source)?;
307        let mut archive = zip::ZipArchive::new(source)?;
308
309        for i in 0..archive.len() {
310            let mut zip_file = archive.by_index(i)?;
311            if zip_file.is_dir() {
312                let path = Path::from_str(format!("/{}/{}", target, zip_file.name()).as_str())?;
313                self.mkdir(&path)?;
314            } else {
315                let path = format!("{}/{}/{}", self.base_dir, target, zip_file.name());
316                let mut file = fs::File::create(path)?;
317                std::io::copy(&mut zip_file, &mut file)?;
318            }
319        }
320
321        Ok(())
322    }
323
324    pub fn read(&self, path: &Path) -> Result<Arc<Vec<u8>>, Error> {
325        let path = self.cat_path(path.to_relative().as_str())?;
326        let mut buf = vec![];
327        let mut file = match File::open(&path) {
328            Ok(file) => {file}
329            Err(error) => {
330                return Result::Err(format!("{} PATH: {}", error.to_string(), path.to_string() ).into());
331            }
332        };
333        file.read_to_end(&mut buf)?;
334        Ok(Arc::new(buf))
335    }
336
337    pub fn write(&mut self, path: &Path, data: Arc<Vec<u8>>) -> Result<(), Error> {
338        if let Option::Some(parent) = path.parent() {
339            self.mkdir(&parent)?;
340        }
341
342        let path = self.cat_path(path.to_relative().as_str())?;
343        let mut file = File::create(&path)?;
344        file.write(data.as_slice()).unwrap();
345
346        Ok(())
347    }
348
349    /*
350    pub async fn write_stream(&mut self, path: &Path, mut stream: Box<dyn AsyncReadExt>) -> Result<(), Error> {
351
352        if let Option::Some(parent) = path.parent(){
353            self.mkdir(&parent)?;
354        }
355
356        let path = self.cat_path(path.to_relative().as_str() )?;
357        let mut file = tokio::fs::File::create(&path).await?;
358
359        tokio::io::copy(&mut stream,&mut file ).await?;
360
361        Ok(())
362    }
363     */
364
365    fn mkdir(&mut self, path: &Path) -> Result<(), Error> {
366        let path = self.cat_path(path.to_relative().as_str())?;
367        let mut builder = DirBuilder::new();
368        builder.recursive(true);
369        builder.create(path.clone())?;
370        Ok(())
371    }
372
373    fn walk(&mut self) -> Result<tokio::sync::mpsc::Receiver<FileEvent>, Error> {
374        let (event_tx, event_rx) = tokio::sync::mpsc::channel(128);
375        let tokio_runtime = tokio::runtime::Builder::new_current_thread()
376            .enable_all()
377            .build()?;
378        let base_dir = self.base_dir.clone();
379
380        thread::spawn(move || {
381            for entry in WalkDir::new(base_dir.clone()) {
382                match entry {
383                    Ok(dir_entry) => match dir_entry.path().to_str() {
384                        Some(path) => {
385                            let event = FileEvent {
386                                path: path.to_string(),
387                                event_kind: FileEventKind::Discovered,
388                                file_kind: FileKind::Directory,
389                            };
390                            let event_tx = event_tx.clone();
391                            tokio_runtime.block_on(async move {
392                                event_tx.send(event).await;
393                            })
394                        }
395                        None => {
396                            return;
397                        }
398                    },
399                    Err(error) => {
400                        eprintln!("Error when walking filesystem: {}", error);
401                    }
402                }
403            }
404        });
405
406        Ok(event_rx)
407    }
408
409    fn watch(&mut self) -> Result<tokio::sync::mpsc::Receiver<FileEvent>, Error> {
410        let (tx, rx) = mpsc::channel();
411        let (event_tx, event_rx) = tokio::sync::mpsc::channel(128);
412
413        let tokio_runtime = tokio::runtime::Builder::new_current_thread()
414            .enable_all()
415            .build()?;
416        let base_dir = self.base_dir.clone();
417
418        thread::spawn(move || {
419            let mut watcher = raw_watcher(tx).unwrap();
420            watcher
421                .watch(base_dir.clone(), RecursiveMode::Recursive)
422                .unwrap();
423
424            loop {
425                match rx.recv() {
426                    Ok(RawEvent {
427                        path: Some(path),
428                        op: Ok(op),
429                        cookie: _,
430                    }) => {
431                        let event_kind = match op {
432                            Op::CREATE => FileEventKind::Create,
433                            _CREATE_WRITE => FileEventKind::Create,
434                            Op::REMOVE => FileEventKind::Delete,
435                            Op::WRITE => FileEventKind::Update,
436                            _x => {
437                                continue;
438                            }
439                        };
440
441                        let file_kind = match path.is_dir() {
442                            true => FileKind::Directory,
443                            false => FileKind::File,
444                        };
445
446                        let event = FileEvent {
447                            path: path.to_str().unwrap().to_string(),
448                            event_kind: event_kind,
449                            file_kind: file_kind,
450                        };
451
452                        let event_tx = event_tx.clone();
453                        tokio_runtime.block_on(async move {
454                            event_tx.send(event).await;
455                        })
456                    }
457                    Ok(event) => {
458                        eprintln!("file_access broken event: {:?}", event);
459                    }
460                    Err(error) => {
461                        eprintln!("WATCH ERROR: {}", error);
462                        break;
463                    }
464                }
465            }
466        });
467        Ok(event_rx)
468    }
469}