1use std::collections::HashMap;
2use std::{fs, thread};
3
4use std::fs::{DirBuilder, File};
5
6use std::io::{Read, Write};
7use std::path::PathBuf;
8use std::str::FromStr;
9use std::sync::{mpsc, Arc};
10
11use notify::{raw_watcher, Op, RawEvent, RecursiveMode, Watcher};
12use tokio::io::AsyncReadExt;
13use tokio::time::Duration;
14use walkdir::WalkDir;
15
16use crate::error::Error;
17use crate::resource::FileKind;
18use crate::resource::Path;
19
20use crate::util;
21use std::convert::TryFrom;
22use std::convert::TryInto;
23use tokio::fs::ReadDir;
24
25pub enum FileCommand {
26 List {
27 path: Path,
28 tx: tokio::sync::oneshot::Sender<Result<Vec<Path>, Error>>,
29 },
30 Read {
31 path: Path,
32 tx: tokio::sync::oneshot::Sender<Result<Arc<Vec<u8>>, Error>>,
33 },
34 Write {
35 path: Path,
36 data: Arc<Vec<u8>>,
37 tx: tokio::sync::oneshot::Sender<Result<(), Error>>,
38 },
39 MkDir {
41 path: Path,
42 tx: tokio::sync::oneshot::Sender<Result<(), Error>>,
43 },
44 Watch {
45 tx: tokio::sync::oneshot::Sender<Result<tokio::sync::mpsc::Receiver<FileEvent>, Error>>,
46 },
47 Walk {
48 tx: tokio::sync::oneshot::Sender<Result<tokio::sync::mpsc::Receiver<FileEvent>, Error>>,
49 },
50 UnZip {
51 source: String,
52 target: String,
53 tx: tokio::sync::oneshot::Sender<Result<(), Error>>,
54 },
55 Shutdown,
56}
57
58#[derive(Clone)]
59pub struct FileAccess {
60 path: String,
61 tx: tokio::sync::mpsc::Sender<FileCommand>,
62}
63
64impl FileAccess {
65 pub fn path(&self) -> String {
66 self.path.clone()
67 }
68
69 pub fn new(path: String) -> Result<Self, Error> {
70 let tx = LocalFileAccess::new(path.clone())?;
71 let path = fs::canonicalize(&path)?
72 .to_str()
73 .ok_or("turning path to string")?
74 .to_string();
75 Ok(FileAccess { path: path, tx: tx })
76 }
77
78 pub async fn list(&self, path: &Path) -> Result<Vec<Path>, Error> {
79 let (tx, rx) = tokio::sync::oneshot::channel();
80 self.tx
81 .send(FileCommand::List {
82 path: path.clone(),
83 tx: tx,
84 })
85 .await?;
86 Ok(util::wait_for_it(rx).await?)
87 }
88
89 pub async fn read(&self, path: &Path) -> Result<Arc<Vec<u8>>, Error> {
90 let (tx, rx) = tokio::sync::oneshot::channel();
91 self.tx
92 .send(FileCommand::Read {
93 path: path.clone(),
94 tx: tx,
95 })
96 .await?;
97 Ok(util::wait_for_it(rx).await?)
98 }
99
100 pub async fn write(&self, path: &Path, data: Arc<Vec<u8>>) -> Result<(), Error> {
101 let (tx, rx) = tokio::sync::oneshot::channel();
102 self.tx
103 .send(FileCommand::Write {
104 path: path.clone(),
105 data,
106 tx,
107 })
108 .await?;
109 Ok(util::wait_for_it(rx).await?)
110 }
111
112 pub fn with_path(&self, path: String) -> Result<FileAccess, Error> {
122 let path = format!("{}/{}", self.path, path);
123 Ok(FileAccess::new(path)?)
124 }
125
126 pub async fn unzip(&self, source: String, target: String) -> Result<(), Error> {
127 let (tx, rx) = tokio::sync::oneshot::channel();
128 self.tx
129 .send(FileCommand::UnZip { source, target, tx })
130 .await?;
131 Ok(util::wait_for_it_for(rx, Duration::from_secs(60 * 2)).await?)
132 }
133
134 pub async fn mkdir(&self, path: &Path) -> Result<FileAccess, Error> {
135 let (tx, rx) = tokio::sync::oneshot::channel();
136 self.tx
137 .send(FileCommand::MkDir {
138 path: path.clone(),
139 tx,
140 })
141 .await?;
142 util::wait_for_it(rx).await?;
143 self.with_path(path.to_relative())
144 }
145
146 pub async fn watch(&self) -> Result<tokio::sync::mpsc::Receiver<FileEvent>, Error> {
147 let (tx, rx) = tokio::sync::oneshot::channel();
148 self.tx.send(FileCommand::Watch { tx }).await?;
149 Ok(util::wait_for_it(rx).await?)
150 }
151
152 pub async fn walk(&self) -> Result<tokio::sync::mpsc::Receiver<FileEvent>, Error> {
153 let (tx, rx) = tokio::sync::oneshot::channel();
154 self.tx.send(FileCommand::Walk { tx }).await?;
155 Ok(util::wait_for_it(rx).await?)
156 }
157
158 pub fn close(&self) {
159 self.tx.try_send(FileCommand::Shutdown);
160 }
161}
162
163#[derive(Debug, Clone)]
164pub enum FileEventKind {
165 Discovered,
166 Create,
167 Update,
168 Delete,
169}
170
171#[derive(Debug, Clone)]
172pub struct FileEvent {
173 pub path: String,
174 pub event_kind: FileEventKind,
175 pub file_kind: FileKind,
176}
177
178#[derive(Clone)]
179pub struct MemoryFileAccess {
180 map: HashMap<Path, Arc<Vec<u8>>>,
181}
182
183impl MemoryFileAccess {
184 pub fn new() -> Self {
185 MemoryFileAccess {
186 map: HashMap::new(),
187 }
188 }
189}
190
191pub struct LocalFileAccess {
192 base_dir: String,
193 rx: tokio::sync::mpsc::Receiver<FileCommand>,
194}
195
196impl LocalFileAccess {
197 pub fn new(base_dir: String) -> Result<tokio::sync::mpsc::Sender<FileCommand>, Error> {
198 let mut builder = DirBuilder::new();
199 builder.recursive(true);
200 builder.create(base_dir.clone())?;
201
202 let (tx, rx) = tokio::sync::mpsc::channel(128);
203
204 tokio::spawn(async move {
205 Self {
206 base_dir: base_dir,
207 rx: rx,
208 }
209 .run()
210 .await;
211 });
212
213 Ok(tx)
214 }
215
216 async fn run(mut self) {
217 tokio::spawn(async move {
218 while let Option::Some(command) = self.rx.recv().await {
219 if let FileCommand::Shutdown = command {
220 break;
221 }
222
223 match self.process(command).await {
224 Ok(_) => {}
225 Err(error) => {
226 eprintln!("Error in LocalFileAccess: {}", error)
227 }
228 }
229 }
230 });
231 }
232
233 async fn process(&mut self, command: FileCommand) -> Result<(), Error> {
234 match command {
235 FileCommand::List { path, tx } => {
236 tx.send(self.list(path).await).unwrap_or_default();
237 }
238 FileCommand::Read { path, tx } => {
239 tx.send(self.read(&path)).unwrap_or_default();
240 }
241 FileCommand::Write { path, data, tx } => {
242 tx.send(self.write(&path, data)).unwrap_or_default();
243 }
244 FileCommand::MkDir { path, tx } => {
248 tx.send(self.mkdir(&path)).unwrap_or_default();
249 }
250 FileCommand::Watch { tx } => {
251 tx.send(self.watch()).unwrap_or_default();
252 }
253 FileCommand::Walk { tx } => {
254 tx.send(self.walk()).unwrap_or_default();
255 }
256 FileCommand::UnZip { source, target, tx } => {
257 tx.send(self.unzip(source, target)).unwrap_or_default();
258 }
259 FileCommand::Shutdown => {
260 }
262 }
263 Ok(())
264 }
265
266 async fn list(&self, dir_path: Path) -> Result<Vec<Path>, Error> {
267 let path = self.cat_path(dir_path.to_relative().as_str())?;
268 let mut read_dir = tokio::fs::read_dir(path).await?;
269
270 let mut rtn = vec![];
271 while let Result::Ok(Option::Some(entry)) = read_dir.next_entry().await {
272 let entry = Path::make_absolute(
273 entry
274 .file_name()
275 .to_str()
276 .ok_or("expected os str to be able to change to str")?,
277 )?;
278 let entry = dir_path.cat(&entry)?;
279 rtn.push(entry);
280 }
281 Ok(rtn)
282 }
283
284 pub fn cat_path(&self, path: &str) -> Result<String, Error> {
285 if path.len() < 1 {
286 return Err("path cannot be empty".into());
287 }
288
289 let mut path_str = path.to_string();
290 if path_str.starts_with("/") {
291 path_str.remove(0);
292 }
293 let mut path_buf = PathBuf::new();
294 path_buf.push(self.base_dir.clone());
295 path_buf.push(path_str);
296 let path = path_buf.as_path().clone();
297 let path = path.to_str().ok_or("path error")?.to_string();
298
299 Ok(path)
300 }
301}
302
303impl LocalFileAccess {
304 pub fn unzip(&mut self, source: String, target: String) -> Result<(), Error> {
305 let source = format!("{}/{}", self.base_dir, source);
306 let source = File::open(source)?;
307 let mut archive = zip::ZipArchive::new(source)?;
308
309 for i in 0..archive.len() {
310 let mut zip_file = archive.by_index(i)?;
311 if zip_file.is_dir() {
312 let path = Path::from_str(format!("/{}/{}", target, zip_file.name()).as_str())?;
313 self.mkdir(&path)?;
314 } else {
315 let path = format!("{}/{}/{}", self.base_dir, target, zip_file.name());
316 let mut file = fs::File::create(path)?;
317 std::io::copy(&mut zip_file, &mut file)?;
318 }
319 }
320
321 Ok(())
322 }
323
324 pub fn read(&self, path: &Path) -> Result<Arc<Vec<u8>>, Error> {
325 let path = self.cat_path(path.to_relative().as_str())?;
326 let mut buf = vec![];
327 let mut file = match File::open(&path) {
328 Ok(file) => {file}
329 Err(error) => {
330 return Result::Err(format!("{} PATH: {}", error.to_string(), path.to_string() ).into());
331 }
332 };
333 file.read_to_end(&mut buf)?;
334 Ok(Arc::new(buf))
335 }
336
337 pub fn write(&mut self, path: &Path, data: Arc<Vec<u8>>) -> Result<(), Error> {
338 if let Option::Some(parent) = path.parent() {
339 self.mkdir(&parent)?;
340 }
341
342 let path = self.cat_path(path.to_relative().as_str())?;
343 let mut file = File::create(&path)?;
344 file.write(data.as_slice()).unwrap();
345
346 Ok(())
347 }
348
349 fn mkdir(&mut self, path: &Path) -> Result<(), Error> {
366 let path = self.cat_path(path.to_relative().as_str())?;
367 let mut builder = DirBuilder::new();
368 builder.recursive(true);
369 builder.create(path.clone())?;
370 Ok(())
371 }
372
373 fn walk(&mut self) -> Result<tokio::sync::mpsc::Receiver<FileEvent>, Error> {
374 let (event_tx, event_rx) = tokio::sync::mpsc::channel(128);
375 let tokio_runtime = tokio::runtime::Builder::new_current_thread()
376 .enable_all()
377 .build()?;
378 let base_dir = self.base_dir.clone();
379
380 thread::spawn(move || {
381 for entry in WalkDir::new(base_dir.clone()) {
382 match entry {
383 Ok(dir_entry) => match dir_entry.path().to_str() {
384 Some(path) => {
385 let event = FileEvent {
386 path: path.to_string(),
387 event_kind: FileEventKind::Discovered,
388 file_kind: FileKind::Directory,
389 };
390 let event_tx = event_tx.clone();
391 tokio_runtime.block_on(async move {
392 event_tx.send(event).await;
393 })
394 }
395 None => {
396 return;
397 }
398 },
399 Err(error) => {
400 eprintln!("Error when walking filesystem: {}", error);
401 }
402 }
403 }
404 });
405
406 Ok(event_rx)
407 }
408
409 fn watch(&mut self) -> Result<tokio::sync::mpsc::Receiver<FileEvent>, Error> {
410 let (tx, rx) = mpsc::channel();
411 let (event_tx, event_rx) = tokio::sync::mpsc::channel(128);
412
413 let tokio_runtime = tokio::runtime::Builder::new_current_thread()
414 .enable_all()
415 .build()?;
416 let base_dir = self.base_dir.clone();
417
418 thread::spawn(move || {
419 let mut watcher = raw_watcher(tx).unwrap();
420 watcher
421 .watch(base_dir.clone(), RecursiveMode::Recursive)
422 .unwrap();
423
424 loop {
425 match rx.recv() {
426 Ok(RawEvent {
427 path: Some(path),
428 op: Ok(op),
429 cookie: _,
430 }) => {
431 let event_kind = match op {
432 Op::CREATE => FileEventKind::Create,
433 _CREATE_WRITE => FileEventKind::Create,
434 Op::REMOVE => FileEventKind::Delete,
435 Op::WRITE => FileEventKind::Update,
436 _x => {
437 continue;
438 }
439 };
440
441 let file_kind = match path.is_dir() {
442 true => FileKind::Directory,
443 false => FileKind::File,
444 };
445
446 let event = FileEvent {
447 path: path.to_str().unwrap().to_string(),
448 event_kind: event_kind,
449 file_kind: file_kind,
450 };
451
452 let event_tx = event_tx.clone();
453 tokio_runtime.block_on(async move {
454 event_tx.send(event).await;
455 })
456 }
457 Ok(event) => {
458 eprintln!("file_access broken event: {:?}", event);
459 }
460 Err(error) => {
461 eprintln!("WATCH ERROR: {}", error);
462 break;
463 }
464 }
465 }
466 });
467 Ok(event_rx)
468 }
469}