1use std::cell::RefCell;
3use std::collections::HashSet;
4use std::path::{Path, PathBuf};
5use std::sync::mpsc;
6use std::sync::{Arc, Mutex};
7use std::{io, thread};
8
9use dangerous_option::DangerousOption as AutoOption;
10use serde::{Deserialize, Serialize};
11use sgdata::SGData;
12use slog::{o, trace};
13use slog::{Level, Logger};
14use slog_perf::TimeReporter;
15use url::Url;
16
17pub(crate) mod local;
18pub(crate) use self::local::Local;
19
20#[cfg(feature = "backend-b2")]
21pub(crate) mod b2;
22#[cfg(feature = "backend-b2")]
23pub(crate) use self::b2::B2;
24
25pub(crate) mod backend;
26use self::backend::*;
27
28struct WriteArgs {
30 path: PathBuf,
31 data: SGData,
32 idempotent: bool,
33 complete_tx: Option<mpsc::Sender<io::Result<()>>>,
34}
35
36#[derive(Debug, Serialize, Deserialize)]
37pub struct Metadata {
38 pub len: u64,
39 pub is_file: bool,
40 pub(crate) created: chrono::DateTime<chrono::Utc>,
41}
42
/// Handle to the not-yet-available result of a request submitted to the
/// worker pool.
///
/// The worker that handles the request sends exactly one `io::Result<T>`
/// over the wrapped channel; [`AsyncIOResult::wait`] retrieves it.
#[must_use]
pub struct AsyncIOResult<T> {
    /// Receiving half of the one-shot reply channel.
    rx: mpsc::Receiver<io::Result<T>>,
}

