rdedup_lib/aio/
b2.rs

1#![allow(unused)]
2use std::borrow::BorrowMut;
3use std::cell::RefCell;
4use std::io::Read;
5use std::path::PathBuf;
6use std::sync::mpsc;
7use std::{fs, io};
8
9use backblaze_b2::raw::authorize::{B2Authorization, B2Credentials};
10use backblaze_b2::raw::files::{FileNameListing, MoreFileInfo};
11use backblaze_b2::raw::upload::UploadAuthorization;
12use backblaze_b2::B2Error;
13use hyper::net::HttpsConnector;
14use hyper::Client;
15use hyper_native_tls::NativeTlsClient;
16use sgdata::SGData;
17
18use super::Metadata;
19use super::{Backend, BackendThread};
20use crate::aio;
21use crate::config;
22
23// TODO: make a thread, that keeps updating
24// a timestamp file on the backend
25pub struct Lock {
26    path: PathBuf,
27}
28
29impl Lock {
30    fn new(path: PathBuf) -> Self {
31        Lock { path }
32    }
33}
34
35impl aio::Lock for Lock {}
36
37#[derive(Debug)]
38pub struct B2 {
39    cred: B2Credentials,
40    bucket: String,
41}
42
43pub struct Auth {
44    auth: B2Authorization,
45    upload_auth: UploadAuthorization,
46}
47
48pub struct B2Thread {
49    cred: B2Credentials,
50    auth: RefCell<Option<Auth>>,
51    client: Client,
52    bucket: String,
53}
54
55/// Retry operations that can fail due to network/service issues
56fn retry<F, R>(instance: Option<&B2Thread>, f: F) -> io::Result<R>
57where
58    F: Fn() -> Result<R, B2Error>,
59{
60    let mut backoff = 1;
61    let mut err_counter = 0;
62    loop {
63        let res = f();
64
65        match res {
66            Ok(ok) => return Ok(ok),
67            Err(e) => {
68                err_counter += 1;
69
70                if err_counter > 5 {
71                    return Err(e).map_err(|e| {
72                        io::Error::new(
73                            io::ErrorKind::ConnectionAborted,
74                            format!(
75                                "Gave up b2 operation after {} retries: {}",
76                                err_counter, e
77                            ),
78                        )
79                    });
80                }
81
82                if e.should_back_off() {
83                    std::thread::sleep(std::time::Duration::from_secs(backoff));
84                    backoff *= 2;
85                } else {
86                    backoff = 1;
87                }
88
89                if e.should_obtain_new_authentication() {
90                    if let Some(instance) = instance.as_ref() {
91                        let _ = instance.reauth();
92                    }
93                }
94            }
95        }
96    }
97}
98
99impl B2Thread {
100    fn reauth(&self) -> io::Result<()> {
101        let auth = retry(None, || {
102            let auth = self.cred.authorize(&self.client)?;
103            let upload_auth =
104                auth.get_upload_url(&self.bucket, &self.client)?;
105            Ok((auth, upload_auth))
106        })?;
107        *self.auth.borrow_mut() = Some(Auth {
108            auth: auth.0,
109            upload_auth: auth.1,
110        });
111
112        Ok(())
113    }
114
115    fn new_from_cred(cred: &B2Credentials, bucket: String) -> io::Result<Self> {
116        let ssl = NativeTlsClient::new().map_err(|e| {
117            io::Error::new(
118                io::ErrorKind::ConnectionAborted,
119                format!("Couldn't create `NativeTlsClient`: {}", e),
120            )
121        })?;
122        let connector = HttpsConnector::new(ssl);
123        let client = Client::with_connector(connector);
124
125        let mut i = B2Thread {
126            cred: cred.clone(),
127            client,
128            auth: RefCell::new(None),
129            bucket,
130        };
131
132        i.reauth()?;
133
134        Ok(i)
135    }
136}
137
138impl Backend for B2 {
139    fn lock_exclusive(&self) -> io::Result<Box<dyn aio::Lock>> {
140        Ok(Box::new(Lock::new(PathBuf::from(config::LOCK_FILE))))
141    }
142
143    fn lock_shared(&self) -> io::Result<Box<dyn aio::Lock>> {
144        Ok(Box::new(Lock::new(PathBuf::from(config::LOCK_FILE))))
145    }
146
147    fn new_thread(&self) -> io::Result<Box<dyn BackendThread>> {
148        Ok(Box::new(B2Thread::new_from_cred(
149            &self.cred,
150            self.bucket.clone(),
151        )?))
152    }
153}
154
155impl B2 {
156    pub(crate) fn new(id: &str, bucket: &str, key: &str) -> Self {
157        let cred = B2Credentials {
158            id: id.into(),
159            key: key.into(),
160        };
161
162        B2 {
163            cred,
164            bucket: bucket.into(),
165        }
166    }
167}
168
169impl BackendThread for B2Thread {
170    fn remove_dir_all(&mut self, path: PathBuf) -> io::Result<()> {
171        fs::remove_dir_all(&path)
172    }
173
174    fn rename(
175        &mut self,
176        src_path: PathBuf,
177        dst_path: PathBuf,
178    ) -> io::Result<()> {
179        match fs::rename(&src_path, &dst_path) {
180            Ok(_) => Ok(()),
181            Err(_e) => {
182                fs::create_dir_all(dst_path.parent().unwrap())?;
183                fs::rename(&src_path, &dst_path)
184            }
185        }
186    }
187
188    fn write(
189        &mut self,
190        path: PathBuf,
191        sg: SGData,
192        idempotent: bool,
193    ) -> io::Result<()> {
194        Ok(())
195    }
196
197    fn read(&mut self, path: PathBuf) -> io::Result<SGData> {
198        Ok(SGData::empty())
199    }
200
201    fn remove(&mut self, path: PathBuf) -> io::Result<()> {
202        Ok(())
203    }
204
205    fn read_metadata(&mut self, path: PathBuf) -> io::Result<Metadata> {
206        use chrono::TimeZone;
207        let file_info: MoreFileInfo<serde_json::value::Value> =
208            retry(Some(self), || {
209                self.auth
210                    .borrow_mut()
211                    .as_ref()
212                    .unwrap()
213                    .auth
214                    .get_file_info(&path.to_string_lossy(), &self.client)
215            })?;
216
217        let MoreFileInfo {
218            content_length,
219            action,
220            upload_timestamp,
221            ..
222        } = file_info;
223        let created = chrono::Utc.timestamp(upload_timestamp as i64, 0);
224
225        Ok(Metadata {
226            len: content_length,
227            is_file: action == backblaze_b2::raw::files::FileType::File,
228            created,
229        })
230    }
231
232    fn list(&mut self, path: PathBuf) -> io::Result<Vec<PathBuf>> {
233        let mut list: FileNameListing<serde_json::value::Value> =
234            retry(Some(self), || {
235                self.auth
236                    .borrow_mut()
237                    .as_ref()
238                    .unwrap()
239                    .auth
240                    .list_all_file_names(
241                        &self.bucket,
242                        1000,
243                        Some(&path.to_string_lossy()),
244                        None,
245                        &self.client,
246                    )
247            })?;
248
249        let FileNameListing {
250            mut folders,
251            mut files,
252            ..
253        } = list;
254
255        let v = folders
256            .drain(..)
257            .map(|i| i.file_name)
258            .chain(files.drain(..).map(|i| i.file_name))
259            .map(PathBuf::from)
260            .collect();
261        Ok(v)
262    }
263
264    fn list_recursively(
265        &mut self,
266        path: PathBuf,
267        tx: mpsc::Sender<io::Result<Vec<PathBuf>>>,
268    ) {
269        unimplemented!();
270    }
271}