1#![allow(unused)]
2use std::borrow::BorrowMut;
3use std::cell::RefCell;
4use std::io::Read;
5use std::path::PathBuf;
6use std::sync::mpsc;
7use std::{fs, io};
8
9use backblaze_b2::raw::authorize::{B2Authorization, B2Credentials};
10use backblaze_b2::raw::files::{FileNameListing, MoreFileInfo};
11use backblaze_b2::raw::upload::UploadAuthorization;
12use backblaze_b2::B2Error;
13use hyper::net::HttpsConnector;
14use hyper::Client;
15use hyper_native_tls::NativeTlsClient;
16use sgdata::SGData;
17
18use super::Metadata;
19use super::{Backend, BackendThread};
20use crate::aio;
21use crate::config;
22
23pub struct Lock {
26 path: PathBuf,
27}
28
29impl Lock {
30 fn new(path: PathBuf) -> Self {
31 Lock { path }
32 }
33}
34
35impl aio::Lock for Lock {}
36
37#[derive(Debug)]
38pub struct B2 {
39 cred: B2Credentials,
40 bucket: String,
41}
42
43pub struct Auth {
44 auth: B2Authorization,
45 upload_auth: UploadAuthorization,
46}
47
48pub struct B2Thread {
49 cred: B2Credentials,
50 auth: RefCell<Option<Auth>>,
51 client: Client,
52 bucket: String,
53}
54
55fn retry<F, R>(instance: Option<&B2Thread>, f: F) -> io::Result<R>
57where
58 F: Fn() -> Result<R, B2Error>,
59{
60 let mut backoff = 1;
61 let mut err_counter = 0;
62 loop {
63 let res = f();
64
65 match res {
66 Ok(ok) => return Ok(ok),
67 Err(e) => {
68 err_counter += 1;
69
70 if err_counter > 5 {
71 return Err(e).map_err(|e| {
72 io::Error::new(
73 io::ErrorKind::ConnectionAborted,
74 format!(
75 "Gave up b2 operation after {} retries: {}",
76 err_counter, e
77 ),
78 )
79 });
80 }
81
82 if e.should_back_off() {
83 std::thread::sleep(std::time::Duration::from_secs(backoff));
84 backoff *= 2;
85 } else {
86 backoff = 1;
87 }
88
89 if e.should_obtain_new_authentication() {
90 if let Some(instance) = instance.as_ref() {
91 let _ = instance.reauth();
92 }
93 }
94 }
95 }
96 }
97}
98
99impl B2Thread {
100 fn reauth(&self) -> io::Result<()> {
101 let auth = retry(None, || {
102 let auth = self.cred.authorize(&self.client)?;
103 let upload_auth =
104 auth.get_upload_url(&self.bucket, &self.client)?;
105 Ok((auth, upload_auth))
106 })?;
107 *self.auth.borrow_mut() = Some(Auth {
108 auth: auth.0,
109 upload_auth: auth.1,
110 });
111
112 Ok(())
113 }
114
115 fn new_from_cred(cred: &B2Credentials, bucket: String) -> io::Result<Self> {
116 let ssl = NativeTlsClient::new().map_err(|e| {
117 io::Error::new(
118 io::ErrorKind::ConnectionAborted,
119 format!("Couldn't create `NativeTlsClient`: {}", e),
120 )
121 })?;
122 let connector = HttpsConnector::new(ssl);
123 let client = Client::with_connector(connector);
124
125 let mut i = B2Thread {
126 cred: cred.clone(),
127 client,
128 auth: RefCell::new(None),
129 bucket,
130 };
131
132 i.reauth()?;
133
134 Ok(i)
135 }
136}
137
138impl Backend for B2 {
139 fn lock_exclusive(&self) -> io::Result<Box<dyn aio::Lock>> {
140 Ok(Box::new(Lock::new(PathBuf::from(config::LOCK_FILE))))
141 }
142
143 fn lock_shared(&self) -> io::Result<Box<dyn aio::Lock>> {
144 Ok(Box::new(Lock::new(PathBuf::from(config::LOCK_FILE))))
145 }
146
147 fn new_thread(&self) -> io::Result<Box<dyn BackendThread>> {
148 Ok(Box::new(B2Thread::new_from_cred(
149 &self.cred,
150 self.bucket.clone(),
151 )?))
152 }
153}
154
155impl B2 {
156 pub(crate) fn new(id: &str, bucket: &str, key: &str) -> Self {
157 let cred = B2Credentials {
158 id: id.into(),
159 key: key.into(),
160 };
161
162 B2 {
163 cred,
164 bucket: bucket.into(),
165 }
166 }
167}
168
169impl BackendThread for B2Thread {
170 fn remove_dir_all(&mut self, path: PathBuf) -> io::Result<()> {
171 fs::remove_dir_all(&path)
172 }
173
174 fn rename(
175 &mut self,
176 src_path: PathBuf,
177 dst_path: PathBuf,
178 ) -> io::Result<()> {
179 match fs::rename(&src_path, &dst_path) {
180 Ok(_) => Ok(()),
181 Err(_e) => {
182 fs::create_dir_all(dst_path.parent().unwrap())?;
183 fs::rename(&src_path, &dst_path)
184 }
185 }
186 }
187
188 fn write(
189 &mut self,
190 path: PathBuf,
191 sg: SGData,
192 idempotent: bool,
193 ) -> io::Result<()> {
194 Ok(())
195 }
196
197 fn read(&mut self, path: PathBuf) -> io::Result<SGData> {
198 Ok(SGData::empty())
199 }
200
201 fn remove(&mut self, path: PathBuf) -> io::Result<()> {
202 Ok(())
203 }
204
205 fn read_metadata(&mut self, path: PathBuf) -> io::Result<Metadata> {
206 use chrono::TimeZone;
207 let file_info: MoreFileInfo<serde_json::value::Value> =
208 retry(Some(self), || {
209 self.auth
210 .borrow_mut()
211 .as_ref()
212 .unwrap()
213 .auth
214 .get_file_info(&path.to_string_lossy(), &self.client)
215 })?;
216
217 let MoreFileInfo {
218 content_length,
219 action,
220 upload_timestamp,
221 ..
222 } = file_info;
223 let created = chrono::Utc.timestamp(upload_timestamp as i64, 0);
224
225 Ok(Metadata {
226 len: content_length,
227 is_file: action == backblaze_b2::raw::files::FileType::File,
228 created,
229 })
230 }
231
232 fn list(&mut self, path: PathBuf) -> io::Result<Vec<PathBuf>> {
233 let mut list: FileNameListing<serde_json::value::Value> =
234 retry(Some(self), || {
235 self.auth
236 .borrow_mut()
237 .as_ref()
238 .unwrap()
239 .auth
240 .list_all_file_names(
241 &self.bucket,
242 1000,
243 Some(&path.to_string_lossy()),
244 None,
245 &self.client,
246 )
247 })?;
248
249 let FileNameListing {
250 mut folders,
251 mut files,
252 ..
253 } = list;
254
255 let v = folders
256 .drain(..)
257 .map(|i| i.file_name)
258 .chain(files.drain(..).map(|i| i.file_name))
259 .map(PathBuf::from)
260 .collect();
261 Ok(v)
262 }
263
264 fn list_recursively(
265 &mut self,
266 path: PathBuf,
267 tx: mpsc::Sender<io::Result<Vec<PathBuf>>>,
268 ) {
269 unimplemented!();
270 }
271}