1use crate::{
4 error::{Error, Result},
5 types::{SupabaseConfig, Timestamp},
6};
7use bytes::Bytes;
8
9#[cfg(target_arch = "wasm32")]
10use reqwest::Client as HttpClient;
11#[cfg(not(target_arch = "wasm32"))]
12use reqwest::{multipart, Client as HttpClient};
13use serde::{Deserialize, Serialize};
14use std::{collections::HashMap, sync::Arc};
15
16use tracing::{debug, info, warn};
17use url::Url;
18
19use tokio::time::{sleep, Duration};
21
22#[derive(Debug, Clone)]
24pub struct Storage {
25 http_client: Arc<HttpClient>,
26 config: Arc<SupabaseConfig>,
27}
28
29#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct Bucket {
32 pub id: String,
33 pub name: String,
34 pub owner: Option<String>,
35 pub public: bool,
36 pub file_size_limit: Option<u64>,
37 pub allowed_mime_types: Option<Vec<String>>,
38 pub created_at: Timestamp,
39 pub updated_at: Timestamp,
40}
41
42#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct FileObject {
45 pub name: String,
46 pub id: Option<String>,
47 pub updated_at: Option<Timestamp>,
48 pub created_at: Option<Timestamp>,
49 pub last_accessed_at: Option<Timestamp>,
50 pub metadata: Option<HashMap<String, serde_json::Value>>,
51}
52
53#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct UploadResponse {
56 #[serde(rename = "Key")]
57 pub key: String,
58 #[serde(rename = "Id")]
59 pub id: Option<String>,
60}
61
62#[derive(Debug, Clone, Default)]
64pub struct FileOptions {
65 pub cache_control: Option<String>,
66 pub content_type: Option<String>,
67 pub upsert: bool,
68}
69
70#[derive(Debug, Clone)]
72pub struct TransformOptions {
73 pub width: Option<u32>,
74 pub height: Option<u32>,
75 pub resize: Option<ResizeMode>,
76 pub format: Option<ImageFormat>,
77 pub quality: Option<u8>,
78}
79
80#[derive(Debug, Clone, Serialize, Deserialize)]
82#[serde(rename_all = "lowercase")]
83pub enum ResizeMode {
84 Cover,
85 Contain,
86 Fill,
87}
88
89#[derive(Debug, Clone, Serialize, Deserialize)]
91#[serde(rename_all = "lowercase")]
92pub enum ImageFormat {
93 Webp,
94 Jpeg,
95 Png,
96 Avif,
97}
98
99#[derive(Debug, Clone, Serialize, Deserialize)]
101pub struct UploadSession {
102 pub upload_id: String,
103 pub part_size: u64,
104 pub total_size: u64,
105 pub uploaded_parts: Vec<UploadedPart>,
106 pub bucket_id: String,
107 pub object_path: String,
108 pub created_at: Timestamp,
109 pub expires_at: Timestamp,
110}
111
112#[derive(Debug, Clone, Serialize, Deserialize)]
114pub struct UploadedPart {
115 pub part_number: u32,
116 pub etag: String,
117 pub size: u64,
118}
119
120#[derive(Debug, Clone)]
122pub struct ResumableUploadConfig {
123 pub chunk_size: u64,
125 pub max_retries: u32,
127 pub retry_delay: u64,
129 pub verify_checksums: bool,
131}
132
133impl Default for ResumableUploadConfig {
134 fn default() -> Self {
135 Self {
136 chunk_size: 5 * 1024 * 1024, max_retries: 3,
138 retry_delay: 1000,
139 verify_checksums: true,
140 }
141 }
142}
143
144pub type UploadProgressCallback = Arc<dyn Fn(u64, u64) + Send + Sync>;
146
147#[derive(Debug, Clone, Serialize, Deserialize)]
149pub struct FileMetadata {
150 pub tags: Option<HashMap<String, String>>,
151 pub custom_metadata: Option<HashMap<String, serde_json::Value>>,
152 pub description: Option<String>,
153 pub category: Option<String>,
154 pub searchable_content: Option<String>,
155}
156
157#[derive(Debug, Clone, Default, Serialize)]
159pub struct SearchOptions {
160 pub tags: Option<HashMap<String, String>>,
161 pub category: Option<String>,
162 pub content_search: Option<String>,
163 pub limit: Option<u32>,
164 pub offset: Option<u32>,
165}
166
167#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
169pub enum StorageEvent {
170 #[serde(rename = "file_uploaded")]
171 FileUploaded,
172 #[serde(rename = "file_deleted")]
173 FileDeleted,
174 #[serde(rename = "file_updated")]
175 FileUpdated,
176 #[serde(rename = "bucket_created")]
177 BucketCreated,
178 #[serde(rename = "bucket_deleted")]
179 BucketDeleted,
180}
181
182#[derive(Debug, Clone, Serialize, Deserialize)]
184pub struct StorageEventMessage {
185 pub event: StorageEvent,
186 pub bucket_id: String,
187 pub object_path: Option<String>,
188 pub object_metadata: Option<FileObject>,
189 pub timestamp: Timestamp,
190 pub user_id: Option<String>,
191}
192
193pub type StorageEventCallback = Arc<dyn Fn(StorageEventMessage) + Send + Sync>;
195
196impl Storage {
197 pub fn new(config: Arc<SupabaseConfig>, http_client: Arc<HttpClient>) -> Result<Self> {
199 debug!("Initializing Storage module");
200
201 Ok(Self {
202 http_client,
203 config,
204 })
205 }
206
207 fn get_admin_key(&self) -> &str {
209 self.config
210 .service_role_key
211 .as_ref()
212 .unwrap_or(&self.config.key)
213 }
214
215 pub async fn list_buckets(&self) -> Result<Vec<Bucket>> {
217 debug!("Listing all storage buckets");
218
219 let url = format!("{}/storage/v1/bucket", self.config.url);
220 let response = self.http_client.get(&url).send().await?;
221
222 if !response.status().is_success() {
223 let status = response.status();
224 let error_msg = match response.text().await {
225 Ok(text) => text,
226 Err(_) => format!("List buckets failed with status: {}", status),
227 };
228 return Err(Error::storage(error_msg));
229 }
230
231 let buckets: Vec<Bucket> = response.json().await?;
232 info!("Listed {} buckets successfully", buckets.len());
233
234 Ok(buckets)
235 }
236
237 pub async fn get_bucket(&self, bucket_id: &str) -> Result<Bucket> {
239 debug!("Getting bucket info for: {}", bucket_id);
240
241 let url = format!("{}/storage/v1/bucket/{}", self.config.url, bucket_id);
242 let response = self.http_client.get(&url).send().await?;
243
244 if !response.status().is_success() {
245 let status = response.status();
246 let error_msg = match response.text().await {
247 Ok(text) => text,
248 Err(_) => format!("Get bucket failed with status: {}", status),
249 };
250 return Err(Error::storage(error_msg));
251 }
252
253 let bucket: Bucket = response.json().await?;
254 info!("Retrieved bucket info for: {}", bucket_id);
255
256 Ok(bucket)
257 }
258
259 pub async fn create_bucket(&self, id: &str, name: &str, public: bool) -> Result<Bucket> {
261 debug!("Creating bucket: {} ({})", name, id);
262
263 let payload = serde_json::json!({
264 "id": id,
265 "name": name,
266 "public": public
267 });
268
269 let url = format!("{}/storage/v1/bucket", self.config.url);
270 let response = self
271 .http_client
272 .post(&url)
273 .header("Authorization", format!("Bearer {}", self.get_admin_key()))
274 .json(&payload)
275 .send()
276 .await?;
277
278 if !response.status().is_success() {
279 let status = response.status();
280 let error_msg = match response.text().await {
281 Ok(text) => text,
282 Err(_) => format!("Create bucket failed with status: {}", status),
283 };
284 return Err(Error::storage(error_msg));
285 }
286
287 let bucket: Bucket = response.json().await?;
288 info!("Created bucket successfully: {}", id);
289
290 Ok(bucket)
291 }
292
293 pub async fn update_bucket(&self, id: &str, public: Option<bool>) -> Result<()> {
295 debug!("Updating bucket: {}", id);
296
297 let mut payload = serde_json::Map::new();
298 if let Some(public) = public {
299 payload.insert("public".to_string(), serde_json::Value::Bool(public));
300 }
301
302 let url = format!("{}/storage/v1/bucket/{}", self.config.url, id);
303 let response = self
304 .http_client
305 .put(&url)
306 .header("Authorization", format!("Bearer {}", self.get_admin_key()))
307 .json(&payload)
308 .send()
309 .await?;
310
311 if !response.status().is_success() {
312 let status = response.status();
313 let error_msg = match response.text().await {
314 Ok(text) => text,
315 Err(_) => format!("Update bucket failed with status: {}", status),
316 };
317 return Err(Error::storage(error_msg));
318 }
319
320 info!("Updated bucket successfully: {}", id);
321 Ok(())
322 }
323
324 pub async fn delete_bucket(&self, id: &str) -> Result<()> {
326 debug!("Deleting bucket: {}", id);
327
328 let url = format!("{}/storage/v1/bucket/{}", self.config.url, id);
329 let response = self
330 .http_client
331 .delete(&url)
332 .header("Authorization", format!("Bearer {}", self.get_admin_key()))
333 .send()
334 .await?;
335
336 if !response.status().is_success() {
337 let status = response.status();
338 let error_msg = match response.text().await {
339 Ok(text) => text,
340 Err(_) => format!("Delete bucket failed with status: {}", status),
341 };
342 return Err(Error::storage(error_msg));
343 }
344
345 info!("Deleted bucket successfully: {}", id);
346 Ok(())
347 }
348
349 pub async fn list(&self, bucket_id: &str, path: Option<&str>) -> Result<Vec<FileObject>> {
351 debug!("Listing files in bucket: {}", bucket_id);
352
353 let url = format!("{}/storage/v1/object/list/{}", self.config.url, bucket_id);
354
355 let payload = serde_json::json!({
356 "prefix": path.unwrap_or("")
357 });
358
359 let response = self.http_client.post(&url).json(&payload).send().await?;
360
361 if !response.status().is_success() {
362 let status = response.status();
363 let error_msg = match response.text().await {
364 Ok(text) => text,
365 Err(_) => format!("List files failed with status: {}", status),
366 };
367 return Err(Error::storage(error_msg));
368 }
369
370 let files: Vec<FileObject> = response.json().await?;
371 info!("Listed {} files in bucket: {}", files.len(), bucket_id);
372
373 Ok(files)
374 }
375
376 #[cfg(not(target_arch = "wasm32"))]
378 pub async fn upload(
379 &self,
380 bucket_id: &str,
381 path: &str,
382 file_body: Bytes,
383 options: Option<FileOptions>,
384 ) -> Result<UploadResponse> {
385 debug!("Uploading file to bucket: {} at path: {}", bucket_id, path);
386
387 let options = options.unwrap_or_default();
388
389 let url = format!(
390 "{}/storage/v1/object/{}/{}",
391 self.config.url, bucket_id, path
392 );
393
394 let mut form = multipart::Form::new().part(
395 "file",
396 multipart::Part::bytes(file_body.to_vec()).file_name(path.to_string()),
397 );
398
399 if let Some(content_type) = options.content_type {
400 form = form.part("contentType", multipart::Part::text(content_type));
401 }
402
403 if let Some(cache_control) = options.cache_control {
404 form = form.part("cacheControl", multipart::Part::text(cache_control));
405 }
406
407 let mut request = self.http_client.post(&url).multipart(form);
408
409 if options.upsert {
410 request = request.header("x-upsert", "true");
411 }
412
413 let response = request.send().await?;
414
415 if !response.status().is_success() {
416 let status = response.status();
417 let error_msg = match response.text().await {
418 Ok(text) => text,
419 Err(_) => format!("Upload failed with status: {}", status),
420 };
421 return Err(Error::storage(error_msg));
422 }
423
424 let upload_response: UploadResponse = response.json().await?;
425 info!("Uploaded file successfully: {}", path);
426 Ok(upload_response)
427 }
428
429 #[cfg(target_arch = "wasm32")]
433 pub async fn upload(
434 &self,
435 bucket_id: &str,
436 path: &str,
437 file_body: Bytes,
438 options: Option<FileOptions>,
439 ) -> Result<UploadResponse> {
440 debug!(
441 "Uploading file to bucket: {} at path: {} (WASM)",
442 bucket_id, path
443 );
444
445 let options = options.unwrap_or_default();
446
447 let url = format!(
448 "{}/storage/v1/object/{}/{}",
449 self.config.url, bucket_id, path
450 );
451
452 let mut request = self.http_client.post(&url).body(file_body);
453
454 if let Some(content_type) = options.content_type {
455 request = request.header("Content-Type", content_type);
456 }
457
458 if let Some(cache_control) = options.cache_control {
459 request = request.header("Cache-Control", cache_control);
460 }
461
462 if options.upsert {
463 request = request.header("x-upsert", "true");
464 }
465
466 let response = request.send().await?;
467
468 if !response.status().is_success() {
469 let status = response.status();
470 let error_msg = match response.text().await {
471 Ok(text) => text,
472 Err(_) => format!("Upload failed with status: {}", status),
473 };
474 return Err(Error::storage(error_msg));
475 }
476
477 let upload_response: UploadResponse = response.json().await?;
478 info!("Uploaded file successfully: {}", path);
479
480 Ok(upload_response)
481 }
482
483 #[cfg(all(not(target_arch = "wasm32"), feature = "native"))]
485 pub async fn upload_file<P: AsRef<std::path::Path>>(
486 &self,
487 bucket_id: &str,
488 path: &str,
489 file_path: P,
490 options: Option<FileOptions>,
491 ) -> Result<UploadResponse> {
492 debug!("Uploading file from path: {:?}", file_path.as_ref());
493
494 let file_bytes = tokio::fs::read(file_path)
495 .await
496 .map_err(|e| Error::storage(format!("Failed to read file: {}", e)))?;
497
498 self.upload(bucket_id, path, Bytes::from(file_bytes), options)
499 .await
500 }
501
502 pub async fn download(&self, bucket_id: &str, path: &str) -> Result<Bytes> {
504 debug!(
505 "Downloading file from bucket: {} at path: {}",
506 bucket_id, path
507 );
508
509 let url = format!(
510 "{}/storage/v1/object/{}/{}",
511 self.config.url, bucket_id, path
512 );
513
514 let response = self.http_client.get(&url).send().await?;
515
516 if !response.status().is_success() {
517 let error_msg = format!("Download failed with status: {}", response.status());
518 return Err(Error::storage(error_msg));
519 }
520
521 let bytes = response.bytes().await?;
522 info!("Downloaded file successfully: {}", path);
523
524 Ok(bytes)
525 }
526
527 pub async fn remove(&self, bucket_id: &str, paths: &[&str]) -> Result<()> {
529 debug!("Deleting files from bucket: {}", bucket_id);
530
531 let url = format!("{}/storage/v1/object/{}", self.config.url, bucket_id);
532
533 let payload = serde_json::json!({
534 "prefixes": paths
535 });
536
537 let response = self.http_client.delete(&url).json(&payload).send().await?;
538
539 if !response.status().is_success() {
540 let error_msg = format!("Delete files failed with status: {}", response.status());
541 return Err(Error::storage(error_msg));
542 }
543
544 info!("Deleted {} files successfully", paths.len());
545 Ok(())
546 }
547
548 pub async fn r#move(&self, bucket_id: &str, from_path: &str, to_path: &str) -> Result<()> {
550 debug!("Moving file from {} to {}", from_path, to_path);
551
552 let url = format!("{}/storage/v1/object/move", self.config.url);
553
554 let payload = serde_json::json!({
555 "bucketId": bucket_id,
556 "sourceKey": from_path,
557 "destinationKey": to_path
558 });
559
560 let response = self.http_client.post(&url).json(&payload).send().await?;
561
562 if !response.status().is_success() {
563 let status = response.status();
564 let error_msg = match response.text().await {
565 Ok(text) => text,
566 Err(_) => format!("Move failed with status: {}", status),
567 };
568 return Err(Error::storage(error_msg));
569 }
570
571 info!("Moved file successfully from {} to {}", from_path, to_path);
572 Ok(())
573 }
574
575 pub async fn copy(&self, bucket_id: &str, from_path: &str, to_path: &str) -> Result<()> {
577 debug!("Copying file from {} to {}", from_path, to_path);
578
579 let url = format!("{}/storage/v1/object/copy", self.config.url);
580
581 let payload = serde_json::json!({
582 "bucketId": bucket_id,
583 "sourceKey": from_path,
584 "destinationKey": to_path
585 });
586
587 let response = self.http_client.post(&url).json(&payload).send().await?;
588
589 if !response.status().is_success() {
590 let status = response.status();
591 let error_msg = match response.text().await {
592 Ok(text) => text,
593 Err(_) => format!("Copy failed with status: {}", status),
594 };
595 return Err(Error::storage(error_msg));
596 }
597
598 info!("Copied file successfully from {} to {}", from_path, to_path);
599 Ok(())
600 }
601
602 pub fn get_public_url(&self, bucket_id: &str, path: &str) -> String {
604 format!(
605 "{}/storage/v1/object/public/{}/{}",
606 self.config.url, bucket_id, path
607 )
608 }
609
610 pub async fn create_signed_url(
612 &self,
613 bucket_id: &str,
614 path: &str,
615 expires_in: u32,
616 transform: Option<TransformOptions>,
617 ) -> Result<String> {
618 debug!(
619 "Creating signed URL for bucket: {} path: {} expires_in: {}",
620 bucket_id, path, expires_in
621 );
622
623 let url = format!(
624 "{}/storage/v1/object/sign/{}/{}",
625 self.config.url, bucket_id, path
626 );
627
628 let mut payload = serde_json::json!({
629 "expiresIn": expires_in
630 });
631
632 if let Some(transform_opts) = transform {
633 let mut transform_params = serde_json::Map::new();
634
635 if let Some(width) = transform_opts.width {
636 transform_params.insert("width".to_string(), serde_json::Value::from(width));
637 }
638 if let Some(height) = transform_opts.height {
639 transform_params.insert("height".to_string(), serde_json::Value::from(height));
640 }
641 if let Some(resize) = transform_opts.resize {
642 transform_params.insert("resize".to_string(), serde_json::to_value(resize)?);
643 }
644 if let Some(format) = transform_opts.format {
645 transform_params.insert("format".to_string(), serde_json::to_value(format)?);
646 }
647 if let Some(quality) = transform_opts.quality {
648 transform_params.insert("quality".to_string(), serde_json::Value::from(quality));
649 }
650
651 payload["transform"] = serde_json::Value::Object(transform_params);
652 }
653
654 let response = self.http_client.post(&url).json(&payload).send().await?;
655
656 if !response.status().is_success() {
657 let error_msg = format!(
658 "Create signed URL failed with status: {}",
659 response.status()
660 );
661 return Err(Error::storage(error_msg));
662 }
663
664 let response_data: serde_json::Value = response.json().await?;
665 let signed_url = response_data["signedURL"]
666 .as_str()
667 .ok_or_else(|| Error::storage("Invalid signed URL response"))?;
668
669 info!("Created signed URL successfully");
670 Ok(signed_url.to_string())
671 }
672
673 pub fn get_public_url_transformed(
675 &self,
676 bucket_id: &str,
677 path: &str,
678 options: TransformOptions,
679 ) -> Result<String> {
680 let mut url = Url::parse(&self.get_public_url(bucket_id, path))?;
681
682 if let Some(width) = options.width {
683 url.query_pairs_mut()
684 .append_pair("width", &width.to_string());
685 }
686
687 if let Some(height) = options.height {
688 url.query_pairs_mut()
689 .append_pair("height", &height.to_string());
690 }
691
692 if let Some(resize) = options.resize {
693 let resize_str = match resize {
694 ResizeMode::Cover => "cover",
695 ResizeMode::Contain => "contain",
696 ResizeMode::Fill => "fill",
697 };
698 url.query_pairs_mut().append_pair("resize", resize_str);
699 }
700
701 if let Some(format) = options.format {
702 let format_str = match format {
703 ImageFormat::Webp => "webp",
704 ImageFormat::Jpeg => "jpeg",
705 ImageFormat::Png => "png",
706 ImageFormat::Avif => "avif",
707 };
708 url.query_pairs_mut().append_pair("format", format_str);
709 }
710
711 if let Some(quality) = options.quality {
712 url.query_pairs_mut()
713 .append_pair("quality", &quality.to_string());
714 }
715
716 Ok(url.to_string())
717 }
718
719 #[cfg(not(target_arch = "wasm32"))]
745 pub async fn start_resumable_upload(
746 &self,
747 bucket_id: &str,
748 path: &str,
749 total_size: u64,
750 config: Option<ResumableUploadConfig>,
751 options: Option<FileOptions>,
752 ) -> Result<UploadSession> {
753 let config = config.unwrap_or_default();
754 let options = options.unwrap_or_default();
755
756 debug!(
757 "Starting resumable upload for bucket: {} path: {} size: {}",
758 bucket_id, path, total_size
759 );
760
761 let url = format!(
762 "{}/storage/v1/object/{}/{}/resumable",
763 self.config.url, bucket_id, path
764 );
765
766 let payload = serde_json::json!({
767 "totalSize": total_size,
768 "chunkSize": config.chunk_size,
769 "contentType": options.content_type,
770 "cacheControl": options.cache_control,
771 "upsert": options.upsert
772 });
773
774 let response = self.http_client.post(&url).json(&payload).send().await?;
775
776 if !response.status().is_success() {
777 let error_msg = format!(
778 "Start resumable upload failed with status: {}",
779 response.status()
780 );
781 return Err(Error::storage(error_msg));
782 }
783
784 let session: UploadSession = response.json().await?;
785 info!("Started resumable upload session: {}", session.upload_id);
786
787 Ok(session)
788 }
789
790 #[cfg(not(target_arch = "wasm32"))]
810 pub async fn upload_chunk(
811 &self,
812 session: &UploadSession,
813 part_number: u32,
814 chunk_data: Bytes,
815 ) -> Result<UploadedPart> {
816 debug!(
817 "Uploading chunk {} for session: {} size: {}",
818 part_number,
819 session.upload_id,
820 chunk_data.len()
821 );
822
823 let url = format!(
824 "{}/storage/v1/object/{}/{}/resumable/{}",
825 self.config.url, session.bucket_id, session.object_path, session.upload_id
826 );
827
828 let chunk_size = chunk_data.len() as u64;
829
830 let response = self
831 .http_client
832 .put(&url)
833 .header("Content-Type", "application/octet-stream")
834 .header("X-Part-Number", part_number.to_string())
835 .body(chunk_data)
836 .send()
837 .await?;
838
839 if !response.status().is_success() {
840 let error_msg = format!("Upload chunk failed with status: {}", response.status());
841 return Err(Error::storage(error_msg));
842 }
843
844 let etag = response
845 .headers()
846 .get("etag")
847 .and_then(|h| h.to_str().ok())
848 .unwrap_or("")
849 .to_string();
850
851 let part = UploadedPart {
852 part_number,
853 etag,
854 size: chunk_size,
855 };
856
857 info!("Uploaded chunk {} successfully", part_number);
858 Ok(part)
859 }
860
861 #[cfg(not(target_arch = "wasm32"))]
874 pub async fn complete_resumable_upload(
875 &self,
876 session: &UploadSession,
877 ) -> Result<UploadResponse> {
878 debug!(
879 "Completing resumable upload for session: {}",
880 session.upload_id
881 );
882
883 let url = format!(
884 "{}/storage/v1/object/{}/{}/resumable/{}/complete",
885 self.config.url, session.bucket_id, session.object_path, session.upload_id
886 );
887
888 let payload = serde_json::json!({
889 "parts": session.uploaded_parts
890 });
891
892 let response = self.http_client.post(&url).json(&payload).send().await?;
893
894 if !response.status().is_success() {
895 let error_msg = format!(
896 "Complete resumable upload failed with status: {}",
897 response.status()
898 );
899 return Err(Error::storage(error_msg));
900 }
901
902 let upload_response: UploadResponse = response.json().await?;
903 info!("Completed resumable upload: {}", upload_response.key);
904
905 Ok(upload_response)
906 }
907
908 #[cfg(all(not(target_arch = "wasm32"), feature = "native"))]
942 pub async fn upload_large_file<P: AsRef<std::path::Path>>(
943 &self,
944 bucket_id: &str,
945 path: &str,
946 file_path: P,
947 config: Option<ResumableUploadConfig>,
948 options: Option<FileOptions>,
949 progress_callback: Option<UploadProgressCallback>,
950 ) -> Result<UploadResponse> {
951 let config = config.unwrap_or_default();
952
953 debug!("Starting large file upload from: {:?}", file_path.as_ref());
954
955 let metadata = tokio::fs::metadata(&file_path)
957 .await
958 .map_err(|e| Error::storage(format!("Failed to get file metadata: {}", e)))?;
959
960 let total_size = metadata.len();
961
962 if total_size <= config.chunk_size {
963 return self.upload_file(bucket_id, path, file_path, options).await;
965 }
966
967 let mut session = self
969 .start_resumable_upload(bucket_id, path, total_size, Some(config.clone()), options)
970 .await?;
971
972 let mut file = tokio::fs::File::open(&file_path)
974 .await
975 .map_err(|e| Error::storage(format!("Failed to open file: {}", e)))?;
976
977 let mut uploaded_size = 0u64;
978 let mut part_number = 1u32;
979
980 loop {
982 let remaining_size = total_size - uploaded_size;
983 if remaining_size == 0 {
984 break;
985 }
986
987 let chunk_size = std::cmp::min(config.chunk_size, remaining_size);
988 let mut buffer = vec![0u8; chunk_size as usize];
989
990 use tokio::io::AsyncReadExt;
992 let bytes_read = file
993 .read_exact(&mut buffer)
994 .await
995 .map_err(|e| Error::storage(format!("Failed to read file chunk: {}", e)))?;
996
997 if bytes_read == 0 {
998 break;
999 }
1000
1001 buffer.truncate(bytes_read);
1002 let chunk_data = Bytes::from(buffer);
1003
1004 let mut attempts = 0;
1006 let part = loop {
1007 attempts += 1;
1008
1009 match self
1010 .upload_chunk(&session, part_number, chunk_data.clone())
1011 .await
1012 {
1013 Ok(part) => break part,
1014 Err(e) if attempts < config.max_retries => {
1015 warn!(
1016 "Upload chunk {} failed (attempt {}), retrying: {}",
1017 part_number, attempts, e
1018 );
1019 sleep(Duration::from_millis(config.retry_delay)).await;
1020 continue;
1021 }
1022 Err(e) => return Err(e),
1023 }
1024 };
1025
1026 session.uploaded_parts.push(part);
1027 uploaded_size += chunk_size;
1028 part_number += 1;
1029
1030 if let Some(callback) = &progress_callback {
1032 callback(uploaded_size, total_size);
1033 }
1034
1035 debug!(
1036 "Uploaded chunk {}, progress: {}/{}",
1037 part_number - 1,
1038 uploaded_size,
1039 total_size
1040 );
1041 }
1042
1043 let response = self.complete_resumable_upload(&session).await?;
1045
1046 info!("Large file upload completed: {}", response.key);
1047 Ok(response)
1048 }
1049
1050 #[cfg(not(target_arch = "wasm32"))]
1052 pub async fn get_upload_session(&self, upload_id: &str) -> Result<UploadSession> {
1053 debug!("Getting upload session status: {}", upload_id);
1054
1055 let url = format!("{}/storage/v1/resumable/{}", self.config.url, upload_id);
1056
1057 let response = self.http_client.get(&url).send().await?;
1058
1059 if !response.status().is_success() {
1060 let error_msg = format!(
1061 "Get upload session failed with status: {}",
1062 response.status()
1063 );
1064 return Err(Error::storage(error_msg));
1065 }
1066
1067 let session: UploadSession = response.json().await?;
1068 Ok(session)
1069 }
1070
1071 #[cfg(not(target_arch = "wasm32"))]
1073 pub async fn cancel_upload_session(&self, upload_id: &str) -> Result<()> {
1074 debug!("Cancelling upload session: {}", upload_id);
1075
1076 let url = format!("{}/storage/v1/resumable/{}", self.config.url, upload_id);
1077
1078 let response = self.http_client.delete(&url).send().await?;
1079
1080 if !response.status().is_success() {
1081 let error_msg = format!(
1082 "Cancel upload session failed with status: {}",
1083 response.status()
1084 );
1085 return Err(Error::storage(error_msg));
1086 }
1087
1088 info!("Cancelled upload session: {}", upload_id);
1089 Ok(())
1090 }
1091
1092 pub async fn update_file_metadata(
1121 &self,
1122 bucket_id: &str,
1123 path: &str,
1124 metadata: &FileMetadata,
1125 ) -> Result<()> {
1126 debug!(
1127 "Updating file metadata for bucket: {} path: {}",
1128 bucket_id, path
1129 );
1130
1131 let url = format!(
1132 "{}/storage/v1/object/{}/{}/metadata",
1133 self.config.url, bucket_id, path
1134 );
1135
1136 let response = self.http_client.put(&url).json(metadata).send().await?;
1137
1138 if !response.status().is_success() {
1139 let error_msg = format!(
1140 "Update file metadata failed with status: {}",
1141 response.status()
1142 );
1143 return Err(Error::storage(error_msg));
1144 }
1145
1146 info!("Updated file metadata successfully");
1147 Ok(())
1148 }
1149
1150 pub async fn search_files(
1175 &self,
1176 bucket_id: &str,
1177 search_options: &SearchOptions,
1178 ) -> Result<Vec<FileObject>> {
1179 debug!("Searching files in bucket: {}", bucket_id);
1180
1181 let url = format!("{}/storage/v1/object/{}/search", self.config.url, bucket_id);
1182
1183 let response = self
1184 .http_client
1185 .post(&url)
1186 .json(search_options)
1187 .send()
1188 .await?;
1189
1190 if !response.status().is_success() {
1191 let error_msg = format!("Search files failed with status: {}", response.status());
1192 return Err(Error::storage(error_msg));
1193 }
1194
1195 let files: Vec<FileObject> = response.json().await?;
1196 info!("Found {} files matching search criteria", files.len());
1197
1198 Ok(files)
1199 }
1200
1201 pub async fn create_policy(&self, policy: &StoragePolicy) -> Result<()> {
1225 debug!("Creating storage policy: {}", policy.name);
1226
1227 let url = format!("{}/rest/v1/rpc/create_storage_policy", self.config.url);
1228
1229 let payload = serde_json::json!({
1230 "policy_name": policy.name,
1231 "bucket_name": policy.bucket_id,
1232 "operation": policy.operation,
1233 "definition": policy.definition,
1234 "check_expression": policy.check
1235 });
1236
1237 let response = self
1238 .http_client
1239 .post(&url)
1240 .header("Authorization", format!("Bearer {}", self.get_admin_key()))
1241 .header("apikey", self.get_admin_key())
1242 .json(&payload)
1243 .send()
1244 .await?;
1245
1246 if !response.status().is_success() {
1247 let error_msg = format!(
1248 "Create storage policy failed with status: {}",
1249 response.status()
1250 );
1251 return Err(Error::storage(error_msg));
1252 }
1253
1254 info!("Created storage policy: {}", policy.name);
1255 Ok(())
1256 }
1257
1258 pub async fn update_policy(&self, policy: &StoragePolicy) -> Result<()> {
1278 debug!("Updating storage policy: {}", policy.name);
1279
1280 let url = format!("{}/rest/v1/rpc/update_storage_policy", self.config.url);
1281
1282 let payload = serde_json::json!({
1283 "policy_name": policy.name,
1284 "bucket_name": policy.bucket_id,
1285 "operation": policy.operation,
1286 "definition": policy.definition,
1287 "check_expression": policy.check
1288 });
1289
1290 let response = self
1291 .http_client
1292 .put(&url)
1293 .header("Authorization", format!("Bearer {}", self.get_admin_key()))
1294 .header("apikey", self.get_admin_key())
1295 .json(&payload)
1296 .send()
1297 .await?;
1298
1299 if !response.status().is_success() {
1300 let error_msg = format!(
1301 "Update storage policy failed with status: {}",
1302 response.status()
1303 );
1304 return Err(Error::storage(error_msg));
1305 }
1306
1307 info!("Updated storage policy: {}", policy.name);
1308 Ok(())
1309 }
1310
1311 pub async fn delete_policy(&self, bucket_id: &str, policy_name: &str) -> Result<()> {
1321 debug!(
1322 "Deleting storage policy: {} from bucket: {}",
1323 policy_name, bucket_id
1324 );
1325
1326 let url = format!("{}/rest/v1/rpc/delete_storage_policy", self.config.url);
1327
1328 let payload = serde_json::json!({
1329 "policy_name": policy_name,
1330 "bucket_name": bucket_id
1331 });
1332
1333 let response = self
1334 .http_client
1335 .delete(&url)
1336 .header("Authorization", format!("Bearer {}", self.get_admin_key()))
1337 .header("apikey", self.get_admin_key())
1338 .json(&payload)
1339 .send()
1340 .await?;
1341
1342 if !response.status().is_success() {
1343 let error_msg = format!(
1344 "Delete storage policy failed with status: {}",
1345 response.status()
1346 );
1347 return Err(Error::storage(error_msg));
1348 }
1349
1350 info!("Deleted storage policy: {}", policy_name);
1351 Ok(())
1352 }
1353
1354 pub async fn list_policies(&self, bucket_id: &str) -> Result<Vec<StoragePolicy>> {
1365 debug!("Listing storage policies for bucket: {}", bucket_id);
1366
1367 let url = format!("{}/rest/v1/rpc/list_storage_policies", self.config.url);
1368
1369 let payload = serde_json::json!({
1370 "bucket_name": bucket_id
1371 });
1372
1373 let response = self
1374 .http_client
1375 .post(&url)
1376 .header("Authorization", format!("Bearer {}", self.get_admin_key()))
1377 .header("apikey", self.get_admin_key())
1378 .json(&payload)
1379 .send()
1380 .await?;
1381
1382 if !response.status().is_success() {
1383 let error_msg = format!(
1384 "List storage policies failed with status: {}",
1385 response.status()
1386 );
1387 return Err(Error::storage(error_msg));
1388 }
1389
1390 let policies: Vec<StoragePolicy> = response.json().await?;
1391 info!(
1392 "Listed {} storage policies for bucket: {}",
1393 policies.len(),
1394 bucket_id
1395 );
1396
1397 Ok(policies)
1398 }
1399
1400 pub async fn test_policy_access(
1423 &self,
1424 bucket_id: &str,
1425 object_path: &str,
1426 operation: PolicyOperation,
1427 user_id: &str,
1428 ) -> Result<bool> {
1429 debug!(
1430 "Testing policy access for user: {} on object: {}",
1431 user_id, object_path
1432 );
1433
1434 let url = format!("{}/rest/v1/rpc/test_storage_policy_access", self.config.url);
1435
1436 let payload = serde_json::json!({
1437 "bucket_name": bucket_id,
1438 "object_name": object_path,
1439 "operation": operation,
1440 "user_id": user_id
1441 });
1442
1443 let response = self
1444 .http_client
1445 .post(&url)
1446 .header("Authorization", format!("Bearer {}", self.get_admin_key()))
1447 .header("apikey", self.get_admin_key())
1448 .json(&payload)
1449 .send()
1450 .await?;
1451
1452 if !response.status().is_success() {
1453 let error_msg = format!(
1454 "Test policy access failed with status: {}",
1455 response.status()
1456 );
1457 return Err(Error::storage(error_msg));
1458 }
1459
1460 let result: serde_json::Value = response.json().await?;
1461 let can_access = result["can_access"].as_bool().unwrap_or(false);
1462
1463 info!("Policy access test result: {}", can_access);
1464 Ok(can_access)
1465 }
1466
1467 pub fn generate_policy_template(
1485 &self,
1486 bucket_id: &str,
1487 policy_name: &str,
1488 template: PolicyTemplate,
1489 ) -> StoragePolicy {
1490 let (operation, definition, check) = match template {
1491 PolicyTemplate::PublicRead => (PolicyOperation::Select, "true".to_string(), None),
1492 PolicyTemplate::AuthenticatedRead => (
1493 PolicyOperation::Select,
1494 "auth.uid() IS NOT NULL".to_string(),
1495 None,
1496 ),
1497 PolicyTemplate::UserFolderAccess => (
1498 PolicyOperation::All,
1499 "auth.uid()::text = (storage.foldername(name))[1]".to_string(),
1500 Some("auth.uid() IS NOT NULL".to_string()),
1501 ),
1502 PolicyTemplate::AdminFullAccess => (
1503 PolicyOperation::All,
1504 "auth.role() = 'admin'".to_string(),
1505 None,
1506 ),
1507 PolicyTemplate::ReadOnlyForRole(role) => (
1508 PolicyOperation::Select,
1509 format!("auth.role() = '{}'", role),
1510 None,
1511 ),
1512 };
1513
1514 StoragePolicy {
1515 name: policy_name.to_string(),
1516 bucket_id: bucket_id.to_string(),
1517 operation,
1518 definition,
1519 check,
1520 }
1521 }
1522}
1523
1524#[derive(Debug, Clone, Serialize, Deserialize)]
1526pub struct StoragePolicy {
1527 pub name: String,
1528 pub bucket_id: String,
1529 pub operation: PolicyOperation,
1530 pub definition: String,
1531 pub check: Option<String>,
1532}
1533
1534#[derive(Debug, Clone, Serialize, Deserialize)]
1536#[serde(rename_all = "UPPERCASE")]
1537pub enum PolicyOperation {
1538 Select,
1539 Insert,
1540 Update,
1541 Delete,
1542 All,
1543}
1544
1545#[derive(Debug, Clone)]
1547pub enum PolicyTemplate {
1548 PublicRead,
1550 AuthenticatedRead,
1552 UserFolderAccess,
1554 AdminFullAccess,
1556 ReadOnlyForRole(String),
1558}