1use crate::{
2 cli::{actions::Action, globals::GlobalArgs, progressbar::Bar},
3 s3::{S3, actions, tools::throttle_download},
4};
5use anyhow::{Context, Result, anyhow};
6use bytes::{Buf, BytesMut};
7use bytesize::ByteSize;
8use chacha20poly1305::{ChaCha20Poly1305, KeyInit, aead::stream::DecryptorBE32};
9use chrono::{DateTime, Utc};
10use colored::Colorize;
11use http::{HeaderMap, header::CONTENT_TYPE};
12use secrecy::ExposeSecret;
13use serde::Serialize;
14use std::{
15 cmp::min,
16 collections::BTreeMap,
17 ffi::{OsStr, OsString},
18 path::{Path, PathBuf},
19};
20use tokio::{fs::OpenOptions, io::AsyncWriteExt};
21
22#[derive(Debug, Serialize, PartialEq, Eq)]
23struct MetadataJsonOutput {
24 bucket: Option<String>,
25 key: String,
26 version_id: Option<String>,
27 content_length: Option<u64>,
28 content_type: Option<String>,
29 etag: Option<String>,
30 last_modified: Option<String>,
31 storage_class: Option<String>,
32 checksum_crc32: Option<String>,
33 checksum_crc32c: Option<String>,
34 checksum_sha1: Option<String>,
35 checksum_sha256: Option<String>,
36 metadata: BTreeMap<String, String>,
37 headers: BTreeMap<String, String>,
38}
39
40#[derive(Debug, Serialize, PartialEq, Eq)]
41struct VersionsJsonOutput {
42 bucket: Option<String>,
43 key_prefix: String,
44 versions: Vec<VersionJsonEntry>,
45}
46
47#[derive(Debug, Serialize, PartialEq, Eq)]
48struct VersionJsonEntry {
49 key: String,
50 version_id: String,
51 is_latest: bool,
52 last_modified: String,
53 size_bytes: u64,
54 etag: String,
55 storage_class: String,
56}
57
58struct DownloadState {
59 file: tokio::fs::File,
60 pb: Bar,
61 file_size: u64,
62 downloaded: u64,
63 buffer: BytesMut,
64 decryptor: Option<DecryptorBE32<ChaCha20Poly1305>>,
65 is_encrypted: bool,
66 can_decrypt: bool,
67 cipher: Option<ChaCha20Poly1305>,
68}
69
70enum OutputFormat {
71 Text,
72 Json,
73}
74
75enum GetObjectRequest {
76 Metadata {
77 key: String,
78 version: Option<String>,
79 output: OutputFormat,
80 },
81 Versions {
82 key: String,
83 output: OutputFormat,
84 },
85 Download {
86 key: String,
87 version: Option<String>,
88 dest: Option<String>,
89 quiet: bool,
90 force: bool,
91 },
92}
93
94pub async fn handle(s3: &S3, action: Action, globals: GlobalArgs) -> Result<()> {
97 if let Action::GetObject {
98 key,
99 metadata,
100 dest,
101 quiet,
102 force,
103 json,
104 versions,
105 version,
106 } = action
107 {
108 let output = if json {
109 OutputFormat::Json
110 } else {
111 OutputFormat::Text
112 };
113
114 let request = if metadata {
115 GetObjectRequest::Metadata {
116 key,
117 version,
118 output,
119 }
120 } else if versions {
121 GetObjectRequest::Versions { key, output }
122 } else {
123 GetObjectRequest::Download {
124 key,
125 version,
126 dest,
127 quiet,
128 force,
129 }
130 };
131
132 return handle_get_action(s3, request, globals).await;
133 }
134
135 Ok(())
136}
137
138async fn handle_get_action(s3: &S3, request: GetObjectRequest, globals: GlobalArgs) -> Result<()> {
139 match request {
140 GetObjectRequest::Metadata {
141 key,
142 version,
143 output,
144 } => handle_metadata(s3, &key, version, matches!(output, OutputFormat::Json)).await,
145 GetObjectRequest::Versions { key, output } => {
146 handle_versions(s3, &key, matches!(output, OutputFormat::Json)).await
147 }
148 GetObjectRequest::Download {
149 key,
150 version,
151 dest,
152 quiet,
153 force,
154 } => download_object(s3, key, version, dest, quiet, force, globals).await,
155 }
156}
157
158async fn download_object(
159 s3: &S3,
160 key: String,
161 version: Option<String>,
162 dest: Option<String>,
163 quiet: bool,
164 force: bool,
165 globals: GlobalArgs,
166) -> Result<()> {
167 let file_name = Path::new(&key)
168 .file_name()
169 .with_context(|| format!("Failed to get file name from: {key}"))?;
170 let action = actions::GetObject::new(&key, version);
171 let mut res = action.request(s3, &globals).await?;
172 let is_encrypted = is_s3m_encrypted(res.headers());
173 let can_decrypt = is_encrypted && globals.enc_key.is_some();
174
175 log::info!(
176 "file_name: {}, is_encrypted: {}, can_decrypt: {}",
177 file_name.to_string_lossy(),
178 is_encrypted,
179 can_decrypt
180 );
181
182 let final_file_name = determine_final_filename(file_name, can_decrypt);
183 let path = get_dest(dest, &final_file_name)?;
184 if path.is_file() && !force {
185 return Err(anyhow!("file {} already exists", path.display()));
186 }
187
188 let file = create_output_file(&path, force).await?;
189 let file_size = res
190 .content_length()
191 .context("could not get content_length")?;
192 let mut state = DownloadState::new(
193 file,
194 if quiet {
195 Bar::default()
196 } else {
197 Bar::new(file_size)
198 },
199 file_size,
200 is_encrypted,
201 can_decrypt,
202 create_cipher_if_needed(&globals, can_decrypt)?,
203 );
204
205 download_response(&mut res, &mut state, &globals).await?;
206 state.finish();
207 Ok(())
208}
209
210async fn download_response(
211 res: &mut reqwest::Response,
212 state: &mut DownloadState,
213 globals: &GlobalArgs,
214) -> Result<()> {
215 while let Some(chunk) = res.chunk().await? {
216 let chunk_len = chunk.len();
217 state.process_chunk(chunk).await?;
218
219 if let Some(bandwidth_kb) = globals.throttle {
220 throttle_download(bandwidth_kb, chunk_len).await?;
221 }
222 }
223
224 Ok(())
225}
226
227impl DownloadState {
228 fn new(
229 file: tokio::fs::File,
230 pb: Bar,
231 file_size: u64,
232 is_encrypted: bool,
233 can_decrypt: bool,
234 cipher: Option<ChaCha20Poly1305>,
235 ) -> Self {
236 Self {
237 file,
238 pb,
239 file_size,
240 downloaded: 0,
241 buffer: BytesMut::new(),
242 decryptor: None,
243 is_encrypted,
244 can_decrypt,
245 cipher,
246 }
247 }
248
249 async fn process_chunk(&mut self, chunk: bytes::Bytes) -> Result<()> {
250 self.downloaded = min(self.downloaded + chunk.len() as u64, self.file_size);
251 self.buffer.extend_from_slice(&chunk);
252
253 if self.can_decrypt {
254 return self.process_encrypted_buffer().await;
255 }
256
257 if self.is_encrypted {
258 return self.write_raw_buffer().await;
259 }
260
261 self.write_raw_buffer().await
262 }
263
264 async fn process_encrypted_buffer(&mut self) -> Result<()> {
265 loop {
266 if self.decryptor.is_none() {
267 if self.buffer.len() < 8 {
268 break;
269 }
270
271 let nonce_len =
272 *self.buffer.first().context("Failed to read nonce length")? as usize;
273 if nonce_len != 7 {
274 return Err(anyhow!("Expected nonce length 7, got {nonce_len}"));
275 }
276
277 let nonce = self.buffer.get(1..8).context("Failed to get nonce bytes")?;
278 let cipher = self
279 .cipher
280 .clone()
281 .context("Cipher not initialized for decryption")?;
282 self.decryptor = Some(DecryptorBE32::from_aead(cipher, nonce.into()));
283 self.buffer.advance(8);
284 continue;
285 }
286
287 if self.buffer.len() < 4 {
288 break;
289 }
290
291 let len_bytes = self
292 .buffer
293 .get(..4)
294 .context("Failed to read chunk length")?;
295 let len = u32::from_be_bytes(
296 len_bytes
297 .try_into()
298 .map_err(|_| anyhow!("Invalid chunk length bytes"))?,
299 ) as usize;
300
301 if self.buffer.len() < 4 + len {
302 break;
303 }
304
305 let mut encrypted_chunk = self
306 .buffer
307 .get(4..4 + len)
308 .context("Failed to read encrypted chunk")?
309 .to_vec();
310
311 self.decryptor
312 .as_mut()
313 .context("Decryptor not initialized")?
314 .decrypt_next_in_place(&[], &mut encrypted_chunk)
315 .map_err(|_| anyhow!("Decryption failed, check your encryption key"))?;
316
317 self.file.write_all(&encrypted_chunk).await?;
318 self.update_progress();
319 self.buffer.advance(4 + len);
320 }
321
322 Ok(())
323 }
324
325 async fn write_raw_buffer(&mut self) -> Result<()> {
326 self.file.write_all(&self.buffer).await?;
327 self.buffer.clear();
328 self.update_progress();
329 Ok(())
330 }
331
332 fn update_progress(&self) {
333 if let Some(pb) = self.pb.progress.as_ref() {
334 pb.set_position(self.downloaded);
335 }
336 }
337
338 fn finish(&self) {
339 if let Some(pb) = self.pb.progress.as_ref() {
340 pb.finish();
341 }
342 }
343}
344
345async fn handle_metadata(s3: &S3, key: &str, version: Option<String>, json: bool) -> Result<()> {
346 let action = actions::HeadObject::new(key, version.clone());
347 let headers = action.request(s3).await?;
348 if json {
349 println!(
350 "{}",
351 serde_json::to_string_pretty(&json_metadata_output(s3, key, version, headers)?)?
352 );
353 return Ok(());
354 }
355
356 let max_key_len = headers
357 .keys()
358 .map(std::string::String::len)
359 .max()
360 .unwrap_or(0)
361 + 1;
362
363 for (k, v) in headers {
364 println!(
365 "{:<width$} {}",
366 format!("{k}:").green(),
367 v,
368 width = max_key_len
369 );
370 }
371
372 Ok(())
373}
374
375async fn handle_versions(s3: &S3, key: &str, json: bool) -> Result<()> {
376 let action = actions::ListObjectVersions::new(key);
377 let result = action.request(s3).await?;
378
379 if json {
380 println!(
381 "{}",
382 serde_json::to_string_pretty(&VersionsJsonOutput {
383 bucket: s3.bucket().map(str::to_string),
384 key_prefix: key.to_string(),
385 versions: result
386 .versions
387 .into_iter()
388 .map(|version| VersionJsonEntry {
389 key: version.key,
390 version_id: version.version_id,
391 is_latest: version.is_latest,
392 last_modified: version.last_modified,
393 size_bytes: version.size,
394 etag: version.e_tag,
395 storage_class: version.storage_class,
396 })
397 .collect(),
398 })?
399 );
400 return Ok(());
401 }
402
403 if result.versions.is_empty() {
404 println!("No versions found for key: {key}");
405 return Ok(());
406 }
407
408 for version in result.versions {
409 let dt = DateTime::parse_from_rfc3339(&version.last_modified)?;
410 let last_modified: DateTime<Utc> = DateTime::from(dt);
411 println!(
412 "{} {:>10} {:<} ID: {}",
413 format!("[{}]", last_modified.format("%F %T %Z")).green(),
414 ByteSize(version.size).to_string().yellow(),
415 if version.is_latest {
416 format!("{} (latest)", version.key)
417 } else {
418 version.key.clone()
419 },
420 version.version_id
421 );
422 }
423
424 Ok(())
425}
426
427fn determine_final_filename(file_name: &OsStr, can_decrypt: bool) -> OsString {
428 if can_decrypt {
429 let file_name_str = file_name.to_string_lossy();
430 if let Some(stripped) = file_name_str.strip_suffix(".enc") {
431 OsString::from(stripped)
432 } else {
433 file_name.to_os_string()
434 }
435 } else {
436 file_name.to_os_string()
437 }
438}
439
440async fn create_output_file(path: &Path, force: bool) -> Result<tokio::fs::File> {
441 let mut options = OpenOptions::new();
442 options.write(true).create(true);
443
444 if force {
445 options.truncate(true);
446 }
447
448 options
449 .open(path)
450 .await
451 .with_context(|| format!("could not open {}", path.display()))
452}
453
454fn create_cipher_if_needed(
455 globals: &GlobalArgs,
456 can_decrypt: bool,
457) -> Result<Option<ChaCha20Poly1305>> {
458 if can_decrypt {
459 let key_bytes = globals
460 .enc_key
461 .as_ref()
462 .context("Encryption key is required but not provided")?
463 .expose_secret()
464 .as_bytes()
465 .into();
466 Ok(Some(ChaCha20Poly1305::new(key_bytes)))
467 } else {
468 Ok(None)
469 }
470}
471
472fn get_dest(dest: Option<String>, file_name: &OsStr) -> Result<PathBuf> {
473 if let Some(d) = dest {
474 let mut path_buf = PathBuf::from(&d);
475
476 if path_buf.is_dir() {
478 path_buf.push(file_name);
479 return Ok(path_buf);
480 }
481
482 if let Some(parent) = path_buf.parent() {
484 if parent.exists() {
485 return Ok(path_buf);
486 } else if path_buf.components().count() > 1 {
487 return Err(anyhow!(
488 "parent directory {} does not exist",
489 parent.display()
490 ));
491 }
492 return Ok(Path::new(".").join(path_buf));
493 }
494 }
495
496 Ok(Path::new(".").join(file_name))
498}
499
500pub fn is_s3m_encrypted(headers: &HeaderMap) -> bool {
503 headers
504 .get(CONTENT_TYPE)
505 .and_then(|v| v.to_str().ok())
506 .is_some_and(|ct| ct.starts_with("application/vnd.s3m.encrypted"))
507}
508
509fn json_metadata_output(
510 s3: &S3,
511 key: &str,
512 version: Option<String>,
513 headers: BTreeMap<String, String>,
514) -> Result<MetadataJsonOutput> {
515 let metadata = headers
516 .iter()
517 .filter_map(|(header, value)| {
518 header
519 .strip_prefix("x-amz-meta-")
520 .map(|metadata_key| (metadata_key.to_string(), value.clone()))
521 })
522 .collect();
523
524 Ok(MetadataJsonOutput {
525 bucket: s3.bucket().map(str::to_string),
526 key: key.to_string(),
527 version_id: version,
528 content_length: headers
529 .get("content-length")
530 .and_then(|value| value.parse::<u64>().ok()),
531 content_type: headers.get("content-type").cloned(),
532 etag: headers.get("etag").cloned(),
533 last_modified: headers
534 .get("last-modified")
535 .map(|value| normalize_json_timestamp(value))
536 .transpose()?,
537 storage_class: headers.get("x-amz-storage-class").cloned(),
538 checksum_crc32: headers.get("x-amz-checksum-crc32").cloned(),
539 checksum_crc32c: headers.get("x-amz-checksum-crc32c").cloned(),
540 checksum_sha1: headers.get("x-amz-checksum-sha1").cloned(),
541 checksum_sha256: headers.get("x-amz-checksum-sha256").cloned(),
542 metadata,
543 headers,
544 })
545}
546
547fn normalize_json_timestamp(value: &str) -> Result<String> {
548 DateTime::parse_from_rfc2822(value)
549 .or_else(|_| DateTime::parse_from_rfc3339(value))
550 .map(|timestamp| timestamp.with_timezone(&Utc).to_rfc3339())
551 .map_err(|error| anyhow!("Failed to parse Last-Modified header '{value}': {error}"))
552}
553
554#[cfg(test)]
555#[allow(
556 clippy::unwrap_used,
557 clippy::expect_used,
558 clippy::panic,
559 clippy::indexing_slicing,
560 clippy::unnecessary_wraps
561)]
562mod tests {
563 use super::*;
564 use crate::s3::{Credentials, Region, S3};
565 use anyhow::Result;
566 use mockito::{Matcher, Server};
567 use secrecy::SecretString;
568
569 struct Test {
570 dest: Option<String>,
571 file_name: &'static OsStr,
572 expected: Option<PathBuf>,
573 error_expected: bool,
574 }
575
576 #[tokio::test]
577 async fn test_get_dest() -> Result<()> {
578 let tests = vec![
579 Test {
580 dest: None,
581 file_name: OsStr::new("key.json"),
582 expected: Some(Path::new(".").join("key.json")),
583 error_expected: false,
584 },
585 Test {
586 dest: Some("./file.txt".to_string()),
587 file_name: OsStr::new("key.json"),
588 expected: Some(Path::new(".").join("file.txt")),
589 error_expected: false,
590 },
591 Test {
592 dest: Some(".".to_string()),
593 file_name: OsStr::new("key.json"),
594 expected: Some(Path::new(".").join("key.json")),
595 error_expected: false,
596 },
597 Test {
598 dest: Some("file.txt".to_string()),
599 file_name: OsStr::new("key.json"),
600 expected: Some(Path::new(".").join("file.txt")),
601 error_expected: false,
602 },
603 Test {
604 dest: Some("/file.txt".to_string()),
605 file_name: OsStr::new("key.json"),
606 expected: Some(Path::new("/").join("file.txt")),
607 error_expected: false,
608 },
609 Test {
610 dest: Some("tmp/file.txt".to_string()),
611 file_name: OsStr::new("key.json"),
612 expected: None,
613 error_expected: true,
614 },
615 Test {
616 dest: Some("a/b/cfile.txt".to_string()),
617 file_name: OsStr::new("key.json"),
618 expected: None,
619 error_expected: true,
620 },
621 ];
622
623 for test in tests {
624 match get_dest(test.dest, test.file_name) {
625 Ok(res) => {
626 if test.error_expected {
627 panic!("Expected an error, but got: {res:?}");
629 } else {
630 assert_eq!(res, test.expected.unwrap());
631 }
632 }
633 Err(_) => {
634 assert!(test.error_expected, "Unexpected error");
636 }
637 }
638 }
639
640 Ok(())
641 }
642
643 #[test]
644 fn test_json_metadata_output_extracts_fields() {
645 let mut headers = BTreeMap::new();
646 headers.insert("content-length".to_string(), "42".to_string());
647 headers.insert("content-type".to_string(), "text/plain".to_string());
648 headers.insert("etag".to_string(), "\"etag\"".to_string());
649 headers.insert(
650 "last-modified".to_string(),
651 "Sat, 14 Mar 2026 08:00:00 GMT".to_string(),
652 );
653 headers.insert("x-amz-meta-owner".to_string(), "alice".to_string());
654 let s3 = S3::new(
655 &crate::s3::Credentials::new("AKIA", &secrecy::SecretString::new("secret".into())),
656 &"us-east-1".parse::<crate::s3::Region>().unwrap(),
657 Some("bucket".to_string()),
658 false,
659 );
660
661 let output =
662 json_metadata_output(&s3, "path/file.txt", Some("v1".to_string()), headers).unwrap();
663 let rendered = serde_json::to_value(output).unwrap();
664
665 assert_eq!(rendered["bucket"], "bucket");
666 assert_eq!(rendered["key"], "path/file.txt");
667 assert_eq!(rendered["version_id"], "v1");
668 assert_eq!(rendered["content_length"], 42);
669 assert_eq!(rendered["last_modified"], "2026-03-14T08:00:00+00:00");
670 assert_eq!(rendered["metadata"]["owner"], "alice");
671 }
672
673 #[test]
674 fn test_json_metadata_output_rejects_invalid_last_modified() {
675 let mut headers = BTreeMap::new();
676 headers.insert("last-modified".to_string(), "not-a-date".to_string());
677 let s3 = S3::new(
678 &crate::s3::Credentials::new("AKIA", &secrecy::SecretString::new("secret".into())),
679 &"us-east-1".parse::<crate::s3::Region>().unwrap(),
680 Some("bucket".to_string()),
681 false,
682 );
683
684 let err = json_metadata_output(&s3, "path/file.txt", None, headers)
685 .unwrap_err()
686 .to_string();
687 assert!(err.contains("Failed to parse Last-Modified header"));
688 }
689
690 fn test_s3(endpoint: String) -> S3 {
691 S3::new(
692 &Credentials::new(
693 "AKIAIOSFODNN7EXAMPLE",
694 &SecretString::new("wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY".into()),
695 ),
696 &Region::custom("us-west-1", endpoint),
697 Some("bucket".to_string()),
698 false,
699 )
700 }
701
702 #[tokio::test]
703 async fn test_handle_metadata_json_branch() {
704 let mut server = Server::new_async().await;
705 let _head = server
706 .mock("HEAD", "/bucket/file.txt")
707 .with_status(200)
708 .with_header("content-length", "42")
709 .with_header("content-type", "text/plain")
710 .with_header("etag", "\"head-etag\"")
711 .with_header("x-amz-meta-owner", "alice")
712 .create_async()
713 .await;
714
715 handle(
716 &test_s3(server.url()),
717 Action::GetObject {
718 dest: None,
719 metadata: true,
720 key: "file.txt".to_string(),
721 quiet: false,
722 force: false,
723 json: true,
724 versions: false,
725 version: None,
726 },
727 GlobalArgs::new(),
728 )
729 .await
730 .unwrap();
731 }
732
733 #[tokio::test]
734 async fn test_handle_versions_json_branch() {
735 let mut server = Server::new_async().await;
736 let _versions = server
737 .mock("GET", "/bucket")
738 .match_query(Matcher::AllOf(vec![
739 Matcher::UrlEncoded("prefix".into(), "prefix".into()),
740 Matcher::UrlEncoded("versions".into(), String::new()),
741 ]))
742 .with_status(200)
743 .with_header("content-type", "application/xml")
744 .with_body(
745 r#"<?xml version="1.0" encoding="UTF-8"?><ListVersionsResult><Name>bucket</Name><Prefix>prefix</Prefix><KeyMarker></KeyMarker><MaxKeys>1000</MaxKeys><IsTruncated>false</IsTruncated><Version><Key>prefix</Key><VersionId>v1</VersionId><IsLatest>true</IsLatest><LastModified>2026-03-14T00:00:00.000Z</LastModified><ETag>"etag"</ETag><Size>5</Size><Owner><ID>owner</ID></Owner><StorageClass>STANDARD</StorageClass></Version></ListVersionsResult>"#,
746 )
747 .create_async()
748 .await;
749
750 handle(
751 &test_s3(server.url()),
752 Action::GetObject {
753 dest: None,
754 metadata: false,
755 key: "prefix".to_string(),
756 quiet: false,
757 force: false,
758 json: true,
759 versions: true,
760 version: None,
761 },
762 GlobalArgs::new(),
763 )
764 .await
765 .unwrap();
766 }
767}