backblaze_b2_client/
client.rs

1use std::{
2    collections::HashMap,
3    sync::Arc,
4    time::{Duration, SystemTime},
5};
6
7use tokio::{sync::RwLock, task::JoinHandle, time::sleep};
8
9use crate::{
10    error::B2Error,
11    simple_client::B2SimpleClient,
12    tasks::{
13        shared::AsyncFileReader,
14        upload::{file_upload::FileUpload, FileUploadOptions},
15    },
16    util::{B2Callback, WriteLockArc},
17};
18
/// Authentication state reported by [`B2Client::status`].
///
/// Derives `PartialEq`/`Eq` so callers can compare the reported status
/// directly (e.g. `client.status() == B2ClientStatus::KeyExpired`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum B2ClientStatus {
    /// Default state, and should be the only state if nothing went wrong.
    Authed,
    /// The key provided to the client has expired and the client cannot
    /// re-authorize; re-create the client with a valid key.
    KeyExpired,
}
26
27pub struct B2Client {
28    client: Arc<B2SimpleClient>,
29    uploading_files: Arc<RwLock<Vec<Option<Arc<FileUpload>>>>>,
30    reauth_handle: JoinHandle<()>,
31    status: WriteLockArc<B2ClientStatus>,
32}
33
34impl B2Client {
35    pub async fn new(key_id: String, application_key: String) -> Result<Self, B2Error> {
36        let key_id: Arc<str> = Arc::from(key_id.into_boxed_str());
37        let application_key: Arc<str> = Arc::from(application_key.into_boxed_str());
38        let status = WriteLockArc::new(B2ClientStatus::Authed);
39
40        let client = Arc::new(B2SimpleClient::new(&key_id, &application_key).await?);
41
42        let reauth_client = client.clone();
43        let status_expire = status.clone();
44
45        let reauth_handle = tokio::spawn(async move {
46            let client = reauth_client.clone();
47            let status = status_expire.clone();
48
49            loop {
50                let now = SystemTime::now();
51                // 10 hours
52                let mut end_time = SystemTime::now() + Duration::from_secs(60 * 60 * 10);
53                let mut expiring = false;
54
55                if let Some(timestamp) = client.auth_data().application_key_expiration_timestamp {
56                    let end = SystemTime::UNIX_EPOCH + Duration::from_secs(timestamp);
57
58                    if end < end_time {
59                        expiring = true;
60                        end_time = end;
61                    }
62                }
63
64                let wait = match end_time.duration_since(now) {
65                    Ok(dur) => dur,
66                    Err(error) => error.duration(),
67                };
68
69                sleep(wait).await;
70
71                if expiring {
72                    status.set(B2ClientStatus::KeyExpired).await;
73                    break;
74                }
75
76                let _ = client.authorize_account(&key_id, &application_key).await;
77            }
78        });
79
80        let uploading_files = Arc::new(RwLock::new(vec![]));
81
82        Ok(Self {
83            client,
84            reauth_handle,
85            uploading_files,
86            status,
87        })
88    }
89
90    /// Gets current client status
91    pub fn status(&self) -> B2ClientStatus {
92        (*self.status).clone()
93    }
94
95    /// Returns reference to inner basic client
96    pub fn basic_client(&self) -> Arc<B2SimpleClient> {
97        self.client.clone()
98    }
99
100    /// Creates files upload tracker and returns reference to it. <br><br>
101    /// Tracker doesn't start upload automatically, it needs to be started manually.
102    pub async fn create_upload<T>(
103        &self,
104        file: T,
105        file_name: String,
106        bucket_id: String,
107        optional_info: Option<HashMap<String, String>>,
108        file_size: u64,
109        options: Option<FileUploadOptions>,
110    ) -> Arc<FileUpload>
111    where
112        T: AsyncFileReader + 'static,
113    {
114        let file_handle = FileUpload::new(
115            file,
116            file_name,
117            bucket_id,
118            optional_info,
119            file_size,
120            options.unwrap_or_else(|| FileUploadOptions::default()),
121            self.client.clone(),
122        );
123
124        self.push_upload(file_handle.clone()).await;
125        let id = file_handle.id();
126        let uploading_files = self.uploading_files.clone();
127
128        file_handle
129            .add_finish_callback(B2Callback::from_async_fn(move |_| {
130                let uploading_files = uploading_files.clone();
131
132                async move {
133                    B2Client::abort_upload_inner(uploading_files, id).await;
134                }
135            }))
136            .await;
137
138        file_handle
139    }
140
141    /// Gets the list of current tracked upload tasks
142    pub async fn get_current_tracked_uploads(&self) -> Vec<Arc<FileUpload>> {
143        let lock_guard = self.uploading_files.read().await;
144
145        lock_guard.iter().filter_map(|e| e.clone()).collect()
146    }
147
148    /// Aborts a specific upload using its ID
149    pub async fn abort_upload(&self, upload_id: u64) {
150        B2Client::abort_upload_inner(self.uploading_files.clone(), upload_id).await;
151    }
152
153    async fn push_upload(&self, upload: Arc<FileUpload>) {
154        let lock_guard = self.uploading_files.read().await;
155        let set_index = lock_guard.iter().position(|slot| slot.is_none());
156        drop(lock_guard);
157
158        let mut lock_guard = self.uploading_files.write().await;
159
160        match set_index {
161            Some(index) => lock_guard[index] = Some(upload),
162            None => lock_guard.push(Some(upload)),
163        };
164    }
165
166    async fn abort_upload_inner(
167        uploads: Arc<RwLock<Vec<Option<Arc<FileUpload>>>>>,
168        upload_id: u64,
169    ) {
170        let uploads_lock = uploads.read().await;
171        let upload_to_remove = uploads_lock.iter().position(|slot| match slot {
172            Some(upload) => upload.id() == upload_id,
173            None => false,
174        });
175        drop(uploads_lock);
176
177        if let Some(index) = upload_to_remove {
178            let mut uploads_lock = uploads.write().await;
179            uploads_lock[index] = None;
180        }
181    }
182}
183
184impl Drop for B2Client {
185    fn drop(&mut self) {
186        self.reauth_handle.abort();
187    }
188}