backblaze_b2_client/
client.rs1use std::{
2 collections::HashMap,
3 sync::Arc,
4 time::{Duration, SystemTime},
5};
6
7use tokio::{sync::RwLock, task::JoinHandle, time::sleep};
8
9use crate::{
10 error::B2Error,
11 simple_client::B2SimpleClient,
12 tasks::{
13 shared::AsyncFileReader,
14 upload::{file_upload::FileUpload, FileUploadOptions},
15 },
16 util::{B2Callback, WriteLockArc},
17};
18
/// Authorization state of a [`B2Client`], as reported by [`B2Client::status`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum B2ClientStatus {
    /// Initial state, set once the client has been constructed successfully.
    Authed,
    /// Set by the background re-authorization task once the application key's
    /// expiration time has been reached; the task stops after setting it.
    KeyExpired,
}
26
27pub struct B2Client {
28 client: Arc<B2SimpleClient>,
29 uploading_files: Arc<RwLock<Vec<Option<Arc<FileUpload>>>>>,
30 reauth_handle: JoinHandle<()>,
31 status: WriteLockArc<B2ClientStatus>,
32}
33
34impl B2Client {
35 pub async fn new(key_id: String, application_key: String) -> Result<Self, B2Error> {
36 let key_id: Arc<str> = Arc::from(key_id.into_boxed_str());
37 let application_key: Arc<str> = Arc::from(application_key.into_boxed_str());
38 let status = WriteLockArc::new(B2ClientStatus::Authed);
39
40 let client = Arc::new(B2SimpleClient::new(&key_id, &application_key).await?);
41
42 let reauth_client = client.clone();
43 let status_expire = status.clone();
44
45 let reauth_handle = tokio::spawn(async move {
46 let client = reauth_client.clone();
47 let status = status_expire.clone();
48
49 loop {
50 let now = SystemTime::now();
51 let mut end_time = SystemTime::now() + Duration::from_secs(60 * 60 * 10);
53 let mut expiring = false;
54
55 if let Some(timestamp) = client.auth_data().application_key_expiration_timestamp {
56 let end = SystemTime::UNIX_EPOCH + Duration::from_secs(timestamp);
57
58 if end < end_time {
59 expiring = true;
60 end_time = end;
61 }
62 }
63
64 let wait = match end_time.duration_since(now) {
65 Ok(dur) => dur,
66 Err(error) => error.duration(),
67 };
68
69 sleep(wait).await;
70
71 if expiring {
72 status.set(B2ClientStatus::KeyExpired).await;
73 break;
74 }
75
76 let _ = client.authorize_account(&key_id, &application_key).await;
77 }
78 });
79
80 let uploading_files = Arc::new(RwLock::new(vec![]));
81
82 Ok(Self {
83 client,
84 reauth_handle,
85 uploading_files,
86 status,
87 })
88 }
89
90 pub fn status(&self) -> B2ClientStatus {
92 (*self.status).clone()
93 }
94
95 pub fn basic_client(&self) -> Arc<B2SimpleClient> {
97 self.client.clone()
98 }
99
100 pub async fn create_upload<T>(
103 &self,
104 file: T,
105 file_name: String,
106 bucket_id: String,
107 optional_info: Option<HashMap<String, String>>,
108 file_size: u64,
109 options: Option<FileUploadOptions>,
110 ) -> Arc<FileUpload>
111 where
112 T: AsyncFileReader + 'static,
113 {
114 let file_handle = FileUpload::new(
115 file,
116 file_name,
117 bucket_id,
118 optional_info,
119 file_size,
120 options.unwrap_or_else(|| FileUploadOptions::default()),
121 self.client.clone(),
122 );
123
124 self.push_upload(file_handle.clone()).await;
125 let id = file_handle.id();
126 let uploading_files = self.uploading_files.clone();
127
128 file_handle
129 .add_finish_callback(B2Callback::from_async_fn(move |_| {
130 let uploading_files = uploading_files.clone();
131
132 async move {
133 B2Client::abort_upload_inner(uploading_files, id).await;
134 }
135 }))
136 .await;
137
138 file_handle
139 }
140
141 pub async fn get_current_tracked_uploads(&self) -> Vec<Arc<FileUpload>> {
143 let lock_guard = self.uploading_files.read().await;
144
145 lock_guard.iter().filter_map(|e| e.clone()).collect()
146 }
147
148 pub async fn abort_upload(&self, upload_id: u64) {
150 B2Client::abort_upload_inner(self.uploading_files.clone(), upload_id).await;
151 }
152
153 async fn push_upload(&self, upload: Arc<FileUpload>) {
154 let lock_guard = self.uploading_files.read().await;
155 let set_index = lock_guard.iter().position(|slot| slot.is_none());
156 drop(lock_guard);
157
158 let mut lock_guard = self.uploading_files.write().await;
159
160 match set_index {
161 Some(index) => lock_guard[index] = Some(upload),
162 None => lock_guard.push(Some(upload)),
163 };
164 }
165
166 async fn abort_upload_inner(
167 uploads: Arc<RwLock<Vec<Option<Arc<FileUpload>>>>>,
168 upload_id: u64,
169 ) {
170 let uploads_lock = uploads.read().await;
171 let upload_to_remove = uploads_lock.iter().position(|slot| match slot {
172 Some(upload) => upload.id() == upload_id,
173 None => false,
174 });
175 drop(uploads_lock);
176
177 if let Some(index) = upload_to_remove {
178 let mut uploads_lock = uploads.write().await;
179 uploads_lock[index] = None;
180 }
181 }
182}
183
184impl Drop for B2Client {
185 fn drop(&mut self) {
186 self.reauth_handle.abort();
187 }
188}