impl<T> AsyncIOResult<T> {
    /// Blocks until the worker has replied and returns its result.
    ///
    /// # Panics
    ///
    /// Panics if the sending half is dropped without a reply having been
    /// sent (for example because the worker thread died).
    pub fn wait(self) -> io::Result<T> {
        let AsyncIOResult { rx: reply } = self;
        reply.recv().expect("No `AsyncIO` thread response")
    }
}
59
/// Running totals of the write requests processed by the worker pool.
#[derive(Clone, Debug)]
pub struct WriteStats {
    /// Number of write requests that reached the backend.
    pub new_chunks: usize,
    /// Sum of the data lengths of those write requests.
    pub new_bytes: u64,
}
65enum Message {
72 Write(WriteArgs),
73 Read(PathBuf, mpsc::Sender<io::Result<SGData>>),
74 ReadMetadata(PathBuf, mpsc::Sender<io::Result<Metadata>>),
75 List(PathBuf, mpsc::Sender<io::Result<Vec<PathBuf>>>),
76 ListRecursively(PathBuf, mpsc::Sender<io::Result<Vec<PathBuf>>>),
77 Remove(PathBuf, mpsc::Sender<io::Result<()>>),
78 RemoveDirAll(PathBuf, mpsc::Sender<io::Result<()>>),
79 Rename(PathBuf, PathBuf, mpsc::Sender<io::Result<()>>),
80}
81#[derive(Clone)]
88pub struct AsyncIO {
89 shared: Arc<AsyncIOShared>,
91 tx: AutoOption<crossbeam_channel::Sender<Message>>,
94}
95
96impl AsyncIO {
97 pub(crate) fn new(
98 backend: Box<dyn Backend + Send + Sync>,
99 log: Logger,
100 ) -> io::Result<Self> {
101 let thread_num = 4 * num_cpus::get();
102 let (tx, rx) = crossbeam_channel::bounded(thread_num);
103
104 let shared = AsyncIOThreadShared::new();
105
106 let mut spawn_res: Vec<io::Result<_>> = (0..thread_num)
107 .map(|_| {
108 let rx = rx.clone();
109 let shared = shared.clone();
110 let log = log.clone();
111 let backend = backend.new_thread()?;
112 Ok(thread::spawn(move || {
113 let mut thread =
114 AsyncIOThread::new(shared, rx, backend, log);
115 thread.run();
116 }))
117 })
118 .collect();
119
120 drop(rx);
121
122 let mut join = vec![];
123
124 for r in spawn_res.drain(..) {
125 join.push(r?);
126 }
127
128 let shared = AsyncIOShared {
129 join,
130 log: log.clone(),
131 stats: shared,
132 backend,
133 };
134
135 Ok(AsyncIO {
136 shared: Arc::new(shared),
137 tx: AutoOption::new(tx),
138 })
139 }
140
141 pub(crate) fn lock_exclusive(&self) -> io::Result<Box<dyn Lock>> {
142 self.shared.backend.lock_exclusive()
143 }
144
145 pub(crate) fn lock_shared(&self) -> io::Result<Box<dyn Lock>> {
146 self.shared.backend.lock_shared()
147 }
148
149 pub fn stats(&self) -> AsyncIOThreadShared {
150 self.shared.stats.clone()
151 }
152
153 pub fn list(&self, path: PathBuf) -> AsyncIOResult<Vec<PathBuf>> {
154 let (tx, rx) = mpsc::channel();
155 self.tx
156 .send(Message::List(path, tx))
157 .expect("aio tx closed: list");
158 AsyncIOResult { rx }
159 }
160
161 #[allow(dead_code)]
163 pub fn list_recursively(
164 &self,
165 path: PathBuf,
166 ) -> Box<dyn Iterator<Item = io::Result<PathBuf>>> {
167 let (tx, rx) = mpsc::channel();
168 self.tx
169 .send(Message::ListRecursively(path, tx))
170 .expect("aio tx closed: list_recursively");
171
172 let iter = rx.into_iter().flat_map(|batch| match batch {
173 Ok(batch) => Box::new(batch.into_iter().map(Ok))
174 as Box<dyn Iterator<Item = io::Result<PathBuf>>>,
175 Err(e) => Box::new(Some(Err(e)).into_iter())
176 as Box<dyn Iterator<Item = io::Result<PathBuf>>>,
177 });
178 Box::new(iter)
179 }
180
181 pub fn write(&self, path: PathBuf, sg: SGData) -> AsyncIOResult<()> {
182 let (tx, rx) = mpsc::channel();
183 self.tx
184 .send(Message::Write(WriteArgs {
185 path,
186 data: sg,
187 idempotent: false,
188 complete_tx: Some(tx),
189 }))
190 .expect("aio tx closed: write");
191 AsyncIOResult { rx }
192 }
193
194 #[allow(dead_code)]
196 pub fn write_idempotent(
197 &self,
198 path: PathBuf,
199 sg: SGData,
200 ) -> AsyncIOResult<()> {
201 let (tx, rx) = mpsc::channel();
202 self.tx
203 .send(Message::Write(WriteArgs {
204 path,
205 data: sg,
206 idempotent: true,
207 complete_tx: Some(tx),
208 }))
209 .expect("aio tx closed: write_idempotent");
210 AsyncIOResult { rx }
211 }
212
213 #[allow(dead_code)]
217 pub fn write_checked(&self, path: PathBuf, sg: SGData) {
218 self.tx
219 .send(Message::Write(WriteArgs {
220 path,
221 data: sg,
222 idempotent: false,
223 complete_tx: None,
224 }))
225 .expect("aio tx closed: write_checked");
226 }
227
228 pub fn write_checked_idempotent(&self, path: PathBuf, sg: SGData) {
229 self.tx
230 .send(Message::Write(WriteArgs {
231 path,
232 data: sg,
233 idempotent: true,
234 complete_tx: None,
235 }))
236 .expect("aio tx closed: write_checked_idempotent");
237 }
238
239 pub fn read(&self, path: PathBuf) -> AsyncIOResult<SGData> {
240 let (tx, rx) = mpsc::channel();
241 self.tx
242 .send(Message::Read(path, tx))
243 .expect("aio tx closed: read");
244 AsyncIOResult { rx }
245 }
246
247 pub(crate) fn read_metadata(
248 &self,
249 path: PathBuf,
250 ) -> AsyncIOResult<Metadata> {
251 let (tx, rx) = mpsc::channel();
252 self.tx
253 .send(Message::ReadMetadata(path, tx))
254 .expect("aio tx closed: read_metadata");
255 AsyncIOResult { rx }
256 }
257
258 pub fn remove(&self, path: PathBuf) -> AsyncIOResult<()> {
259 let (tx, rx) = mpsc::channel();
260 self.tx
261 .send(Message::Remove(path, tx))
262 .expect("aio tx closed: remove");
263 AsyncIOResult { rx }
264 }
265
266 pub fn remove_dir_all(&self, path: PathBuf) -> AsyncIOResult<()> {
267 let (tx, rx) = mpsc::channel();
268 self.tx
269 .send(Message::RemoveDirAll(path, tx))
270 .expect("aio tx closed: remove_dir_all");
271 AsyncIOResult { rx }
272 }
273
274 pub fn rename(&self, src: PathBuf, dst: PathBuf) -> AsyncIOResult<()> {
275 let (tx, rx) = mpsc::channel();
276 self.tx
277 .send(Message::Rename(src, dst, tx))
278 .expect("aio tx closed: rename");
279 AsyncIOResult { rx }
280 }
281}
282
283impl Drop for AsyncIO {
284 fn drop(&mut self) {
285 AutoOption::take_unchecked(&mut self.tx);
289 }
290}
291pub struct AsyncIOShared {
298 join: Vec<thread::JoinHandle<()>>,
299 log: slog::Logger,
300 stats: AsyncIOThreadShared,
301 backend: Box<dyn Backend + Send + Sync>,
302}
303
304impl Drop for AsyncIOShared {
305 fn drop(&mut self) {
306 trace!(self.log, "Waiting for all threads to finish");
307 for join in self.join.drain(..) {
308 join.join().expect("AsyncIO worker thread panicked")
309 }
310 }
311}
312
313struct AsyncIOSharedInner {
314 write_stats: WriteStats,
316 in_progress: HashSet<PathBuf>,
319}
320
321impl Drop for AsyncIOSharedInner {
322 fn drop(&mut self) {
323 debug_assert!(self.in_progress.is_empty());
324 }
325}
326
327#[derive(Clone)]
328pub struct AsyncIOThreadShared {
329 inner: Arc<Mutex<AsyncIOSharedInner>>,
330}
331
332impl AsyncIOThreadShared {
333 pub fn new() -> Self {
334 let inner = AsyncIOSharedInner {
335 write_stats: WriteStats {
336 new_bytes: 0,
337 new_chunks: 0,
338 },
339 in_progress: Default::default(),
340 };
341
342 AsyncIOThreadShared {
343 inner: Arc::new(Mutex::new(inner)),
344 }
345 }
346
347 pub fn get_stats(&self) -> WriteStats {
348 let sh = self.inner.lock().unwrap();
349 sh.write_stats.clone()
350 }
351}
352struct AsyncIOThread {
357 shared: AsyncIOThreadShared,
358 rx: crossbeam_channel::Receiver<Message>,
359 log: Logger,
360 time_reporter: TimeReporter,
361 backend: RefCell<Box<dyn BackendThread>>,
362}
363
364struct PendingGuard<'a, 'b>(&'a AsyncIOThread, &'b Path);
366
367impl<'a, 'b> Drop for PendingGuard<'a, 'b> {
368 fn drop(&mut self) {
369 let mut sh = self.0.shared.inner.lock().unwrap();
370 sh.in_progress.remove(self.1);
371 }
372}
373
374impl AsyncIOThread {
375 fn new(
376 shared: AsyncIOThreadShared,
377 rx: crossbeam_channel::Receiver<Message>,
378 backend: Box<dyn BackendThread>,
379 log: Logger,
380 ) -> Self {
381 let t = TimeReporter::new_with_level(
382 "chunk-writer",
383 log.clone(),
384 Level::Debug,
385 );
386 AsyncIOThread {
387 log: log.new(o!("module" => "asyncio")),
388 shared,
389 rx,
390 time_reporter: t,
391 backend: RefCell::new(backend),
392 }
393 }
394
395 pub fn run(&mut self) {
396 loop {
397 self.time_reporter.start("rx");
398
399 if let Ok(msg) = self.rx.recv() {
400 match msg {
401 Message::Write(WriteArgs {
402 path,
403 data,
404 idempotent,
405 complete_tx,
406 }) => self.write(path, data, idempotent, complete_tx),
407 Message::Read(path, tx) => self.read(path, tx),
408 Message::ReadMetadata(path, tx) => {
409 self.read_metadata(path, tx)
410 }
411 Message::List(path, tx) => self.list(path, tx),
412 Message::ListRecursively(path, tx) => {
413 self.list_recursively(path, tx)
414 }
415 Message::Remove(path, tx) => self.remove(path, tx),
416 Message::RemoveDirAll(path, tx) => {
417 self.remove_dir_all(path, tx)
418 }
419 Message::Rename(src_path, dst_path, tx) => {
420 self.rename(src_path, dst_path, tx)
421 }
422 }
423 } else {
424 break;
425 }
426 }
427 }
428
429 fn write_inner(
430 &mut self,
431 path: PathBuf,
432 sg: SGData,
433 idempotent: bool,
434 ) -> io::Result<()> {
435 loop {
438 let mut sh = self.shared.inner.lock().unwrap();
439
440 if sh.in_progress.contains(&path) {
441 if idempotent {
442 return Ok(());
443 } else {
444 drop(sh);
447 thread::sleep(std::time::Duration::from_millis(1000));
448 }
449 } else {
450 sh.in_progress.insert(path.clone());
451 break;
452 }
453 }
454
455 let len = sg.len();
456 let res = self
457 .backend
458 .borrow_mut()
459 .write(path.clone(), sg, idempotent);
460 {
461 let mut sh = self.shared.inner.lock().unwrap();
462 sh.in_progress.remove(&path);
463 sh.write_stats.new_bytes += len as u64;
464 sh.write_stats.new_chunks += 1;
465 }
466
467 res
468 }
469
470 fn write(
471 &mut self,
472 path: PathBuf,
473 sg: SGData,
474 idempotent: bool,
475 tx: Option<mpsc::Sender<io::Result<()>>>,
476 ) {
477 trace!(self.log, "write"; "path" => %path.display());
478
479 self.time_reporter.start("read");
480 let res = self.write_inner(path, sg, idempotent);
481
482 if let Some(tx) = tx {
483 self.time_reporter.start("write send response");
484 tx.send(res).expect("send failed")
485 } else {
486 res.unwrap();
487 }
488 }
489
490 fn pending_wait_and_insert<'a, 'path>(
491 &'a self,
492 path: &'path Path,
493 ) -> PendingGuard<'a, 'path> {
494 loop {
495 let mut sh = self.shared.inner.lock().unwrap();
496
497 if sh.in_progress.contains(path) {
498 drop(sh);
501 thread::sleep(std::time::Duration::from_millis(1000));
502 } else {
503 sh.in_progress.insert(path.to_path_buf());
504 break;
505 }
506 }
507 PendingGuard(self, &path)
508 }
509
510 fn read(&mut self, path: PathBuf, tx: mpsc::Sender<io::Result<SGData>>) {
511 trace!(self.log, "read"; "path" => %path.display());
512
513 self.time_reporter.start("read");
514 let res = {
515 let _guard = self.pending_wait_and_insert(&path);
516 self.backend.borrow_mut().read(path.clone())
517 };
518 self.time_reporter.start("read send response");
519 tx.send(res).expect("send failed")
520 }
521
522 fn read_metadata(
523 &mut self,
524 path: PathBuf,
525 tx: mpsc::Sender<io::Result<Metadata>>,
526 ) {
527 trace!(self.log, "read-metadata"; "path" => %path.display());
528
529 self.time_reporter.start("read-metadata");
530 let res = {
531 let _guard = self.pending_wait_and_insert(&path);
532 self.backend.borrow_mut().read_metadata(path.clone())
533 };
534
535 self.time_reporter.start("read send response");
536 tx.send(res).expect("send failed")
537 }
538
539 fn list(
540 &mut self,
541 path: PathBuf,
542 tx: mpsc::Sender<io::Result<Vec<PathBuf>>>,
543 ) {
544 trace!(self.log, "list"; "path" => %path.display());
545
546 self.time_reporter.start("list");
547 let res = self.backend.borrow_mut().list(path);
548 self.time_reporter.start("list send response");
549 tx.send(res).expect("send failed")
550 }
551
552 fn list_recursively(
553 &mut self,
554 path: PathBuf,
555 tx: mpsc::Sender<io::Result<Vec<PathBuf>>>,
556 ) {
557 trace!(self.log, "list"; "path" => %path.display());
558 self.time_reporter.start("list");
559
560 self.backend.borrow_mut().list_recursively(path, tx)
561 }
562
563 fn remove(&mut self, path: PathBuf, tx: mpsc::Sender<io::Result<()>>) {
564 trace!(self.log, "remove"; "path" => %path.display());
565
566 self.time_reporter.start("remove");
567 let res = {
568 let _guard = self.pending_wait_and_insert(&path);
569 self.backend.borrow_mut().remove(path.clone())
570 };
571 self.time_reporter.start("remove send response");
572 tx.send(res).expect("send failed")
573 }
574
575 fn remove_dir_all(
576 &mut self,
577 path: PathBuf,
578 tx: mpsc::Sender<io::Result<()>>,
579 ) {
580 trace!(self.log, "remove-dir-all"; "path" => %path.display());
581
582 self.time_reporter.start("remove-dir-all");
583 let res = self.backend.borrow_mut().remove_dir_all(path);
584
585 self.time_reporter.start("remove send response");
586 tx.send(res).expect("send failed")
587 }
588
589 fn rename(
590 &mut self,
591 src_path: PathBuf,
592 dst_path: PathBuf,
593 tx: mpsc::Sender<io::Result<()>>,
594 ) {
595 trace!(
596 self.log,
597 "rename";
598 "src-path" => %src_path.display(),
599 "dst-path" => %dst_path.display()
600 );
601
602 self.time_reporter.start("rename");
603 let res = {
604 let _guard = self.pending_wait_and_insert(&src_path);
605 let _guard = self.pending_wait_and_insert(&dst_path);
606 self.backend
607 .borrow_mut()
608 .rename(src_path.clone(), dst_path.clone())
609 };
610 self.time_reporter.start("remove send response");
611 tx.send(res).expect("send failed")
612 }
613}
614pub(crate) fn backend_from_url(
628 url: &Url,
629) -> io::Result<Box<dyn Backend + Send + Sync>> {
630 if url.scheme() == "file" {
631 return Ok(Box::new(Local::new(url.to_file_path().unwrap())));
632 } else if url.scheme() == "b2" {
633 #[cfg(feature = "backend-b2")]
634 {
635 let id = url.path();
636 let bucket = url.fragment().ok_or_else(|| {
637 io::Error::new(
638 io::ErrorKind::InvalidInput,
639 "bucket in the url missing",
640 )
641 })?;
642 let key = std::env::var_os("RDEDUP_B2_KEY")
643 .ok_or_else(|| {
644 io::Error::new(
645 io::ErrorKind::InvalidInput,
646 "RDEDUP_B2_KEY environment variable not found",
647 )
648 })?
649 .into_string()
650 .map_err(|os_string| {
651 io::Error::new(
652 io::ErrorKind::InvalidInput,
653 format!(
654 "b2 key is not utf8 string: {}",
655 os_string.to_string_lossy()
656 ),
657 )
658 })?;
659 return Ok(Box::new(B2::new(id, bucket, &key)));
660 }
661
662 #[cfg(not(feature = "backend-b2"))]
663 {
664 panic!("Backblaze B2 backend feature not enabled");
665 }
666 }
667
668 Err(io::Error::new(
669 io::ErrorKind::InvalidData,
670 format!("Unsupported scheme: {}", url.scheme()),
671 ))
672}
673
674