Skip to main content

raps_oss/
objects.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2025 Dmytro Yemelianov
3
4//! Object operations for the OSS API.
5
6use anyhow::{Context, Result};
7use futures_util::StreamExt;
8use std::path::{Path, PathBuf};
9use tokio::fs::File;
10use tokio::io::{AsyncSeekExt, AsyncWriteExt};
11
12use raps_kernel::progress;
13
14use crate::OssClient;
15use crate::types::*;
16
17impl OssClient {
18    /// Upload a file to a bucket using S3 signed URLs
19    /// Automatically uses multipart upload for files larger than 5MB
20    pub async fn upload_object(
21        &self,
22        bucket_key: &str,
23        object_key: &str,
24        file_path: &Path,
25    ) -> Result<ObjectInfo> {
26        self.upload_object_with_options(bucket_key, object_key, file_path, false)
27            .await
28    }
29
30    /// Upload a file with resume option
31    /// If resume is true, will attempt to resume an interrupted upload
32    pub async fn upload_object_with_options(
33        &self,
34        bucket_key: &str,
35        object_key: &str,
36        file_path: &Path,
37        resume: bool,
38    ) -> Result<ObjectInfo> {
39        let metadata = tokio::fs::metadata(file_path)
40            .await
41            .context("Failed to get file metadata")?;
42        let file_size = metadata.len();
43
44        // Use multipart upload for files larger than threshold
45        if file_size > MultipartUploadState::MULTIPART_THRESHOLD {
46            self.upload_multipart(bucket_key, object_key, file_path, resume)
47                .await
48        } else {
49            self.upload_single_part(bucket_key, object_key, file_path)
50                .await
51        }
52    }
53
54    /// Upload a small file using single-part upload
55    pub(crate) async fn upload_single_part(
56        &self,
57        bucket_key: &str,
58        object_key: &str,
59        file_path: &Path,
60    ) -> Result<ObjectInfo> {
61        // Read file
62        let mut file = File::open(file_path)
63            .await
64            .context("Failed to open file for upload")?;
65
66        let metadata = file
67            .metadata()
68            .await
69            .context("Failed to get file metadata")?;
70        let file_size = metadata.len();
71
72        // Create progress bar (hidden in non-interactive mode)
73        let pb = progress::file_progress(file_size, &format!("Uploading {}", object_key));
74
75        // Step 1: Get signed S3 upload URL
76        pb.set_message(format!("Getting upload URL for {}", object_key));
77        let signed = self
78            .get_signed_upload_url(bucket_key, object_key, None, None)
79            .await?;
80
81        if signed.urls.is_empty() {
82            anyhow::bail!("No upload URLs returned from signed upload request");
83        }
84
85        // Step 2: Stream upload directly to S3 instead of loading into memory
86        pb.set_message(format!("Uploading {} to S3", object_key));
87        let s3_url = &signed.urls[0];
88
89        // Create a streaming body that reads the file in chunks
90        use futures_util::stream::TryStreamExt;
91        use tokio_util::codec::{BytesCodec, FramedRead};
92
93        // Reset file position to start
94        file.seek(std::io::SeekFrom::Start(0)).await?;
95
96        // Create a stream that reads the file in chunks
97        let file_stream = FramedRead::new(file, BytesCodec::new())
98            .map_ok(|bytes| bytes.freeze())
99            .map_err(std::io::Error::other);
100
101        let body = reqwest::Body::wrap_stream(file_stream);
102
103        let _upload_start = std::time::Instant::now();
104        let response = self
105            .http_client
106            .put(s3_url)
107            .header("Content-Type", "application/octet-stream")
108            .header("Content-Length", file_size.to_string())
109            .body(body)
110            .send()
111            .await
112            .context("Failed to upload to S3")?;
113        raps_kernel::profiler::record_http_request(_upload_start.elapsed());
114
115        if !response.status().is_success() {
116            let status = response.status();
117            let error_text = response.text().await.unwrap_or_default();
118            anyhow::bail!("Failed to upload to S3 ({status}): {error_text}");
119        }
120
121        pb.set_position(file_size);
122
123        // Step 3: Complete the upload
124        pb.set_message(format!("Completing upload for {}", object_key));
125        let object_info = self
126            .complete_signed_upload(bucket_key, object_key, &signed.upload_key)
127            .await?;
128
129        pb.finish_with_message(format!("Uploaded {}", object_key));
130
131        Ok(object_info)
132    }
133
134    /// Download an object from a bucket using S3 signed URLs (new API)
135    pub async fn download_object(
136        &self,
137        bucket_key: &str,
138        object_key: &str,
139        output_path: &Path,
140    ) -> Result<()> {
141        // Step 1: Get signed S3 download URL
142        let signed = self
143            .get_signed_download_url(bucket_key, object_key, None)
144            .await?;
145
146        let download_url = signed
147            .url
148            .ok_or_else(|| anyhow::anyhow!("No download URL returned"))?;
149
150        // Step 2: Download from S3 with retry logic
151        let response = raps_kernel::http::send_with_retry(&self.config.http_config, || {
152            self.http_client.get(&download_url)
153        })
154        .await?;
155
156        if !response.status().is_success() {
157            let status = response.status();
158            let error_text = response.text().await.unwrap_or_default();
159            anyhow::bail!("Failed to download from S3 ({status}): {error_text}");
160        }
161
162        let total_size = signed
163            .size
164            .unwrap_or(response.content_length().unwrap_or(0));
165
166        // Create progress bar (hidden in non-interactive mode)
167        let pb = progress::file_progress(total_size, &format!("Downloading {}", object_key));
168
169        // Validate output path stays within current directory
170        let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
171        if let (Ok(canon_cwd), Ok(canon_target)) = (cwd.canonicalize(), output_path.canonicalize())
172            && !canon_target.starts_with(&canon_cwd)
173        {
174            anyhow::bail!(
175                "Path '{}' escapes working directory '{}'",
176                output_path.display(),
177                cwd.display()
178            );
179        }
180
181        // Stream download
182        let mut file = File::create(output_path)
183            .await
184            .context("Failed to create output file")?;
185
186        let mut stream = response.bytes_stream();
187        let mut downloaded: u64 = 0;
188
189        while let Some(chunk) = stream.next().await {
190            let chunk = chunk.context("Error while downloading")?;
191            file.write_all(&chunk)
192                .await
193                .context("Failed to write to file")?;
194            downloaded += chunk.len() as u64;
195            pb.set_position(downloaded);
196        }
197
198        pb.finish_with_message(format!("Downloaded {}", object_key));
199        Ok(())
200    }
201
202    /// Download an object and stream it to any async writer (stdout, file, etc.)
203    pub async fn download_object_to_writer(
204        &self,
205        bucket_key: &str,
206        object_key: &str,
207        writer: &mut (impl tokio::io::AsyncWrite + Unpin),
208    ) -> Result<()> {
209        let signed = self
210            .get_signed_download_url(bucket_key, object_key, None)
211            .await?;
212
213        let download_url = signed
214            .url
215            .ok_or_else(|| anyhow::anyhow!("No download URL returned"))?;
216
217        let response = raps_kernel::http::send_with_retry(&self.config.http_config, || {
218            self.http_client.get(&download_url)
219        })
220        .await?;
221
222        if !response.status().is_success() {
223            let status = response.status();
224            let error_text = response.text().await.unwrap_or_default();
225            anyhow::bail!("Failed to download from S3 ({status}): {error_text}");
226        }
227
228        let total_size = signed
229            .size
230            .unwrap_or(response.content_length().unwrap_or(0));
231
232        let pb = progress::file_progress(total_size, &format!("Downloading {}", object_key));
233
234        let mut stream = response.bytes_stream();
235        let mut downloaded: u64 = 0;
236
237        while let Some(chunk) = stream.next().await {
238            let chunk = chunk.context("Error while downloading")?;
239            writer
240                .write_all(&chunk)
241                .await
242                .context("Failed to write output")?;
243            downloaded += chunk.len() as u64;
244            pb.set_position(downloaded);
245        }
246
247        writer.flush().await?;
248        pb.finish_with_message(format!("Downloaded {}", object_key));
249        Ok(())
250    }
251
252    /// List objects in a bucket
253    pub async fn list_objects(&self, bucket_key: &str) -> Result<Vec<ObjectItem>> {
254        const MAX_PAGES: usize = 100;
255        let token = self.auth.get_token().await?;
256        let mut all_objects = Vec::new();
257        let mut start_at: Option<String> = None;
258        let mut page = 0;
259
260        loop {
261            page += 1;
262            if page > MAX_PAGES {
263                tracing::warn!(
264                    pages = MAX_PAGES,
265                    objects = all_objects.len(),
266                    "Reached maximum page limit for object listing"
267                );
268                break;
269            }
270            let mut url = format!("{}/buckets/{}/objects", self.config.oss_url(), bucket_key);
271            if let Some(ref start) = start_at {
272                url = format!("{}?startAt={}", url, start);
273            }
274
275            let response = raps_kernel::http::send_with_retry(&self.config.http_config, || {
276                self.http_client.get(&url).bearer_auth(&token)
277            })
278            .await?;
279
280            if !response.status().is_success() {
281                let status = response.status();
282                let error_text = response.text().await.unwrap_or_default();
283                anyhow::bail!("Failed to list objects ({status}): {error_text}");
284            }
285
286            let response_text = response
287                .text()
288                .await
289                .context("Failed to read objects response")?;
290
291            let objects_response: ObjectsResponse = serde_json::from_str(&response_text)
292                .with_context(|| format!("Failed to parse objects response: {}", response_text))?;
293
294            all_objects.extend(objects_response.items);
295
296            if objects_response.next.is_none() {
297                break;
298            }
299            start_at = objects_response.next;
300        }
301
302        Ok(all_objects)
303    }
304
305    /// Delete an object from a bucket
306    pub async fn delete_object(&self, bucket_key: &str, object_key: &str) -> Result<()> {
307        let token = self.auth.get_token().await?;
308        let url = format!(
309            "{}/buckets/{}/objects/{}",
310            self.config.oss_url(),
311            bucket_key,
312            object_key
313        );
314
315        let response = raps_kernel::http::send_with_retry(&self.config.http_config, || {
316            self.http_client.delete(&url).bearer_auth(&token)
317        })
318        .await?;
319
320        if !response.status().is_success() {
321            let status = response.status();
322            let error_text = response.text().await.unwrap_or_default();
323            anyhow::bail!("Failed to delete object ({status}): {error_text}");
324        }
325
326        Ok(())
327    }
328
329    /// Get detailed metadata for an object without downloading it
330    ///
331    /// Returns extended information including size, SHA1 hash, content type,
332    /// and timestamps.
333    pub async fn get_object_details(
334        &self,
335        bucket_key: &str,
336        object_key: &str,
337    ) -> Result<ObjectDetails> {
338        let token = self.auth.get_token().await?;
339        let url = format!(
340            "{}/buckets/{}/objects/{}/details",
341            self.config.oss_url(),
342            bucket_key,
343            urlencoding::encode(object_key)
344        );
345
346        let response = raps_kernel::http::send_with_retry(&self.config.http_config, || {
347            self.http_client.get(&url).bearer_auth(&token)
348        })
349        .await?;
350
351        if !response.status().is_success() {
352            let status = response.status();
353            let error_text = response.text().await.unwrap_or_default();
354            anyhow::bail!("Failed to get object details ({status}): {error_text}");
355        }
356
357        let details: ObjectDetails = response
358            .json()
359            .await
360            .context("Failed to parse object details response")?;
361
362        Ok(details)
363    }
364
365    /// Get a signed S3 URL for direct download (bypasses OSS servers)
366    ///
367    /// The signed URL expires in 2 minutes by default.
368    pub async fn get_signed_download_url(
369        &self,
370        bucket_key: &str,
371        object_key: &str,
372        minutes_expiration: Option<u32>,
373    ) -> Result<SignedS3DownloadResponse> {
374        let token = self.auth.get_token().await?;
375        let mut url = format!(
376            "{}/buckets/{}/objects/{}/signeds3download",
377            self.config.oss_url(),
378            bucket_key,
379            urlencoding::encode(object_key)
380        );
381
382        if let Some(mins) = minutes_expiration {
383            url = format!("{}?minutesExpiration={}", url, mins);
384        }
385
386        let response = raps_kernel::http::send_with_retry(&self.config.http_config, || {
387            self.http_client.get(&url).bearer_auth(&token)
388        })
389        .await?;
390
391        if !response.status().is_success() {
392            let status = response.status();
393            let error_text = response.text().await.unwrap_or_default();
394            anyhow::bail!(
395                "Failed to get signed download URL ({}): {}",
396                status,
397                error_text
398            );
399        }
400
401        let signed: SignedS3DownloadResponse = response
402            .json()
403            .await
404            .context("Failed to parse signed URL response")?;
405
406        Ok(signed)
407    }
408
409    /// Get a signed S3 URL for direct upload (bypasses OSS servers)
410    ///
411    /// The signed URL expires in 2 minutes by default.
412    /// Returns an upload key that must be used to complete the upload.
413    pub async fn get_signed_upload_url(
414        &self,
415        bucket_key: &str,
416        object_key: &str,
417        parts: Option<u32>,
418        minutes_expiration: Option<u32>,
419    ) -> Result<SignedS3UploadResponse> {
420        let token = self.auth.get_token().await?;
421        let mut url = format!(
422            "{}/buckets/{}/objects/{}/signeds3upload",
423            self.config.oss_url(),
424            bucket_key,
425            urlencoding::encode(object_key)
426        );
427
428        let mut params = Vec::new();
429        if let Some(p) = parts {
430            params.push(format!("parts={}", p));
431        }
432        if let Some(mins) = minutes_expiration {
433            params.push(format!("minutesExpiration={}", mins));
434        }
435        if !params.is_empty() {
436            url = format!("{}?{}", url, params.join("&"));
437        }
438
439        let response = raps_kernel::http::send_with_retry(&self.config.http_config, || {
440            self.http_client.get(&url).bearer_auth(&token)
441        })
442        .await?;
443
444        if !response.status().is_success() {
445            let status = response.status();
446            let error_text = response.text().await.unwrap_or_default();
447            anyhow::bail!(
448                "Failed to get signed upload URL ({}): {}",
449                status,
450                error_text
451            );
452        }
453
454        let signed: SignedS3UploadResponse = response
455            .json()
456            .await
457            .context("Failed to parse signed URL response")?;
458
459        Ok(signed)
460    }
461
462    /// Complete an S3 signed upload
463    pub async fn complete_signed_upload(
464        &self,
465        bucket_key: &str,
466        object_key: &str,
467        upload_key: &str,
468    ) -> Result<ObjectInfo> {
469        let token = self.auth.get_token().await?;
470        let url = format!(
471            "{}/buckets/{}/objects/{}/signeds3upload",
472            self.config.oss_url(),
473            bucket_key,
474            urlencoding::encode(object_key)
475        );
476
477        let body = serde_json::json!({
478            "uploadKey": upload_key
479        });
480
481        let response = raps_kernel::http::send_with_retry(&self.config.http_config, || {
482            self.http_client
483                .post(&url)
484                .bearer_auth(&token)
485                .header("Content-Type", "application/json")
486                .json(&body)
487        })
488        .await?;
489
490        if !response.status().is_success() {
491            let status = response.status();
492            let error_text = response.text().await.unwrap_or_default();
493            anyhow::bail!(
494                "Failed to complete signed upload ({}): {}",
495                status,
496                error_text
497            );
498        }
499
500        // Get response text for debugging
501        let response_text = response
502            .text()
503            .await
504            .context("Failed to read upload completion response")?;
505
506        // Try to parse as ObjectInfo
507        let object_info: ObjectInfo = serde_json::from_str(&response_text).with_context(|| {
508            format!(
509                "Failed to parse upload completion response: {}",
510                response_text
511            )
512        })?;
513
514        Ok(object_info)
515    }
516}
517
518#[cfg(test)]
519mod tests {
520    use super::*;
521    use raps_kernel::auth::AuthClient;
522    use raps_kernel::config::Config;
523    use raps_kernel::http::HttpClientConfig;
524
525    fn create_test_oss_client() -> OssClient {
526        let config = Config {
527            client_id: "test".to_string(),
528            client_secret: "secret".to_string(),
529            base_url: "https://developer.api.autodesk.com".to_string(),
530            callback_url: "http://localhost:8080/callback".to_string(),
531            da_nickname: None,
532            http_config: HttpClientConfig::default(),
533        };
534        let auth = AuthClient::new(config.clone());
535        OssClient::new(config, auth)
536    }
537
538    #[test]
539    fn test_get_urn() {
540        let client = create_test_oss_client();
541        let urn = client.get_urn("my-bucket", "my-object.dwg");
542
543        assert!(!urn.contains("urn:adsk.objects:os.object:"));
544        assert!(!urn.contains("my-bucket"));
545        assert!(!urn.contains("my-object.dwg"));
546        assert!(!urn.contains("+"));
547        assert!(!urn.contains("/"));
548        assert!(!urn.contains("="));
549    }
550
551    #[test]
552    fn test_oss_client_url_generation() {
553        let client = create_test_oss_client();
554        let urn = client.get_urn("bucket", "object.dwg");
555        // URN should be base64 encoded
556        assert!(
557            urn.chars()
558                .all(|c| c.is_alphanumeric() || c == '-' || c == '_')
559        );
560    }
561
562    #[test]
563    fn test_get_urn_special_characters() {
564        let client = create_test_oss_client();
565        let urn = client.get_urn("bucket-with-dash", "object with spaces.dwg");
566        // URN should handle special characters
567        assert!(!urn.is_empty());
568        assert!(
569            urn.chars()
570                .all(|c| c.is_alphanumeric() || c == '-' || c == '_')
571        );
572    }
573
574    #[test]
575    fn test_get_urn_unicode() {
576        let client = create_test_oss_client();
577        let urn = client.get_urn("test-bucket", "\u{0444}\u{0430}\u{0439}\u{043b}.dwg"); // Cyrillic filename
578        assert!(!urn.is_empty());
579    }
580}