1use anyhow::{Context, Result};
7use futures_util::StreamExt;
8use std::path::{Path, PathBuf};
9use tokio::fs::File;
10use tokio::io::{AsyncSeekExt, AsyncWriteExt};
11
12use raps_kernel::progress;
13
14use crate::OssClient;
15use crate::types::*;
16
17impl OssClient {
18 pub async fn upload_object(
21 &self,
22 bucket_key: &str,
23 object_key: &str,
24 file_path: &Path,
25 ) -> Result<ObjectInfo> {
26 self.upload_object_with_options(bucket_key, object_key, file_path, false)
27 .await
28 }
29
30 pub async fn upload_object_with_options(
33 &self,
34 bucket_key: &str,
35 object_key: &str,
36 file_path: &Path,
37 resume: bool,
38 ) -> Result<ObjectInfo> {
39 let metadata = tokio::fs::metadata(file_path)
40 .await
41 .context("Failed to get file metadata")?;
42 let file_size = metadata.len();
43
44 if file_size > MultipartUploadState::MULTIPART_THRESHOLD {
46 self.upload_multipart(bucket_key, object_key, file_path, resume)
47 .await
48 } else {
49 self.upload_single_part(bucket_key, object_key, file_path)
50 .await
51 }
52 }
53
54 pub(crate) async fn upload_single_part(
56 &self,
57 bucket_key: &str,
58 object_key: &str,
59 file_path: &Path,
60 ) -> Result<ObjectInfo> {
61 let mut file = File::open(file_path)
63 .await
64 .context("Failed to open file for upload")?;
65
66 let metadata = file
67 .metadata()
68 .await
69 .context("Failed to get file metadata")?;
70 let file_size = metadata.len();
71
72 let pb = progress::file_progress(file_size, &format!("Uploading {}", object_key));
74
75 pb.set_message(format!("Getting upload URL for {}", object_key));
77 let signed = self
78 .get_signed_upload_url(bucket_key, object_key, None, None)
79 .await?;
80
81 if signed.urls.is_empty() {
82 anyhow::bail!("No upload URLs returned from signed upload request");
83 }
84
85 pb.set_message(format!("Uploading {} to S3", object_key));
87 let s3_url = &signed.urls[0];
88
89 use futures_util::stream::TryStreamExt;
91 use tokio_util::codec::{BytesCodec, FramedRead};
92
93 file.seek(std::io::SeekFrom::Start(0)).await?;
95
96 let file_stream = FramedRead::new(file, BytesCodec::new())
98 .map_ok(|bytes| bytes.freeze())
99 .map_err(std::io::Error::other);
100
101 let body = reqwest::Body::wrap_stream(file_stream);
102
103 let _upload_start = std::time::Instant::now();
104 let response = self
105 .http_client
106 .put(s3_url)
107 .header("Content-Type", "application/octet-stream")
108 .header("Content-Length", file_size.to_string())
109 .body(body)
110 .send()
111 .await
112 .context("Failed to upload to S3")?;
113 raps_kernel::profiler::record_http_request(_upload_start.elapsed());
114
115 if !response.status().is_success() {
116 let status = response.status();
117 let error_text = response.text().await.unwrap_or_default();
118 anyhow::bail!("Failed to upload to S3 ({status}): {error_text}");
119 }
120
121 pb.set_position(file_size);
122
123 pb.set_message(format!("Completing upload for {}", object_key));
125 let object_info = self
126 .complete_signed_upload(bucket_key, object_key, &signed.upload_key)
127 .await?;
128
129 pb.finish_with_message(format!("Uploaded {}", object_key));
130
131 Ok(object_info)
132 }
133
134 pub async fn download_object(
136 &self,
137 bucket_key: &str,
138 object_key: &str,
139 output_path: &Path,
140 ) -> Result<()> {
141 let signed = self
143 .get_signed_download_url(bucket_key, object_key, None)
144 .await?;
145
146 let download_url = signed
147 .url
148 .ok_or_else(|| anyhow::anyhow!("No download URL returned"))?;
149
150 let response = raps_kernel::http::send_with_retry(&self.config.http_config, || {
152 self.http_client.get(&download_url)
153 })
154 .await?;
155
156 if !response.status().is_success() {
157 let status = response.status();
158 let error_text = response.text().await.unwrap_or_default();
159 anyhow::bail!("Failed to download from S3 ({status}): {error_text}");
160 }
161
162 let total_size = signed
163 .size
164 .unwrap_or(response.content_length().unwrap_or(0));
165
166 let pb = progress::file_progress(total_size, &format!("Downloading {}", object_key));
168
169 let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
171 if let (Ok(canon_cwd), Ok(canon_target)) = (cwd.canonicalize(), output_path.canonicalize())
172 && !canon_target.starts_with(&canon_cwd)
173 {
174 anyhow::bail!(
175 "Path '{}' escapes working directory '{}'",
176 output_path.display(),
177 cwd.display()
178 );
179 }
180
181 let mut file = File::create(output_path)
183 .await
184 .context("Failed to create output file")?;
185
186 let mut stream = response.bytes_stream();
187 let mut downloaded: u64 = 0;
188
189 while let Some(chunk) = stream.next().await {
190 let chunk = chunk.context("Error while downloading")?;
191 file.write_all(&chunk)
192 .await
193 .context("Failed to write to file")?;
194 downloaded += chunk.len() as u64;
195 pb.set_position(downloaded);
196 }
197
198 pb.finish_with_message(format!("Downloaded {}", object_key));
199 Ok(())
200 }
201
202 pub async fn download_object_to_writer(
204 &self,
205 bucket_key: &str,
206 object_key: &str,
207 writer: &mut (impl tokio::io::AsyncWrite + Unpin),
208 ) -> Result<()> {
209 let signed = self
210 .get_signed_download_url(bucket_key, object_key, None)
211 .await?;
212
213 let download_url = signed
214 .url
215 .ok_or_else(|| anyhow::anyhow!("No download URL returned"))?;
216
217 let response = raps_kernel::http::send_with_retry(&self.config.http_config, || {
218 self.http_client.get(&download_url)
219 })
220 .await?;
221
222 if !response.status().is_success() {
223 let status = response.status();
224 let error_text = response.text().await.unwrap_or_default();
225 anyhow::bail!("Failed to download from S3 ({status}): {error_text}");
226 }
227
228 let total_size = signed
229 .size
230 .unwrap_or(response.content_length().unwrap_or(0));
231
232 let pb = progress::file_progress(total_size, &format!("Downloading {}", object_key));
233
234 let mut stream = response.bytes_stream();
235 let mut downloaded: u64 = 0;
236
237 while let Some(chunk) = stream.next().await {
238 let chunk = chunk.context("Error while downloading")?;
239 writer
240 .write_all(&chunk)
241 .await
242 .context("Failed to write output")?;
243 downloaded += chunk.len() as u64;
244 pb.set_position(downloaded);
245 }
246
247 writer.flush().await?;
248 pb.finish_with_message(format!("Downloaded {}", object_key));
249 Ok(())
250 }
251
252 pub async fn list_objects(&self, bucket_key: &str) -> Result<Vec<ObjectItem>> {
254 const MAX_PAGES: usize = 100;
255 let token = self.auth.get_token().await?;
256 let mut all_objects = Vec::new();
257 let mut start_at: Option<String> = None;
258 let mut page = 0;
259
260 loop {
261 page += 1;
262 if page > MAX_PAGES {
263 tracing::warn!(
264 pages = MAX_PAGES,
265 objects = all_objects.len(),
266 "Reached maximum page limit for object listing"
267 );
268 break;
269 }
270 let mut url = format!("{}/buckets/{}/objects", self.config.oss_url(), bucket_key);
271 if let Some(ref start) = start_at {
272 url = format!("{}?startAt={}", url, start);
273 }
274
275 let response = raps_kernel::http::send_with_retry(&self.config.http_config, || {
276 self.http_client.get(&url).bearer_auth(&token)
277 })
278 .await?;
279
280 if !response.status().is_success() {
281 let status = response.status();
282 let error_text = response.text().await.unwrap_or_default();
283 anyhow::bail!("Failed to list objects ({status}): {error_text}");
284 }
285
286 let response_text = response
287 .text()
288 .await
289 .context("Failed to read objects response")?;
290
291 let objects_response: ObjectsResponse = serde_json::from_str(&response_text)
292 .with_context(|| format!("Failed to parse objects response: {}", response_text))?;
293
294 all_objects.extend(objects_response.items);
295
296 if objects_response.next.is_none() {
297 break;
298 }
299 start_at = objects_response.next;
300 }
301
302 Ok(all_objects)
303 }
304
305 pub async fn delete_object(&self, bucket_key: &str, object_key: &str) -> Result<()> {
307 let token = self.auth.get_token().await?;
308 let url = format!(
309 "{}/buckets/{}/objects/{}",
310 self.config.oss_url(),
311 bucket_key,
312 object_key
313 );
314
315 let response = raps_kernel::http::send_with_retry(&self.config.http_config, || {
316 self.http_client.delete(&url).bearer_auth(&token)
317 })
318 .await?;
319
320 if !response.status().is_success() {
321 let status = response.status();
322 let error_text = response.text().await.unwrap_or_default();
323 anyhow::bail!("Failed to delete object ({status}): {error_text}");
324 }
325
326 Ok(())
327 }
328
329 pub async fn get_object_details(
334 &self,
335 bucket_key: &str,
336 object_key: &str,
337 ) -> Result<ObjectDetails> {
338 let token = self.auth.get_token().await?;
339 let url = format!(
340 "{}/buckets/{}/objects/{}/details",
341 self.config.oss_url(),
342 bucket_key,
343 urlencoding::encode(object_key)
344 );
345
346 let response = raps_kernel::http::send_with_retry(&self.config.http_config, || {
347 self.http_client.get(&url).bearer_auth(&token)
348 })
349 .await?;
350
351 if !response.status().is_success() {
352 let status = response.status();
353 let error_text = response.text().await.unwrap_or_default();
354 anyhow::bail!("Failed to get object details ({status}): {error_text}");
355 }
356
357 let details: ObjectDetails = response
358 .json()
359 .await
360 .context("Failed to parse object details response")?;
361
362 Ok(details)
363 }
364
365 pub async fn get_signed_download_url(
369 &self,
370 bucket_key: &str,
371 object_key: &str,
372 minutes_expiration: Option<u32>,
373 ) -> Result<SignedS3DownloadResponse> {
374 let token = self.auth.get_token().await?;
375 let mut url = format!(
376 "{}/buckets/{}/objects/{}/signeds3download",
377 self.config.oss_url(),
378 bucket_key,
379 urlencoding::encode(object_key)
380 );
381
382 if let Some(mins) = minutes_expiration {
383 url = format!("{}?minutesExpiration={}", url, mins);
384 }
385
386 let response = raps_kernel::http::send_with_retry(&self.config.http_config, || {
387 self.http_client.get(&url).bearer_auth(&token)
388 })
389 .await?;
390
391 if !response.status().is_success() {
392 let status = response.status();
393 let error_text = response.text().await.unwrap_or_default();
394 anyhow::bail!(
395 "Failed to get signed download URL ({}): {}",
396 status,
397 error_text
398 );
399 }
400
401 let signed: SignedS3DownloadResponse = response
402 .json()
403 .await
404 .context("Failed to parse signed URL response")?;
405
406 Ok(signed)
407 }
408
409 pub async fn get_signed_upload_url(
414 &self,
415 bucket_key: &str,
416 object_key: &str,
417 parts: Option<u32>,
418 minutes_expiration: Option<u32>,
419 ) -> Result<SignedS3UploadResponse> {
420 let token = self.auth.get_token().await?;
421 let mut url = format!(
422 "{}/buckets/{}/objects/{}/signeds3upload",
423 self.config.oss_url(),
424 bucket_key,
425 urlencoding::encode(object_key)
426 );
427
428 let mut params = Vec::new();
429 if let Some(p) = parts {
430 params.push(format!("parts={}", p));
431 }
432 if let Some(mins) = minutes_expiration {
433 params.push(format!("minutesExpiration={}", mins));
434 }
435 if !params.is_empty() {
436 url = format!("{}?{}", url, params.join("&"));
437 }
438
439 let response = raps_kernel::http::send_with_retry(&self.config.http_config, || {
440 self.http_client.get(&url).bearer_auth(&token)
441 })
442 .await?;
443
444 if !response.status().is_success() {
445 let status = response.status();
446 let error_text = response.text().await.unwrap_or_default();
447 anyhow::bail!(
448 "Failed to get signed upload URL ({}): {}",
449 status,
450 error_text
451 );
452 }
453
454 let signed: SignedS3UploadResponse = response
455 .json()
456 .await
457 .context("Failed to parse signed URL response")?;
458
459 Ok(signed)
460 }
461
462 pub async fn complete_signed_upload(
464 &self,
465 bucket_key: &str,
466 object_key: &str,
467 upload_key: &str,
468 ) -> Result<ObjectInfo> {
469 let token = self.auth.get_token().await?;
470 let url = format!(
471 "{}/buckets/{}/objects/{}/signeds3upload",
472 self.config.oss_url(),
473 bucket_key,
474 urlencoding::encode(object_key)
475 );
476
477 let body = serde_json::json!({
478 "uploadKey": upload_key
479 });
480
481 let response = raps_kernel::http::send_with_retry(&self.config.http_config, || {
482 self.http_client
483 .post(&url)
484 .bearer_auth(&token)
485 .header("Content-Type", "application/json")
486 .json(&body)
487 })
488 .await?;
489
490 if !response.status().is_success() {
491 let status = response.status();
492 let error_text = response.text().await.unwrap_or_default();
493 anyhow::bail!(
494 "Failed to complete signed upload ({}): {}",
495 status,
496 error_text
497 );
498 }
499
500 let response_text = response
502 .text()
503 .await
504 .context("Failed to read upload completion response")?;
505
506 let object_info: ObjectInfo = serde_json::from_str(&response_text).with_context(|| {
508 format!(
509 "Failed to parse upload completion response: {}",
510 response_text
511 )
512 })?;
513
514 Ok(object_info)
515 }
516}
517
#[cfg(test)]
mod tests {
    use super::*;
    use raps_kernel::auth::AuthClient;
    use raps_kernel::config::Config;
    use raps_kernel::http::HttpClientConfig;

