1use crate::{
4 error::{Error, Result},
5 types::{SupabaseConfig, Timestamp},
6};
7use bytes::Bytes;
8
9#[cfg(target_arch = "wasm32")]
10use reqwest::Client as HttpClient;
11#[cfg(not(target_arch = "wasm32"))]
12use reqwest::{multipart, Client as HttpClient};
13use serde::{Deserialize, Serialize};
14use std::{collections::HashMap, sync::Arc};
15
16#[cfg(target_arch = "wasm32")]
17use tracing::{debug, info};
18#[cfg(not(target_arch = "wasm32"))]
19use tracing::{debug, info, warn};
20use url::Url;
21
22#[cfg(target_arch = "wasm32")]
24use std::time::Duration;
25#[cfg(not(target_arch = "wasm32"))]
26use tokio::time::{sleep, Duration};
27
28#[cfg(not(target_arch = "wasm32"))]
30async fn async_sleep(duration: Duration) {
31 sleep(duration).await;
32}
33
34#[cfg(all(target_arch = "wasm32", feature = "wasm"))]
35async fn async_sleep(duration: Duration) {
36 use gloo_timers::future::sleep as gloo_sleep;
37 gloo_sleep(duration).await;
38}
39
40#[cfg(all(target_arch = "wasm32", not(feature = "wasm")))]
41#[allow(dead_code)]
42async fn async_sleep(_duration: Duration) {
43 }
45
46#[derive(Debug, Clone)]
48pub struct Storage {
49 http_client: Arc<HttpClient>,
50 config: Arc<SupabaseConfig>,
51}
52
53#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct Bucket {
56 pub id: String,
57 pub name: String,
58 pub owner: Option<String>,
59 pub public: bool,
60 pub file_size_limit: Option<u64>,
61 pub allowed_mime_types: Option<Vec<String>>,
62 pub created_at: Timestamp,
63 pub updated_at: Timestamp,
64}
65
66#[derive(Debug, Clone, Serialize, Deserialize)]
68pub struct FileObject {
69 pub name: String,
70 pub id: Option<String>,
71 pub updated_at: Option<Timestamp>,
72 pub created_at: Option<Timestamp>,
73 pub last_accessed_at: Option<Timestamp>,
74 pub metadata: Option<HashMap<String, serde_json::Value>>,
75}
76
77#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct UploadResponse {
80 #[serde(rename = "Key")]
81 pub key: String,
82 #[serde(rename = "Id")]
83 pub id: Option<String>,
84}
85
86#[derive(Debug, Clone, Default)]
88pub struct FileOptions {
89 pub cache_control: Option<String>,
90 pub content_type: Option<String>,
91 pub upsert: bool,
92}
93
94#[derive(Debug, Clone)]
96pub struct TransformOptions {
97 pub width: Option<u32>,
98 pub height: Option<u32>,
99 pub resize: Option<ResizeMode>,
100 pub format: Option<ImageFormat>,
101 pub quality: Option<u8>,
102}
103
104#[derive(Debug, Clone, Serialize, Deserialize)]
106#[serde(rename_all = "lowercase")]
107pub enum ResizeMode {
108 Cover,
109 Contain,
110 Fill,
111}
112
113#[derive(Debug, Clone, Serialize, Deserialize)]
115#[serde(rename_all = "lowercase")]
116pub enum ImageFormat {
117 Webp,
118 Jpeg,
119 Png,
120 Avif,
121}
122
123#[derive(Debug, Clone, Serialize, Deserialize)]
125pub struct UploadSession {
126 pub upload_id: String,
127 pub part_size: u64,
128 pub total_size: u64,
129 pub uploaded_parts: Vec<UploadedPart>,
130 pub bucket_id: String,
131 pub object_path: String,
132 pub created_at: Timestamp,
133 pub expires_at: Timestamp,
134}
135
136#[derive(Debug, Clone, Serialize, Deserialize)]
138pub struct UploadedPart {
139 pub part_number: u32,
140 pub etag: String,
141 pub size: u64,
142}
143
144#[derive(Debug, Clone)]
146pub struct ResumableUploadConfig {
147 pub chunk_size: u64,
149 pub max_retries: u32,
151 pub retry_delay: u64,
153 pub verify_checksums: bool,
155}
156
157impl Default for ResumableUploadConfig {
158 fn default() -> Self {
159 Self {
160 chunk_size: 5 * 1024 * 1024, max_retries: 3,
162 retry_delay: 1000,
163 verify_checksums: true,
164 }
165 }
166}
167
168pub type UploadProgressCallback = Arc<dyn Fn(u64, u64) + Send + Sync>;
170
171#[derive(Debug, Clone, Serialize, Deserialize)]
173pub struct FileMetadata {
174 pub tags: Option<HashMap<String, String>>,
175 pub custom_metadata: Option<HashMap<String, serde_json::Value>>,
176 pub description: Option<String>,
177 pub category: Option<String>,
178 pub searchable_content: Option<String>,
179}
180
181#[derive(Debug, Clone, Default, Serialize)]
183pub struct SearchOptions {
184 pub tags: Option<HashMap<String, String>>,
185 pub category: Option<String>,
186 pub content_search: Option<String>,
187 pub limit: Option<u32>,
188 pub offset: Option<u32>,
189}
190
191#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
193pub enum StorageEvent {
194 #[serde(rename = "file_uploaded")]
195 FileUploaded,
196 #[serde(rename = "file_deleted")]
197 FileDeleted,
198 #[serde(rename = "file_updated")]
199 FileUpdated,
200 #[serde(rename = "bucket_created")]
201 BucketCreated,
202 #[serde(rename = "bucket_deleted")]
203 BucketDeleted,
204}
205
206#[derive(Debug, Clone, Serialize, Deserialize)]
208pub struct StorageEventMessage {
209 pub event: StorageEvent,
210 pub bucket_id: String,
211 pub object_path: Option<String>,
212 pub object_metadata: Option<FileObject>,
213 pub timestamp: Timestamp,
214 pub user_id: Option<String>,
215}
216
217pub type StorageEventCallback = Arc<dyn Fn(StorageEventMessage) + Send + Sync>;
219
220impl Storage {
221 pub fn new(config: Arc<SupabaseConfig>, http_client: Arc<HttpClient>) -> Result<Self> {
223 debug!("Initializing Storage module");
224
225 Ok(Self {
226 http_client,
227 config,
228 })
229 }
230
231 fn get_admin_key(&self) -> &str {
233 self.config
234 .service_role_key
235 .as_ref()
236 .unwrap_or(&self.config.key)
237 }
238
239 pub async fn list_buckets(&self) -> Result<Vec<Bucket>> {
241 debug!("Listing all storage buckets");
242
243 let url = format!("{}/storage/v1/bucket", self.config.url);
244 let response = self.http_client.get(&url).send().await?;
245
246 if !response.status().is_success() {
247 let status = response.status();
248 let error_msg = match response.text().await {
249 Ok(text) => text,
250 Err(_) => format!("List buckets failed with status: {}", status),
251 };
252 return Err(Error::storage(error_msg));
253 }
254
255 let buckets: Vec<Bucket> = response.json().await?;
256 info!("Listed {} buckets successfully", buckets.len());
257
258 Ok(buckets)
259 }
260
261 pub async fn get_bucket(&self, bucket_id: &str) -> Result<Bucket> {
263 debug!("Getting bucket info for: {}", bucket_id);
264
265 let url = format!("{}/storage/v1/bucket/{}", self.config.url, bucket_id);
266 let response = self.http_client.get(&url).send().await?;
267
268 if !response.status().is_success() {
269 let status = response.status();
270 let error_msg = match response.text().await {
271 Ok(text) => text,
272 Err(_) => format!("Get bucket failed with status: {}", status),
273 };
274 return Err(Error::storage(error_msg));
275 }
276
277 let bucket: Bucket = response.json().await?;
278 info!("Retrieved bucket info for: {}", bucket_id);
279
280 Ok(bucket)
281 }
282
283 pub async fn create_bucket(&self, id: &str, name: &str, public: bool) -> Result<Bucket> {
285 debug!("Creating bucket: {} ({})", name, id);
286
287 let payload = serde_json::json!({
288 "id": id,
289 "name": name,
290 "public": public
291 });
292
293 let url = format!("{}/storage/v1/bucket", self.config.url);
294 let response = self
295 .http_client
296 .post(&url)
297 .header("Authorization", format!("Bearer {}", self.get_admin_key()))
298 .json(&payload)
299 .send()
300 .await?;
301
302 if !response.status().is_success() {
303 let status = response.status();
304 let error_msg = match response.text().await {
305 Ok(text) => text,
306 Err(_) => format!("Create bucket failed with status: {}", status),
307 };
308 return Err(Error::storage(error_msg));
309 }
310
311 let bucket: Bucket = response.json().await?;
312 info!("Created bucket successfully: {}", id);
313
314 Ok(bucket)
315 }
316
317 pub async fn update_bucket(&self, id: &str, public: Option<bool>) -> Result<()> {
319 debug!("Updating bucket: {}", id);
320
321 let mut payload = serde_json::Map::new();
322 if let Some(public) = public {
323 payload.insert("public".to_string(), serde_json::Value::Bool(public));
324 }
325
326 let url = format!("{}/storage/v1/bucket/{}", self.config.url, id);
327 let response = self
328 .http_client
329 .put(&url)
330 .header("Authorization", format!("Bearer {}", self.get_admin_key()))
331 .json(&payload)
332 .send()
333 .await?;
334
335 if !response.status().is_success() {
336 let status = response.status();
337 let error_msg = match response.text().await {
338 Ok(text) => text,
339 Err(_) => format!("Update bucket failed with status: {}", status),
340 };
341 return Err(Error::storage(error_msg));
342 }
343
344 info!("Updated bucket successfully: {}", id);
345 Ok(())
346 }
347
348 pub async fn delete_bucket(&self, id: &str) -> Result<()> {
350 debug!("Deleting bucket: {}", id);
351
352 let url = format!("{}/storage/v1/bucket/{}", self.config.url, id);
353 let response = self
354 .http_client
355 .delete(&url)
356 .header("Authorization", format!("Bearer {}", self.get_admin_key()))
357 .send()
358 .await?;
359
360 if !response.status().is_success() {
361 let status = response.status();
362 let error_msg = match response.text().await {
363 Ok(text) => text,
364 Err(_) => format!("Delete bucket failed with status: {}", status),
365 };
366 return Err(Error::storage(error_msg));
367 }
368
369 info!("Deleted bucket successfully: {}", id);
370 Ok(())
371 }
372
373 pub async fn list(&self, bucket_id: &str, path: Option<&str>) -> Result<Vec<FileObject>> {
375 self.list_with_auth(bucket_id, path, None).await
376 }
377
378 pub async fn list_with_auth(
404 &self,
405 bucket_id: &str,
406 path: Option<&str>,
407 user_token: Option<&str>,
408 ) -> Result<Vec<FileObject>> {
409 debug!("Listing files in bucket: {}", bucket_id);
410
411 let url = format!("{}/storage/v1/object/list/{}", self.config.url, bucket_id);
412
413 let payload = serde_json::json!({
414 "prefix": path.unwrap_or("")
415 });
416
417 let mut request = self.http_client.post(&url).json(&payload);
418
419 if let Some(token) = user_token {
421 request = request.header("Authorization", format!("Bearer {}", token));
422 }
423
424 let response = request.send().await?;
425
426 if !response.status().is_success() {
427 let status = response.status();
428 let error_msg = match response.text().await {
429 Ok(text) => text,
430 Err(_) => format!("List files failed with status: {}", status),
431 };
432 return Err(Error::storage(error_msg));
433 }
434
435 let files: Vec<FileObject> = response.json().await?;
436 info!("Listed {} files in bucket: {}", files.len(), bucket_id);
437
438 Ok(files)
439 }
440
441 #[cfg(not(target_arch = "wasm32"))]
443 pub async fn upload(
444 &self,
445 bucket_id: &str,
446 path: &str,
447 file_body: Bytes,
448 options: Option<FileOptions>,
449 ) -> Result<UploadResponse> {
450 self.upload_with_auth(bucket_id, path, file_body, options, None)
451 .await
452 }
453
454 pub async fn upload_with_auth(
486 &self,
487 bucket_id: &str,
488 path: &str,
489 file_body: Bytes,
490 options: Option<FileOptions>,
491 user_token: Option<&str>,
492 ) -> Result<UploadResponse> {
493 debug!("Uploading file to bucket: {} at path: {}", bucket_id, path);
494
495 let options = options.unwrap_or_default();
496
497 let url = format!(
498 "{}/storage/v1/object/{}/{}",
499 self.config.url, bucket_id, path
500 );
501
502 let mut form = multipart::Form::new().part(
503 "file",
504 multipart::Part::bytes(file_body.to_vec()).file_name(path.to_string()),
505 );
506
507 if let Some(content_type) = options.content_type {
508 form = form.part("contentType", multipart::Part::text(content_type));
509 }
510
511 if let Some(cache_control) = options.cache_control {
512 form = form.part("cacheControl", multipart::Part::text(cache_control));
513 }
514
515 let mut request = self.http_client.post(&url).multipart(form);
516
517 if let Some(token) = user_token {
519 request = request.header("Authorization", format!("Bearer {}", token));
520 }
521
522 if options.upsert {
523 request = request.header("x-upsert", "true");
524 }
525
526 let response = request.send().await?;
527
528 if !response.status().is_success() {
529 let status = response.status();
530 let error_msg = match response.text().await {
531 Ok(text) => text,
532 Err(_) => format!("Upload failed with status: {}", status),
533 };
534 return Err(Error::storage(error_msg));
535 }
536
537 let upload_response: UploadResponse = response.json().await?;
538 info!("Uploaded file successfully: {}", path);
539 Ok(upload_response)
540 }
541
542 #[cfg(target_arch = "wasm32")]
546 pub async fn upload(
547 &self,
548 bucket_id: &str,
549 path: &str,
550 file_body: Bytes,
551 options: Option<FileOptions>,
552 ) -> Result<UploadResponse> {
553 self.upload_with_auth(bucket_id, path, file_body, options, None)
554 .await
555 }
556
557 #[cfg(target_arch = "wasm32")]
559 pub async fn upload_with_auth(
560 &self,
561 bucket_id: &str,
562 path: &str,
563 file_body: Bytes,
564 options: Option<FileOptions>,
565 user_token: Option<&str>,
566 ) -> Result<UploadResponse> {
567 debug!(
568 "Uploading file to bucket: {} at path: {} (WASM)",
569 bucket_id, path
570 );
571
572 let options = options.unwrap_or_default();
573
574 let url = format!(
575 "{}/storage/v1/object/{}/{}",
576 self.config.url, bucket_id, path
577 );
578
579 let mut request = self.http_client.post(&url).body(file_body);
580
581 if let Some(token) = user_token {
583 request = request.header("Authorization", format!("Bearer {}", token));
584 }
585
586 if let Some(content_type) = options.content_type {
587 request = request.header("Content-Type", content_type);
588 }
589
590 if let Some(cache_control) = options.cache_control {
591 request = request.header("Cache-Control", cache_control);
592 }
593
594 if options.upsert {
595 request = request.header("x-upsert", "true");
596 }
597
598 let response = request.send().await?;
599
600 if !response.status().is_success() {
601 let status = response.status();
602 let error_msg = match response.text().await {
603 Ok(text) => text,
604 Err(_) => format!("Upload failed with status: {}", status),
605 };
606 return Err(Error::storage(error_msg));
607 }
608
609 let upload_response: UploadResponse = response.json().await?;
610 info!("Uploaded file successfully: {}", path);
611
612 Ok(upload_response)
613 }
614
615 #[cfg(all(not(target_arch = "wasm32"), feature = "native"))]
617 pub async fn upload_file<P: AsRef<std::path::Path>>(
618 &self,
619 bucket_id: &str,
620 path: &str,
621 file_path: P,
622 options: Option<FileOptions>,
623 ) -> Result<UploadResponse> {
624 debug!("Uploading file from path: {:?}", file_path.as_ref());
625
626 let file_bytes = tokio::fs::read(file_path)
627 .await
628 .map_err(|e| Error::storage(format!("Failed to read file: {}", e)))?;
629
630 self.upload(bucket_id, path, Bytes::from(file_bytes), options)
631 .await
632 }
633
634 pub async fn download(&self, bucket_id: &str, path: &str) -> Result<Bytes> {
636 self.download_with_auth(bucket_id, path, None).await
637 }
638
639 pub async fn download_with_auth(
665 &self,
666 bucket_id: &str,
667 path: &str,
668 user_token: Option<&str>,
669 ) -> Result<Bytes> {
670 debug!(
671 "Downloading file from bucket: {} at path: {}",
672 bucket_id, path
673 );
674
675 let url = format!(
676 "{}/storage/v1/object/{}/{}",
677 self.config.url, bucket_id, path
678 );
679
680 let mut request = self.http_client.get(&url);
681
682 if let Some(token) = user_token {
684 request = request.header("Authorization", format!("Bearer {}", token));
685 }
686
687 let response = request.send().await?;
688
689 if !response.status().is_success() {
690 let error_msg = format!("Download failed with status: {}", response.status());
691 return Err(Error::storage(error_msg));
692 }
693
694 let bytes = response.bytes().await?;
695 info!("Downloaded file successfully: {}", path);
696
697 Ok(bytes)
698 }
699
700 pub async fn remove(&self, bucket_id: &str, paths: &[&str]) -> Result<()> {
702 self.remove_with_auth(bucket_id, paths, None).await
703 }
704
705 pub async fn remove_with_auth(
731 &self,
732 bucket_id: &str,
733 paths: &[&str],
734 user_token: Option<&str>,
735 ) -> Result<()> {
736 debug!("Deleting files from bucket: {}", bucket_id);
737
738 let url = format!("{}/storage/v1/object/{}", self.config.url, bucket_id);
739
740 let payload = serde_json::json!({
741 "prefixes": paths
742 });
743
744 let mut request = self.http_client.delete(&url).json(&payload);
745
746 if let Some(token) = user_token {
748 request = request.header("Authorization", format!("Bearer {}", token));
749 }
750
751 let response = request.send().await?;
752
753 if !response.status().is_success() {
754 let error_msg = format!("Delete files failed with status: {}", response.status());
755 return Err(Error::storage(error_msg));
756 }
757
758 info!("Deleted {} files successfully", paths.len());
759 Ok(())
760 }
761
762 pub async fn r#move(&self, bucket_id: &str, from_path: &str, to_path: &str) -> Result<()> {
764 debug!("Moving file from {} to {}", from_path, to_path);
765
766 let url = format!("{}/storage/v1/object/move", self.config.url);
767
768 let payload = serde_json::json!({
769 "bucketId": bucket_id,
770 "sourceKey": from_path,
771 "destinationKey": to_path
772 });
773
774 let response = self.http_client.post(&url).json(&payload).send().await?;
775
776 if !response.status().is_success() {
777 let status = response.status();
778 let error_msg = match response.text().await {
779 Ok(text) => text,
780 Err(_) => format!("Move failed with status: {}", status),
781 };
782 return Err(Error::storage(error_msg));
783 }
784
785 info!("Moved file successfully from {} to {}", from_path, to_path);
786 Ok(())
787 }
788
789 pub async fn copy(&self, bucket_id: &str, from_path: &str, to_path: &str) -> Result<()> {
791 debug!("Copying file from {} to {}", from_path, to_path);
792
793 let url = format!("{}/storage/v1/object/copy", self.config.url);
794
795 let payload = serde_json::json!({
796 "bucketId": bucket_id,
797 "sourceKey": from_path,
798 "destinationKey": to_path
799 });
800
801 let response = self.http_client.post(&url).json(&payload).send().await?;
802
803 if !response.status().is_success() {
804 let status = response.status();
805 let error_msg = match response.text().await {
806 Ok(text) => text,
807 Err(_) => format!("Copy failed with status: {}", status),
808 };
809 return Err(Error::storage(error_msg));
810 }
811
812 info!("Copied file successfully from {} to {}", from_path, to_path);
813 Ok(())
814 }
815
816 pub fn get_public_url(&self, bucket_id: &str, path: &str) -> String {
818 format!(
819 "{}/storage/v1/object/public/{}/{}",
820 self.config.url, bucket_id, path
821 )
822 }
823
824 pub async fn create_signed_url(
826 &self,
827 bucket_id: &str,
828 path: &str,
829 expires_in: u32,
830 transform: Option<TransformOptions>,
831 ) -> Result<String> {
832 debug!(
833 "Creating signed URL for bucket: {} path: {} expires_in: {}",
834 bucket_id, path, expires_in
835 );
836
837 let url = format!(
838 "{}/storage/v1/object/sign/{}/{}",
839 self.config.url, bucket_id, path
840 );
841
842 let mut payload = serde_json::json!({
843 "expiresIn": expires_in
844 });
845
846 if let Some(transform_opts) = transform {
847 let mut transform_params = serde_json::Map::new();
848
849 if let Some(width) = transform_opts.width {
850 transform_params.insert("width".to_string(), serde_json::Value::from(width));
851 }
852 if let Some(height) = transform_opts.height {
853 transform_params.insert("height".to_string(), serde_json::Value::from(height));
854 }
855 if let Some(resize) = transform_opts.resize {
856 transform_params.insert("resize".to_string(), serde_json::to_value(resize)?);
857 }
858 if let Some(format) = transform_opts.format {
859 transform_params.insert("format".to_string(), serde_json::to_value(format)?);
860 }
861 if let Some(quality) = transform_opts.quality {
862 transform_params.insert("quality".to_string(), serde_json::Value::from(quality));
863 }
864
865 payload["transform"] = serde_json::Value::Object(transform_params);
866 }
867
868 let response = self.http_client.post(&url).json(&payload).send().await?;
869
870 if !response.status().is_success() {
871 let error_msg = format!(
872 "Create signed URL failed with status: {}",
873 response.status()
874 );
875 return Err(Error::storage(error_msg));
876 }
877
878 let response_data: serde_json::Value = response.json().await?;
879 let signed_url = response_data["signedURL"]
880 .as_str()
881 .ok_or_else(|| Error::storage("Invalid signed URL response"))?;
882
883 info!("Created signed URL successfully");
884 Ok(signed_url.to_string())
885 }
886
887 pub fn get_public_url_transformed(
889 &self,
890 bucket_id: &str,
891 path: &str,
892 options: TransformOptions,
893 ) -> Result<String> {
894 let mut url = Url::parse(&self.get_public_url(bucket_id, path))?;
895
896 if let Some(width) = options.width {
897 url.query_pairs_mut()
898 .append_pair("width", &width.to_string());
899 }
900
901 if let Some(height) = options.height {
902 url.query_pairs_mut()
903 .append_pair("height", &height.to_string());
904 }
905
906 if let Some(resize) = options.resize {
907 let resize_str = match resize {
908 ResizeMode::Cover => "cover",
909 ResizeMode::Contain => "contain",
910 ResizeMode::Fill => "fill",
911 };
912 url.query_pairs_mut().append_pair("resize", resize_str);
913 }
914
915 if let Some(format) = options.format {
916 let format_str = match format {
917 ImageFormat::Webp => "webp",
918 ImageFormat::Jpeg => "jpeg",
919 ImageFormat::Png => "png",
920 ImageFormat::Avif => "avif",
921 };
922 url.query_pairs_mut().append_pair("format", format_str);
923 }
924
925 if let Some(quality) = options.quality {
926 url.query_pairs_mut()
927 .append_pair("quality", &quality.to_string());
928 }
929
930 Ok(url.to_string())
931 }
932
933 #[cfg(not(target_arch = "wasm32"))]
959 pub async fn start_resumable_upload(
960 &self,
961 bucket_id: &str,
962 path: &str,
963 total_size: u64,
964 config: Option<ResumableUploadConfig>,
965 options: Option<FileOptions>,
966 ) -> Result<UploadSession> {
967 let config = config.unwrap_or_default();
968 let options = options.unwrap_or_default();
969
970 debug!(
971 "Starting resumable upload for bucket: {} path: {} size: {}",
972 bucket_id, path, total_size
973 );
974
975 let url = format!(
976 "{}/storage/v1/object/{}/{}/resumable",
977 self.config.url, bucket_id, path
978 );
979
980 let payload = serde_json::json!({
981 "totalSize": total_size,
982 "chunkSize": config.chunk_size,
983 "contentType": options.content_type,
984 "cacheControl": options.cache_control,
985 "upsert": options.upsert
986 });
987
988 let response = self.http_client.post(&url).json(&payload).send().await?;
989
990 if !response.status().is_success() {
991 let error_msg = format!(
992 "Start resumable upload failed with status: {}",
993 response.status()
994 );
995 return Err(Error::storage(error_msg));
996 }
997
998 let session: UploadSession = response.json().await?;
999 info!("Started resumable upload session: {}", session.upload_id);
1000
1001 Ok(session)
1002 }
1003
1004 #[cfg(not(target_arch = "wasm32"))]
1024 pub async fn upload_chunk(
1025 &self,
1026 session: &UploadSession,
1027 part_number: u32,
1028 chunk_data: Bytes,
1029 ) -> Result<UploadedPart> {
1030 debug!(
1031 "Uploading chunk {} for session: {} size: {}",
1032 part_number,
1033 session.upload_id,
1034 chunk_data.len()
1035 );
1036
1037 let url = format!(
1038 "{}/storage/v1/object/{}/{}/resumable/{}",
1039 self.config.url, session.bucket_id, session.object_path, session.upload_id
1040 );
1041
1042 let chunk_size = chunk_data.len() as u64;
1043
1044 let response = self
1045 .http_client
1046 .put(&url)
1047 .header("Content-Type", "application/octet-stream")
1048 .header("X-Part-Number", part_number.to_string())
1049 .body(chunk_data)
1050 .send()
1051 .await?;
1052
1053 if !response.status().is_success() {
1054 let error_msg = format!("Upload chunk failed with status: {}", response.status());
1055 return Err(Error::storage(error_msg));
1056 }
1057
1058 let etag = response
1059 .headers()
1060 .get("etag")
1061 .and_then(|h| h.to_str().ok())
1062 .unwrap_or("")
1063 .to_string();
1064
1065 let part = UploadedPart {
1066 part_number,
1067 etag,
1068 size: chunk_size,
1069 };
1070
1071 info!("Uploaded chunk {} successfully", part_number);
1072 Ok(part)
1073 }
1074
1075 #[cfg(not(target_arch = "wasm32"))]
1088 pub async fn complete_resumable_upload(
1089 &self,
1090 session: &UploadSession,
1091 ) -> Result<UploadResponse> {
1092 debug!(
1093 "Completing resumable upload for session: {}",
1094 session.upload_id
1095 );
1096
1097 let url = format!(
1098 "{}/storage/v1/object/{}/{}/resumable/{}/complete",
1099 self.config.url, session.bucket_id, session.object_path, session.upload_id
1100 );
1101
1102 let payload = serde_json::json!({
1103 "parts": session.uploaded_parts
1104 });
1105
1106 let response = self.http_client.post(&url).json(&payload).send().await?;
1107
1108 if !response.status().is_success() {
1109 let error_msg = format!(
1110 "Complete resumable upload failed with status: {}",
1111 response.status()
1112 );
1113 return Err(Error::storage(error_msg));
1114 }
1115
1116 let upload_response: UploadResponse = response.json().await?;
1117 info!("Completed resumable upload: {}", upload_response.key);
1118
1119 Ok(upload_response)
1120 }
1121
1122 #[cfg(all(not(target_arch = "wasm32"), feature = "native"))]
1156 pub async fn upload_large_file<P: AsRef<std::path::Path>>(
1157 &self,
1158 bucket_id: &str,
1159 path: &str,
1160 file_path: P,
1161 config: Option<ResumableUploadConfig>,
1162 options: Option<FileOptions>,
1163 progress_callback: Option<UploadProgressCallback>,
1164 ) -> Result<UploadResponse> {
1165 let config = config.unwrap_or_default();
1166
1167 debug!("Starting large file upload from: {:?}", file_path.as_ref());
1168
1169 let metadata = tokio::fs::metadata(&file_path)
1171 .await
1172 .map_err(|e| Error::storage(format!("Failed to get file metadata: {}", e)))?;
1173
1174 let total_size = metadata.len();
1175
1176 if total_size <= config.chunk_size {
1177 return self.upload_file(bucket_id, path, file_path, options).await;
1179 }
1180
1181 let mut session = self
1183 .start_resumable_upload(bucket_id, path, total_size, Some(config.clone()), options)
1184 .await?;
1185
1186 let mut file = tokio::fs::File::open(&file_path)
1188 .await
1189 .map_err(|e| Error::storage(format!("Failed to open file: {}", e)))?;
1190
1191 let mut uploaded_size = 0u64;
1192 let mut part_number = 1u32;
1193
1194 loop {
1196 let remaining_size = total_size - uploaded_size;
1197 if remaining_size == 0 {
1198 break;
1199 }
1200
1201 let chunk_size = std::cmp::min(config.chunk_size, remaining_size);
1202 let mut buffer = vec![0u8; chunk_size as usize];
1203
1204 use tokio::io::AsyncReadExt;
1206 let bytes_read = file
1207 .read_exact(&mut buffer)
1208 .await
1209 .map_err(|e| Error::storage(format!("Failed to read file chunk: {}", e)))?;
1210
1211 if bytes_read == 0 {
1212 break;
1213 }
1214
1215 buffer.truncate(bytes_read);
1216 let chunk_data = Bytes::from(buffer);
1217
1218 let mut attempts = 0;
1220 let part = loop {
1221 attempts += 1;
1222
1223 match self
1224 .upload_chunk(&session, part_number, chunk_data.clone())
1225 .await
1226 {
1227 Ok(part) => break part,
1228 Err(e) if attempts < config.max_retries => {
1229 warn!(
1230 "Upload chunk {} failed (attempt {}), retrying: {}",
1231 part_number, attempts, e
1232 );
1233 async_sleep(Duration::from_millis(config.retry_delay)).await;
1234 continue;
1235 }
1236 Err(e) => return Err(e),
1237 }
1238 };
1239
1240 session.uploaded_parts.push(part);
1241 uploaded_size += chunk_size;
1242 part_number += 1;
1243
1244 if let Some(callback) = &progress_callback {
1246 callback(uploaded_size, total_size);
1247 }
1248
1249 debug!(
1250 "Uploaded chunk {}, progress: {}/{}",
1251 part_number - 1,
1252 uploaded_size,
1253 total_size
1254 );
1255 }
1256
1257 let response = self.complete_resumable_upload(&session).await?;
1259
1260 info!("Large file upload completed: {}", response.key);
1261 Ok(response)
1262 }
1263
1264 #[cfg(not(target_arch = "wasm32"))]
1266 pub async fn get_upload_session(&self, upload_id: &str) -> Result<UploadSession> {
1267 debug!("Getting upload session status: {}", upload_id);
1268
1269 let url = format!("{}/storage/v1/resumable/{}", self.config.url, upload_id);
1270
1271 let response = self.http_client.get(&url).send().await?;
1272
1273 if !response.status().is_success() {
1274 let error_msg = format!(
1275 "Get upload session failed with status: {}",
1276 response.status()
1277 );
1278 return Err(Error::storage(error_msg));
1279 }
1280
1281 let session: UploadSession = response.json().await?;
1282 Ok(session)
1283 }
1284
1285 #[cfg(not(target_arch = "wasm32"))]
1287 pub async fn cancel_upload_session(&self, upload_id: &str) -> Result<()> {
1288 debug!("Cancelling upload session: {}", upload_id);
1289
1290 let url = format!("{}/storage/v1/resumable/{}", self.config.url, upload_id);
1291
1292 let response = self.http_client.delete(&url).send().await?;
1293
1294 if !response.status().is_success() {
1295 let error_msg = format!(
1296 "Cancel upload session failed with status: {}",
1297 response.status()
1298 );
1299 return Err(Error::storage(error_msg));
1300 }
1301
1302 info!("Cancelled upload session: {}", upload_id);
1303 Ok(())
1304 }
1305
1306 pub async fn update_file_metadata(
1335 &self,
1336 bucket_id: &str,
1337 path: &str,
1338 metadata: &FileMetadata,
1339 ) -> Result<()> {
1340 debug!(
1341 "Updating file metadata for bucket: {} path: {}",
1342 bucket_id, path
1343 );
1344
1345 let url = format!(
1346 "{}/storage/v1/object/{}/{}/metadata",
1347 self.config.url, bucket_id, path
1348 );
1349
1350 let response = self.http_client.put(&url).json(metadata).send().await?;
1351
1352 if !response.status().is_success() {
1353 let error_msg = format!(
1354 "Update file metadata failed with status: {}",
1355 response.status()
1356 );
1357 return Err(Error::storage(error_msg));
1358 }
1359
1360 info!("Updated file metadata successfully");
1361 Ok(())
1362 }
1363
1364 pub async fn search_files(
1389 &self,
1390 bucket_id: &str,
1391 search_options: &SearchOptions,
1392 ) -> Result<Vec<FileObject>> {
1393 debug!("Searching files in bucket: {}", bucket_id);
1394
1395 let url = format!("{}/storage/v1/object/{}/search", self.config.url, bucket_id);
1396
1397 let response = self
1398 .http_client
1399 .post(&url)
1400 .json(search_options)
1401 .send()
1402 .await?;
1403
1404 if !response.status().is_success() {
1405 let error_msg = format!("Search files failed with status: {}", response.status());
1406 return Err(Error::storage(error_msg));
1407 }
1408
1409 let files: Vec<FileObject> = response.json().await?;
1410 info!("Found {} files matching search criteria", files.len());
1411
1412 Ok(files)
1413 }
1414
1415 pub async fn create_policy(&self, policy: &StoragePolicy) -> Result<()> {
1439 debug!("Creating storage policy: {}", policy.name);
1440
1441 let url = format!("{}/rest/v1/rpc/create_storage_policy", self.config.url);
1442
1443 let payload = serde_json::json!({
1444 "policy_name": policy.name,
1445 "bucket_name": policy.bucket_id,
1446 "operation": policy.operation,
1447 "definition": policy.definition,
1448 "check_expression": policy.check
1449 });
1450
1451 let response = self
1452 .http_client
1453 .post(&url)
1454 .header("Authorization", format!("Bearer {}", self.get_admin_key()))
1455 .header("apikey", self.get_admin_key())
1456 .json(&payload)
1457 .send()
1458 .await?;
1459
1460 if !response.status().is_success() {
1461 let error_msg = format!(
1462 "Create storage policy failed with status: {}",
1463 response.status()
1464 );
1465 return Err(Error::storage(error_msg));
1466 }
1467
1468 info!("Created storage policy: {}", policy.name);
1469 Ok(())
1470 }
1471
1472 pub async fn update_policy(&self, policy: &StoragePolicy) -> Result<()> {
1492 debug!("Updating storage policy: {}", policy.name);
1493
1494 let url = format!("{}/rest/v1/rpc/update_storage_policy", self.config.url);
1495
1496 let payload = serde_json::json!({
1497 "policy_name": policy.name,
1498 "bucket_name": policy.bucket_id,
1499 "operation": policy.operation,
1500 "definition": policy.definition,
1501 "check_expression": policy.check
1502 });
1503
1504 let response = self
1505 .http_client
1506 .put(&url)
1507 .header("Authorization", format!("Bearer {}", self.get_admin_key()))
1508 .header("apikey", self.get_admin_key())
1509 .json(&payload)
1510 .send()
1511 .await?;
1512
1513 if !response.status().is_success() {
1514 let error_msg = format!(
1515 "Update storage policy failed with status: {}",
1516 response.status()
1517 );
1518 return Err(Error::storage(error_msg));
1519 }
1520
1521 info!("Updated storage policy: {}", policy.name);
1522 Ok(())
1523 }
1524
1525 pub async fn delete_policy(&self, bucket_id: &str, policy_name: &str) -> Result<()> {
1535 debug!(
1536 "Deleting storage policy: {} from bucket: {}",
1537 policy_name, bucket_id
1538 );
1539
1540 let url = format!("{}/rest/v1/rpc/delete_storage_policy", self.config.url);
1541
1542 let payload = serde_json::json!({
1543 "policy_name": policy_name,
1544 "bucket_name": bucket_id
1545 });
1546
1547 let response = self
1548 .http_client
1549 .delete(&url)
1550 .header("Authorization", format!("Bearer {}", self.get_admin_key()))
1551 .header("apikey", self.get_admin_key())
1552 .json(&payload)
1553 .send()
1554 .await?;
1555
1556 if !response.status().is_success() {
1557 let error_msg = format!(
1558 "Delete storage policy failed with status: {}",
1559 response.status()
1560 );
1561 return Err(Error::storage(error_msg));
1562 }
1563
1564 info!("Deleted storage policy: {}", policy_name);
1565 Ok(())
1566 }
1567
1568 pub async fn list_policies(&self, bucket_id: &str) -> Result<Vec<StoragePolicy>> {
1579 debug!("Listing storage policies for bucket: {}", bucket_id);
1580
1581 let url = format!("{}/rest/v1/rpc/list_storage_policies", self.config.url);
1582
1583 let payload = serde_json::json!({
1584 "bucket_name": bucket_id
1585 });
1586
1587 let response = self
1588 .http_client
1589 .post(&url)
1590 .header("Authorization", format!("Bearer {}", self.get_admin_key()))
1591 .header("apikey", self.get_admin_key())
1592 .json(&payload)
1593 .send()
1594 .await?;
1595
1596 if !response.status().is_success() {
1597 let error_msg = format!(
1598 "List storage policies failed with status: {}",
1599 response.status()
1600 );
1601 return Err(Error::storage(error_msg));
1602 }
1603
1604 let policies: Vec<StoragePolicy> = response.json().await?;
1605 info!(
1606 "Listed {} storage policies for bucket: {}",
1607 policies.len(),
1608 bucket_id
1609 );
1610
1611 Ok(policies)
1612 }
1613
1614 pub async fn test_policy_access(
1637 &self,
1638 bucket_id: &str,
1639 object_path: &str,
1640 operation: PolicyOperation,
1641 user_id: &str,
1642 ) -> Result<bool> {
1643 debug!(
1644 "Testing policy access for user: {} on object: {}",
1645 user_id, object_path
1646 );
1647
1648 let url = format!("{}/rest/v1/rpc/test_storage_policy_access", self.config.url);
1649
1650 let payload = serde_json::json!({
1651 "bucket_name": bucket_id,
1652 "object_name": object_path,
1653 "operation": operation,
1654 "user_id": user_id
1655 });
1656
1657 let response = self
1658 .http_client
1659 .post(&url)
1660 .header("Authorization", format!("Bearer {}", self.get_admin_key()))
1661 .header("apikey", self.get_admin_key())
1662 .json(&payload)
1663 .send()
1664 .await?;
1665
1666 if !response.status().is_success() {
1667 let error_msg = format!(
1668 "Test policy access failed with status: {}",
1669 response.status()
1670 );
1671 return Err(Error::storage(error_msg));
1672 }
1673
1674 let result: serde_json::Value = response.json().await?;
1675 let can_access = result["can_access"].as_bool().unwrap_or(false);
1676
1677 info!("Policy access test result: {}", can_access);
1678 Ok(can_access)
1679 }
1680
1681 pub fn generate_policy_template(
1699 &self,
1700 bucket_id: &str,
1701 policy_name: &str,
1702 template: PolicyTemplate,
1703 ) -> StoragePolicy {
1704 let (operation, definition, check) = match template {
1705 PolicyTemplate::PublicRead => (PolicyOperation::Select, "true".to_string(), None),
1706 PolicyTemplate::AuthenticatedRead => (
1707 PolicyOperation::Select,
1708 "auth.uid() IS NOT NULL".to_string(),
1709 None,
1710 ),
1711 PolicyTemplate::UserFolderAccess => (
1712 PolicyOperation::All,
1713 "auth.uid()::text = (storage.foldername(name))[1]".to_string(),
1714 Some("auth.uid() IS NOT NULL".to_string()),
1715 ),
1716 PolicyTemplate::AdminFullAccess => (
1717 PolicyOperation::All,
1718 "auth.role() = 'admin'".to_string(),
1719 None,
1720 ),
1721 PolicyTemplate::ReadOnlyForRole(role) => (
1722 PolicyOperation::Select,
1723 format!("auth.role() = '{}'", role),
1724 None,
1725 ),
1726 };
1727
1728 StoragePolicy {
1729 name: policy_name.to_string(),
1730 bucket_id: bucket_id.to_string(),
1731 operation,
1732 definition,
1733 check,
1734 }
1735 }
1736}
1737
1738#[derive(Debug, Clone, Serialize, Deserialize)]
1740pub struct StoragePolicy {
1741 pub name: String,
1742 pub bucket_id: String,
1743 pub operation: PolicyOperation,
1744 pub definition: String,
1745 pub check: Option<String>,
1746}
1747
1748#[derive(Debug, Clone, Serialize, Deserialize)]
1750#[serde(rename_all = "UPPERCASE")]
1751pub enum PolicyOperation {
1752 Select,
1753 Insert,
1754 Update,
1755 Delete,
1756 All,
1757}
1758
1759#[derive(Debug, Clone)]
1761pub enum PolicyTemplate {
1762 PublicRead,
1764 AuthenticatedRead,
1766 UserFolderAccess,
1768 AdminFullAccess,
1770 ReadOnlyForRole(String),
1772}