rdedup_lib/aio/
mod.rs

//! Asynchronous IO operations & backends
use std::cell::RefCell;
use std::collections::HashSet;
use std::path::{Path, PathBuf};
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::{io, thread};

use dangerous_option::DangerousOption as AutoOption;
use serde::{Deserialize, Serialize};
use sgdata::SGData;
use slog::{o, trace};
use slog::{Level, Logger};
use slog_perf::TimeReporter;
use url::Url;

pub(crate) mod local;
pub(crate) use self::local::Local;

#[cfg(feature = "backend-b2")]
pub(crate) mod b2;
#[cfg(feature = "backend-b2")]
pub(crate) use self::b2::B2;

pub(crate) mod backend;
use self::backend::*;

// {{{ Misc
struct WriteArgs {
    path: PathBuf,
    data: SGData,
    idempotent: bool,
    complete_tx: Option<mpsc::Sender<io::Result<()>>>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct Metadata {
    pub len: u64,
    pub is_file: bool,
    pub(crate) created: chrono::DateTime<chrono::Utc>,
}

/// A result of an async I/O operation
///
/// It behaves a bit like a future/promise: the operation happens
/// in the background, and only calling `wait` ensures that it has
/// completed and returns its result.
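///
/// A minimal usage sketch (assuming an existing `aio: AsyncIO`
/// handle; the path is illustrative):
///
/// ```norust
/// let pending = aio.read(PathBuf::from("some/path"));
/// // other work can proceed while the pool performs the read
/// let data: SGData = pending.wait()?;
/// ```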
#[must_use]
pub struct AsyncIOResult<T> {
    rx: mpsc::Receiver<io::Result<T>>,
}

impl<T> AsyncIOResult<T> {
    /// Block until result arrives
    pub fn wait(self) -> io::Result<T> {
        self.rx.recv().expect("No `AsyncIO` thread response")
    }
}

#[derive(Clone, Debug)]
pub struct WriteStats {
    pub new_chunks: usize,
    pub new_bytes: u64,
}
// }}}

// {{{ Message
/// Message sent to a worker pool
///
/// Each variant represents one type of job, carrying its arguments
/// and a channel on which to send back the result (optional in the
/// `Write` case).
enum Message {
    Write(WriteArgs),
    Read(PathBuf, mpsc::Sender<io::Result<SGData>>),
    ReadMetadata(PathBuf, mpsc::Sender<io::Result<Metadata>>),
    List(PathBuf, mpsc::Sender<io::Result<Vec<PathBuf>>>),
    ListRecursively(PathBuf, mpsc::Sender<io::Result<Vec<PathBuf>>>),
    Remove(PathBuf, mpsc::Sender<io::Result<()>>),
    RemoveDirAll(PathBuf, mpsc::Sender<io::Result<()>>),
    Rename(PathBuf, PathBuf, mpsc::Sender<io::Result<()>>),
}
// }}}

// {{{ AsyncIO
/// A handle to an async-I/O worker pool
///
/// This object abstracts away asynchronous operations on the backend.
#[derive(Clone)]
pub struct AsyncIO {
    /// Data shared between threads of the pool.
    shared: Arc<AsyncIOShared>,
    /// tx endpoint of the mpmc queue used to send jobs
    /// to the pool.
    tx: AutoOption<crossbeam_channel::Sender<Message>>,
}

impl AsyncIO {
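    /// Create a new worker pool running on top of `backend`.
    ///
    /// A sketch of the intended call sequence (assuming a `file:`
    /// repository URL; names are illustrative):
    ///
    /// ```norust
    /// let url = Url::parse("file:/tmp/repo").unwrap();
    /// let backend = backend_from_url(&url)?;
    /// let aio = AsyncIO::new(backend, log.clone())?;
    /// ```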
    pub(crate) fn new(
        backend: Box<dyn Backend + Send + Sync>,
        log: Logger,
    ) -> io::Result<Self> {
        let thread_num = 4 * num_cpus::get();
        let (tx, rx) = crossbeam_channel::bounded(thread_num);

        let shared = AsyncIOThreadShared::new();

        let mut spawn_res: Vec<io::Result<_>> = (0..thread_num)
            .map(|_| {
                let rx = rx.clone();
                let shared = shared.clone();
                let log = log.clone();
                let backend = backend.new_thread()?;
                Ok(thread::spawn(move || {
                    let mut thread =
                        AsyncIOThread::new(shared, rx, backend, log);
                    thread.run();
                }))
            })
            .collect();

        drop(rx);

        let mut join = vec![];

        for r in spawn_res.drain(..) {
            join.push(r?);
        }

        let shared = AsyncIOShared {
            join,
            log: log.clone(),
            stats: shared,
            backend,
        };

        Ok(AsyncIO {
            shared: Arc::new(shared),
            tx: AutoOption::new(tx),
        })
    }

    pub(crate) fn lock_exclusive(&self) -> io::Result<Box<dyn Lock>> {
        self.shared.backend.lock_exclusive()
    }

    pub(crate) fn lock_shared(&self) -> io::Result<Box<dyn Lock>> {
        self.shared.backend.lock_shared()
    }

    pub fn stats(&self) -> AsyncIOThreadShared {
        self.shared.stats.clone()
    }

    pub fn list(&self, path: PathBuf) -> AsyncIOResult<Vec<PathBuf>> {
        let (tx, rx) = mpsc::channel();
        self.tx
            .send(Message::List(path, tx))
            .expect("aio tx closed: list");
        AsyncIOResult { rx }
    }

    // TODO: No need for it anymore?
    #[allow(dead_code)]
    pub fn list_recursively(
        &self,
        path: PathBuf,
    ) -> Box<dyn Iterator<Item = io::Result<PathBuf>>> {
        let (tx, rx) = mpsc::channel();
        self.tx
            .send(Message::ListRecursively(path, tx))
            .expect("aio tx closed: list_recursively");

        let iter = rx.into_iter().flat_map(|batch| match batch {
            Ok(batch) => Box::new(batch.into_iter().map(Ok))
                as Box<dyn Iterator<Item = io::Result<PathBuf>>>,
            Err(e) => Box::new(Some(Err(e)).into_iter())
                as Box<dyn Iterator<Item = io::Result<PathBuf>>>,
        });
        Box::new(iter)
    }
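
    // Consumption sketch: entries stream in as the worker produces
    // them in batches, so iteration can start before the full listing
    // is complete (the path below is illustrative):
    //
    // ```norust
    // for entry in aio.list_recursively(PathBuf::from("chunk")) {
    //     let path: PathBuf = entry?;
    //     // ...
    // }
    // ```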

    pub fn write(&self, path: PathBuf, sg: SGData) -> AsyncIOResult<()> {
        let (tx, rx) = mpsc::channel();
        self.tx
            .send(Message::Write(WriteArgs {
                path,
                data: sg,
                idempotent: false,
                complete_tx: Some(tx),
            }))
            .expect("aio tx closed: write");
        AsyncIOResult { rx }
    }

    // TODO: No need for it anymore
    #[allow(dead_code)]
    pub fn write_idempotent(
        &self,
        path: PathBuf,
        sg: SGData,
    ) -> AsyncIOResult<()> {
        let (tx, rx) = mpsc::channel();
        self.tx
            .send(Message::Write(WriteArgs {
                path,
                data: sg,
                idempotent: true,
                complete_tx: Some(tx),
            }))
            .expect("aio tx closed: write_idempotent");
        AsyncIOResult { rx }
    }

    /// Will panic the worker thread on failure, but does not require
    /// managing the result
    // TODO: No need for it anymore
    #[allow(dead_code)]
    pub fn write_checked(&self, path: PathBuf, sg: SGData) {
        self.tx
            .send(Message::Write(WriteArgs {
                path,
                data: sg,
                idempotent: false,
                complete_tx: None,
            }))
            .expect("aio tx closed: write_checked");
    }

    pub fn write_checked_idempotent(&self, path: PathBuf, sg: SGData) {
        self.tx
            .send(Message::Write(WriteArgs {
                path,
                data: sg,
                idempotent: true,
                complete_tx: None,
            }))
            .expect("aio tx closed: write_checked_idempotent");
    }
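
    // How the write variants relate (sketch; `p` and `sg` are
    // illustrative):
    //
    // ```norust
    // aio.write(p, sg).wait()?;            // result reported back
    // aio.write_idempotent(p, sg).wait()?; // ... and in-flight duplicate skipped
    // aio.write_checked(p, sg);            // fire-and-forget; worker panics on error
    // aio.write_checked_idempotent(p, sg); // fire-and-forget; duplicate skipped
    // ```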

    pub fn read(&self, path: PathBuf) -> AsyncIOResult<SGData> {
        let (tx, rx) = mpsc::channel();
        self.tx
            .send(Message::Read(path, tx))
            .expect("aio tx closed: read");
        AsyncIOResult { rx }
    }

    pub(crate) fn read_metadata(
        &self,
        path: PathBuf,
    ) -> AsyncIOResult<Metadata> {
        let (tx, rx) = mpsc::channel();
        self.tx
            .send(Message::ReadMetadata(path, tx))
            .expect("aio tx closed: read_metadata");
        AsyncIOResult { rx }
    }

    pub fn remove(&self, path: PathBuf) -> AsyncIOResult<()> {
        let (tx, rx) = mpsc::channel();
        self.tx
            .send(Message::Remove(path, tx))
            .expect("aio tx closed: remove");
        AsyncIOResult { rx }
    }

    pub fn remove_dir_all(&self, path: PathBuf) -> AsyncIOResult<()> {
        let (tx, rx) = mpsc::channel();
        self.tx
            .send(Message::RemoveDirAll(path, tx))
            .expect("aio tx closed: remove_dir_all");
        AsyncIOResult { rx }
    }

    pub fn rename(&self, src: PathBuf, dst: PathBuf) -> AsyncIOResult<()> {
        let (tx, rx) = mpsc::channel();
        self.tx
            .send(Message::Rename(src, dst, tx))
            .expect("aio tx closed: rename");
        AsyncIOResult { rx }
    }
}

impl Drop for AsyncIO {
    fn drop(&mut self) {
        // It is important that the tx is dropped before `shared` is.
        // Otherwise joining the worker threads would hang, as they
        // would never see the channel disconnect that signals
        // termination.
        AutoOption::take_unchecked(&mut self.tx);
    }
}
// }}}

// {{{ AsyncIOShared & internals
/// Arc-ed shared `AsyncIO` data
///
/// Data shared between all threads of the worker pool.
pub struct AsyncIOShared {
    join: Vec<thread::JoinHandle<()>>,
    log: slog::Logger,
    stats: AsyncIOThreadShared,
    backend: Box<dyn Backend + Send + Sync>,
}

impl Drop for AsyncIOShared {
    fn drop(&mut self) {
        trace!(self.log, "Waiting for all threads to finish");
        for join in self.join.drain(..) {
            join.join().expect("AsyncIO worker thread panicked")
        }
    }
}

struct AsyncIOSharedInner {
    /// Keeps track of `write` stats.
    write_stats: WriteStats,
    /// Paths currently being processed by the pool.
    /// Used to synchronize operations that touch the same path.
    in_progress: HashSet<PathBuf>,
}

impl Drop for AsyncIOSharedInner {
    fn drop(&mut self) {
        debug_assert!(self.in_progress.is_empty());
    }
}

#[derive(Clone)]
pub struct AsyncIOThreadShared {
    inner: Arc<Mutex<AsyncIOSharedInner>>,
}

impl AsyncIOThreadShared {
    pub fn new() -> Self {
        let inner = AsyncIOSharedInner {
            write_stats: WriteStats {
                new_bytes: 0,
                new_chunks: 0,
            },
            in_progress: Default::default(),
        };

        AsyncIOThreadShared {
            inner: Arc::new(Mutex::new(inner)),
        }
    }

    pub fn get_stats(&self) -> WriteStats {
        let sh = self.inner.lock().unwrap();
        sh.write_stats.clone()
    }
}
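
// Stats accumulated by the pool can be read through the `AsyncIO`
// handle (sketch):
//
// ```norust
// let stats = aio.stats().get_stats();
// println!("{} new chunks, {} new bytes", stats.new_chunks, stats.new_bytes);
// ```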
// }}}

// {{{ AsyncIOThread
/// A single thread in the worker pool.
struct AsyncIOThread {
    shared: AsyncIOThreadShared,
    rx: crossbeam_channel::Receiver<Message>,
    log: Logger,
    time_reporter: TimeReporter,
    backend: RefCell<Box<dyn BackendThread>>,
}

/// Guard that removes its entry from the pending paths on drop
struct PendingGuard<'a, 'b>(&'a AsyncIOThread, &'b Path);

impl<'a, 'b> Drop for PendingGuard<'a, 'b> {
    fn drop(&mut self) {
        let mut sh = self.0.shared.inner.lock().unwrap();
        sh.in_progress.remove(self.1);
    }
}
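
// Typical use inside a worker (sketch, mirroring `read` below): the
// guard keeps the path in `in_progress` for the duration of the
// backend call and releases it when the scope ends.
//
// ```norust
// let res = {
//     let _guard = self.pending_wait_and_insert(&path);
//     self.backend.borrow_mut().read(path.clone())
// }; // `_guard` dropped here; `path` removed from `in_progress`
// ```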

impl AsyncIOThread {
    fn new(
        shared: AsyncIOThreadShared,
        rx: crossbeam_channel::Receiver<Message>,
        backend: Box<dyn BackendThread>,
        log: Logger,
    ) -> Self {
        let t = TimeReporter::new_with_level(
            "chunk-writer",
            log.clone(),
            Level::Debug,
        );
        AsyncIOThread {
            log: log.new(o!("module" => "asyncio")),
            shared,
            rx,
            time_reporter: t,
            backend: RefCell::new(backend),
        }
    }

    pub fn run(&mut self) {
        loop {
            self.time_reporter.start("rx");

            if let Ok(msg) = self.rx.recv() {
                match msg {
                    Message::Write(WriteArgs {
                        path,
                        data,
                        idempotent,
                        complete_tx,
                    }) => self.write(path, data, idempotent, complete_tx),
                    Message::Read(path, tx) => self.read(path, tx),
                    Message::ReadMetadata(path, tx) => {
                        self.read_metadata(path, tx)
                    }
                    Message::List(path, tx) => self.list(path, tx),
                    Message::ListRecursively(path, tx) => {
                        self.list_recursively(path, tx)
                    }
                    Message::Remove(path, tx) => self.remove(path, tx),
                    Message::RemoveDirAll(path, tx) => {
                        self.remove_dir_all(path, tx)
                    }
                    Message::Rename(src_path, dst_path, tx) => {
                        self.rename(src_path, dst_path, tx)
                    }
                }
            } else {
                break;
            }
        }
    }

    fn write_inner(
        &mut self,
        path: PathBuf,
        sg: SGData,
        idempotent: bool,
    ) -> io::Result<()> {
        // check `in_progress` and add atomically
        // if not already there
        loop {
            let mut sh = self.shared.inner.lock().unwrap();

            if sh.in_progress.contains(&path) {
                if idempotent {
                    return Ok(());
                } else {
                    // a bit lame, but will do, since this should not really
                    // happen in practice anyway
                    drop(sh);
                    thread::sleep(std::time::Duration::from_millis(1000));
                }
            } else {
                sh.in_progress.insert(path.clone());
                break;
            }
        }

        let len = sg.len();
        let res = self
            .backend
            .borrow_mut()
            .write(path.clone(), sg, idempotent);
        {
            let mut sh = self.shared.inner.lock().unwrap();
            sh.in_progress.remove(&path);
            sh.write_stats.new_bytes += len as u64;
            sh.write_stats.new_chunks += 1;
        }

        res
    }

    fn write(
        &mut self,
        path: PathBuf,
        sg: SGData,
        idempotent: bool,
        tx: Option<mpsc::Sender<io::Result<()>>>,
    ) {
        trace!(self.log, "write"; "path" => %path.display());

        self.time_reporter.start("write");
        let res = self.write_inner(path, sg, idempotent);

        if let Some(tx) = tx {
            self.time_reporter.start("write send response");
            tx.send(res).expect("send failed")
        } else {
            res.unwrap();
        }
    }

    fn pending_wait_and_insert<'a, 'path>(
        &'a self,
        path: &'path Path,
    ) -> PendingGuard<'a, 'path> {
        loop {
            let mut sh = self.shared.inner.lock().unwrap();

            if sh.in_progress.contains(path) {
                // a bit lame, but will do, since this should not really
                // happen in practice anyway
                drop(sh);
                thread::sleep(std::time::Duration::from_millis(1000));
            } else {
                sh.in_progress.insert(path.to_path_buf());
                break;
            }
        }
        PendingGuard(self, path)
    }

    fn read(&mut self, path: PathBuf, tx: mpsc::Sender<io::Result<SGData>>) {
        trace!(self.log, "read"; "path" => %path.display());

        self.time_reporter.start("read");
        let res = {
            let _guard = self.pending_wait_and_insert(&path);
            self.backend.borrow_mut().read(path.clone())
        };
        self.time_reporter.start("read send response");
        tx.send(res).expect("send failed")
    }

    fn read_metadata(
        &mut self,
        path: PathBuf,
        tx: mpsc::Sender<io::Result<Metadata>>,
    ) {
        trace!(self.log, "read-metadata"; "path" => %path.display());

        self.time_reporter.start("read-metadata");
        let res = {
            let _guard = self.pending_wait_and_insert(&path);
            self.backend.borrow_mut().read_metadata(path.clone())
        };

        self.time_reporter.start("read-metadata send response");
        tx.send(res).expect("send failed")
    }

    fn list(
        &mut self,
        path: PathBuf,
        tx: mpsc::Sender<io::Result<Vec<PathBuf>>>,
    ) {
        trace!(self.log, "list"; "path" => %path.display());

        self.time_reporter.start("list");
        let res = self.backend.borrow_mut().list(path);
        self.time_reporter.start("list send response");
        tx.send(res).expect("send failed")
    }

    fn list_recursively(
        &mut self,
        path: PathBuf,
        tx: mpsc::Sender<io::Result<Vec<PathBuf>>>,
    ) {
        trace!(self.log, "list-recursively"; "path" => %path.display());
        self.time_reporter.start("list-recursively");

        self.backend.borrow_mut().list_recursively(path, tx)
    }

    fn remove(&mut self, path: PathBuf, tx: mpsc::Sender<io::Result<()>>) {
        trace!(self.log, "remove"; "path" => %path.display());

        self.time_reporter.start("remove");
        let res = {
            let _guard = self.pending_wait_and_insert(&path);
            self.backend.borrow_mut().remove(path.clone())
        };
        self.time_reporter.start("remove send response");
        tx.send(res).expect("send failed")
    }

    fn remove_dir_all(
        &mut self,
        path: PathBuf,
        tx: mpsc::Sender<io::Result<()>>,
    ) {
        trace!(self.log, "remove-dir-all"; "path" => %path.display());

        self.time_reporter.start("remove-dir-all");
        let res = self.backend.borrow_mut().remove_dir_all(path);

        self.time_reporter.start("remove-dir-all send response");
        tx.send(res).expect("send failed")
    }

    fn rename(
        &mut self,
        src_path: PathBuf,
        dst_path: PathBuf,
        tx: mpsc::Sender<io::Result<()>>,
    ) {
        trace!(
            self.log,
            "rename";
            "src-path" => %src_path.display(),
            "dst-path" => %dst_path.display()
        );

        self.time_reporter.start("rename");
        let res = {
            let _guard = self.pending_wait_and_insert(&src_path);
            let _guard = self.pending_wait_and_insert(&dst_path);
            self.backend
                .borrow_mut()
                .rename(src_path.clone(), dst_path.clone())
        };
        self.time_reporter.start("rename send response");
        tx.send(res).expect("send failed")
    }
}
// }}}

/// Converts the given `url` into a backend instance.
///
/// For the `b2` scheme, the application key is read from the
/// `RDEDUP_B2_KEY` environment variable.
///
/// # Panics
///
/// Panics if the `url` specifies a supported backend scheme that was
/// not enabled at compile time through feature flags.
//
// ```norust
// let s = "file:/foo/bar";
// let s = "b2:myid#bucket";
// ```
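//
// With the b2 backend enabled, the bucket is taken from the URL
// fragment and the application key from the environment (sketch):
//
// ```norust
// // requires RDEDUP_B2_KEY to be set
// let url = Url::parse("b2:myid#bucket").unwrap();
// let backend = backend_from_url(&url)?;
// ```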
pub(crate) fn backend_from_url(
    url: &Url,
) -> io::Result<Box<dyn Backend + Send + Sync>> {
    if url.scheme() == "file" {
        return Ok(Box::new(Local::new(url.to_file_path().unwrap())));
    } else if url.scheme() == "b2" {
        #[cfg(feature = "backend-b2")]
        {
            let id = url.path();
            let bucket = url.fragment().ok_or_else(|| {
                io::Error::new(
                    io::ErrorKind::InvalidInput,
                    "bucket missing in the url",
                )
            })?;
            let key = std::env::var_os("RDEDUP_B2_KEY")
                .ok_or_else(|| {
                    io::Error::new(
                        io::ErrorKind::InvalidInput,
                        "RDEDUP_B2_KEY environment variable not found",
                    )
                })?
                .into_string()
                .map_err(|os_string| {
                    io::Error::new(
                        io::ErrorKind::InvalidInput,
                        format!(
                            "b2 key is not a utf8 string: {}",
                            os_string.to_string_lossy()
                        ),
                    )
                })?;
            return Ok(Box::new(B2::new(id, bucket, &key)));
        }

        #[cfg(not(feature = "backend-b2"))]
        {
            panic!("Backblaze B2 backend feature not enabled");
        }
    }

    Err(io::Error::new(
        io::ErrorKind::InvalidData,
        format!("Unsupported scheme: {}", url.scheme()),
    ))
}

// vim: foldmethod=marker foldmarker={{{,}}}