    /// Builds an `OssClient` from placeholder credentials and default HTTP settings.
    fn create_test_oss_client() -> OssClient {
        let config = Config {
            client_id: "test".to_string(),
            client_secret: "secret".to_string(),
            base_url: "https://developer.api.autodesk.com".to_string(),
            callback_url: "http://localhost:8080/callback".to_string(),
            da_nickname: None,
            http_config: HttpClientConfig::default(),
        };
        let auth = AuthClient::new(config.clone());
        OssClient::new(config, auth)
    }

    /// Returns `true` when `value` consists only of alphanumerics, `-` and `_`.
    fn uses_url_safe_charset(value: &str) -> bool {
        value
            .chars()
            .all(|c| c.is_alphanumeric() || c == '-' || c == '_')
    }

    /// The URN must expose neither its plain-text inputs nor `+`, `/`, `=`.
    #[test]
    fn test_get_urn() {
        let urn = create_test_oss_client().get_urn("my-bucket", "my-object.dwg");

        assert!(!urn.contains("urn:adsk.objects:os.object:"));
        assert!(!urn.contains("my-bucket"));
        assert!(!urn.contains("my-object.dwg"));
        assert!(!urn.contains("+"));
        assert!(!urn.contains("/"));
        assert!(!urn.contains("="));
    }

    /// The URN for a plain key uses only the URL-safe character set.
    #[test]
    fn test_oss_client_url_generation() {
        let urn = create_test_oss_client().get_urn("bucket", "object.dwg");
        assert!(uses_url_safe_charset(&urn));
    }

    /// Keys containing dashes and spaces still yield a non-empty, URL-safe URN.
    #[test]
    fn test_get_urn_special_characters() {
        let urn = create_test_oss_client().get_urn("bucket-with-dash", "object with spaces.dwg");
        assert!(!urn.is_empty());
        assert!(uses_url_safe_charset(&urn));
    }

    /// A key with non-ASCII characters yields a non-empty URN.
    #[test]
    fn test_get_urn_unicode() {
        let urn = create_test_oss_client()
            .get_urn("test-bucket", "\u{0444}\u{0430}\u{0439}\u{043b}.dwg");
        assert!(!urn.is_empty());
    }
}