// s3m/cli/actions/object_get.rs — download objects, show metadata, list versions.

1use crate::{
2    cli::{actions::Action, globals::GlobalArgs, progressbar::Bar},
3    s3::{S3, actions, tools::throttle_download},
4};
5use anyhow::{Context, Result, anyhow};
6use bytes::{Buf, BytesMut};
7use bytesize::ByteSize;
8use chacha20poly1305::{ChaCha20Poly1305, KeyInit, aead::stream::DecryptorBE32};
9use chrono::{DateTime, Utc};
10use colored::Colorize;
11use http::{HeaderMap, header::CONTENT_TYPE};
12use secrecy::ExposeSecret;
13use serde::Serialize;
14use std::{
15    cmp::min,
16    collections::BTreeMap,
17    ffi::{OsStr, OsString},
18    path::{Path, PathBuf},
19};
20use tokio::{fs::OpenOptions, io::AsyncWriteExt};
21
/// JSON payload emitted by `get --metadata --json`: well-known response
/// headers are promoted to typed fields, `x-amz-meta-*` pairs land in
/// `metadata` (prefix stripped), and the complete raw header map is kept
/// in `headers` so nothing is lost.
#[derive(Debug, Serialize, PartialEq, Eq)]
struct MetadataJsonOutput {
    bucket: Option<String>,
    key: String,
    version_id: Option<String>,
    // Parsed from the "content-length" header; None if absent or non-numeric.
    content_length: Option<u64>,
    content_type: Option<String>,
    etag: Option<String>,
    // Normalized to RFC 3339 UTC by `normalize_json_timestamp`.
    last_modified: Option<String>,
    storage_class: Option<String>,
    checksum_crc32: Option<String>,
    checksum_crc32c: Option<String>,
    checksum_sha1: Option<String>,
    checksum_sha256: Option<String>,
    // User metadata from `x-amz-meta-*` headers, keyed without the prefix.
    metadata: BTreeMap<String, String>,
    // All response headers as received.
    headers: BTreeMap<String, String>,
}
39
/// JSON payload emitted by `get --versions --json`: the bucket, the key
/// prefix that was queried, and one entry per object version returned.
#[derive(Debug, Serialize, PartialEq, Eq)]
struct VersionsJsonOutput {
    bucket: Option<String>,
    key_prefix: String,
    versions: Vec<VersionJsonEntry>,
}
46
/// A single object version as serialized into `VersionsJsonOutput`,
/// mapped one-to-one from the ListObjectVersions response entry.
#[derive(Debug, Serialize, PartialEq, Eq)]
struct VersionJsonEntry {
    key: String,
    version_id: String,
    is_latest: bool,
    // Timestamp string as returned by the listing (RFC 3339 in practice —
    // see the text output path, which parses it with parse_from_rfc3339).
    last_modified: String,
    size_bytes: u64,
    etag: String,
    storage_class: String,
}
57
/// Mutable state threaded through a streaming download: the destination
/// file, progress bar, byte counters, a carry-over buffer for partial
/// frames, and the optional decryption machinery for s3m-encrypted objects.
struct DownloadState {
    file: tokio::fs::File,
    pb: Bar,
    // Total size reported by Content-Length; used to clamp progress.
    file_size: u64,
    // Bytes received so far (never exceeds `file_size`).
    downloaded: u64,
    // Holds bytes that do not yet form a complete header/frame.
    buffer: BytesMut,
    // Lazily created once the 8-byte nonce header has been read.
    decryptor: Option<DecryptorBE32<ChaCha20Poly1305>>,
    // Object carries the s3m encrypted Content-Type marker.
    is_encrypted: bool,
    // Encrypted AND an encryption key is configured.
    can_decrypt: bool,
    // Present whenever `can_decrypt` is true.
    cipher: Option<ChaCha20Poly1305>,
}
69
/// How metadata/versions results are rendered: colorized text or JSON.
enum OutputFormat {
    Text,
    Json,
}
74
/// The three mutually exclusive modes of `get`, parsed from the CLI flags:
/// show metadata (HEAD), list versions, or download the object body.
enum GetObjectRequest {
    Metadata {
        key: String,
        version: Option<String>,
        output: OutputFormat,
    },
    Versions {
        key: String,
        output: OutputFormat,
    },
    Download {
        key: String,
        version: Option<String>,
        // Optional local destination (file or directory).
        dest: Option<String>,
        // Suppress the progress bar.
        quiet: bool,
        // Overwrite an existing local file.
        force: bool,
    },
}
93
94/// # Errors
95/// Will return an error if the action fails
96pub async fn handle(s3: &S3, action: Action, globals: GlobalArgs) -> Result<()> {
97    if let Action::GetObject {
98        key,
99        metadata,
100        dest,
101        quiet,
102        force,
103        json,
104        versions,
105        version,
106    } = action
107    {
108        let output = if json {
109            OutputFormat::Json
110        } else {
111            OutputFormat::Text
112        };
113
114        let request = if metadata {
115            GetObjectRequest::Metadata {
116                key,
117                version,
118                output,
119            }
120        } else if versions {
121            GetObjectRequest::Versions { key, output }
122        } else {
123            GetObjectRequest::Download {
124                key,
125                version,
126                dest,
127                quiet,
128                force,
129            }
130        };
131
132        return handle_get_action(s3, request, globals).await;
133    }
134
135    Ok(())
136}
137
138async fn handle_get_action(s3: &S3, request: GetObjectRequest, globals: GlobalArgs) -> Result<()> {
139    match request {
140        GetObjectRequest::Metadata {
141            key,
142            version,
143            output,
144        } => handle_metadata(s3, &key, version, matches!(output, OutputFormat::Json)).await,
145        GetObjectRequest::Versions { key, output } => {
146            handle_versions(s3, &key, matches!(output, OutputFormat::Json)).await
147        }
148        GetObjectRequest::Download {
149            key,
150            version,
151            dest,
152            quiet,
153            force,
154        } => download_object(s3, key, version, dest, quiet, force, globals).await,
155    }
156}
157
/// Download object `key` (optionally a specific `version`) to a local file,
/// streaming the body and transparently decrypting s3m-encrypted objects
/// when an encryption key is configured.
///
/// # Errors
/// Fails when the key yields no file name, the destination already exists
/// (without `force`), the output file cannot be created, Content-Length is
/// missing, or the transfer/decryption fails.
async fn download_object(
    s3: &S3,
    key: String,
    version: Option<String>,
    dest: Option<String>,
    quiet: bool,
    force: bool,
    globals: GlobalArgs,
) -> Result<()> {
    // Local file name defaults to the last path component of the key.
    let file_name = Path::new(&key)
        .file_name()
        .with_context(|| format!("Failed to get file name from: {key}"))?;
    let action = actions::GetObject::new(&key, version);
    let mut res = action.request(s3, &globals).await?;
    // Encrypted objects are tagged via a vendor Content-Type; decryption is
    // only possible when a key was provided — otherwise ciphertext is saved.
    let is_encrypted = is_s3m_encrypted(res.headers());
    let can_decrypt = is_encrypted && globals.enc_key.is_some();

    log::info!(
        "file_name: {}, is_encrypted: {}, can_decrypt: {}",
        file_name.to_string_lossy(),
        is_encrypted,
        can_decrypt
    );

    // Drop the ".enc" suffix when plaintext will be written.
    let final_file_name = determine_final_filename(file_name, can_decrypt);
    let path = get_dest(dest, &final_file_name)?;
    if path.is_file() && !force {
        return Err(anyhow!("file {} already exists", path.display()));
    }

    let file = create_output_file(&path, force).await?;
    // NOTE: this is the length of the response body (ciphertext for
    // encrypted objects), so progress tracks bytes downloaded, not written.
    let file_size = res
        .content_length()
        .context("could not get content_length")?;
    let mut state = DownloadState::new(
        file,
        if quiet {
            Bar::default()
        } else {
            Bar::new(file_size)
        },
        file_size,
        is_encrypted,
        can_decrypt,
        create_cipher_if_needed(&globals, can_decrypt)?,
    );

    download_response(&mut res, &mut state, &globals).await?;
    state.finish();
    Ok(())
}
209
210async fn download_response(
211    res: &mut reqwest::Response,
212    state: &mut DownloadState,
213    globals: &GlobalArgs,
214) -> Result<()> {
215    while let Some(chunk) = res.chunk().await? {
216        let chunk_len = chunk.len();
217        state.process_chunk(chunk).await?;
218
219        if let Some(bandwidth_kb) = globals.throttle {
220            throttle_download(bandwidth_kb, chunk_len).await?;
221        }
222    }
223
224    Ok(())
225}
226
227impl DownloadState {
228    fn new(
229        file: tokio::fs::File,
230        pb: Bar,
231        file_size: u64,
232        is_encrypted: bool,
233        can_decrypt: bool,
234        cipher: Option<ChaCha20Poly1305>,
235    ) -> Self {
236        Self {
237            file,
238            pb,
239            file_size,
240            downloaded: 0,
241            buffer: BytesMut::new(),
242            decryptor: None,
243            is_encrypted,
244            can_decrypt,
245            cipher,
246        }
247    }
248
249    async fn process_chunk(&mut self, chunk: bytes::Bytes) -> Result<()> {
250        self.downloaded = min(self.downloaded + chunk.len() as u64, self.file_size);
251        self.buffer.extend_from_slice(&chunk);
252
253        if self.can_decrypt {
254            return self.process_encrypted_buffer().await;
255        }
256
257        if self.is_encrypted {
258            return self.write_raw_buffer().await;
259        }
260
261        self.write_raw_buffer().await
262    }
263
264    async fn process_encrypted_buffer(&mut self) -> Result<()> {
265        loop {
266            if self.decryptor.is_none() {
267                if self.buffer.len() < 8 {
268                    break;
269                }
270
271                let nonce_len =
272                    *self.buffer.first().context("Failed to read nonce length")? as usize;
273                if nonce_len != 7 {
274                    return Err(anyhow!("Expected nonce length 7, got {nonce_len}"));
275                }
276
277                let nonce = self.buffer.get(1..8).context("Failed to get nonce bytes")?;
278                let cipher = self
279                    .cipher
280                    .clone()
281                    .context("Cipher not initialized for decryption")?;
282                self.decryptor = Some(DecryptorBE32::from_aead(cipher, nonce.into()));
283                self.buffer.advance(8);
284                continue;
285            }
286
287            if self.buffer.len() < 4 {
288                break;
289            }
290
291            let len_bytes = self
292                .buffer
293                .get(..4)
294                .context("Failed to read chunk length")?;
295            let len = u32::from_be_bytes(
296                len_bytes
297                    .try_into()
298                    .map_err(|_| anyhow!("Invalid chunk length bytes"))?,
299            ) as usize;
300
301            if self.buffer.len() < 4 + len {
302                break;
303            }
304
305            let mut encrypted_chunk = self
306                .buffer
307                .get(4..4 + len)
308                .context("Failed to read encrypted chunk")?
309                .to_vec();
310
311            self.decryptor
312                .as_mut()
313                .context("Decryptor not initialized")?
314                .decrypt_next_in_place(&[], &mut encrypted_chunk)
315                .map_err(|_| anyhow!("Decryption failed, check your encryption key"))?;
316
317            self.file.write_all(&encrypted_chunk).await?;
318            self.update_progress();
319            self.buffer.advance(4 + len);
320        }
321
322        Ok(())
323    }
324
325    async fn write_raw_buffer(&mut self) -> Result<()> {
326        self.file.write_all(&self.buffer).await?;
327        self.buffer.clear();
328        self.update_progress();
329        Ok(())
330    }
331
332    fn update_progress(&self) {
333        if let Some(pb) = self.pb.progress.as_ref() {
334            pb.set_position(self.downloaded);
335        }
336    }
337
338    fn finish(&self) {
339        if let Some(pb) = self.pb.progress.as_ref() {
340            pb.finish();
341        }
342    }
343}
344
345async fn handle_metadata(s3: &S3, key: &str, version: Option<String>, json: bool) -> Result<()> {
346    let action = actions::HeadObject::new(key, version.clone());
347    let headers = action.request(s3).await?;
348    if json {
349        println!(
350            "{}",
351            serde_json::to_string_pretty(&json_metadata_output(s3, key, version, headers)?)?
352        );
353        return Ok(());
354    }
355
356    let max_key_len = headers
357        .keys()
358        .map(std::string::String::len)
359        .max()
360        .unwrap_or(0)
361        + 1;
362
363    for (k, v) in headers {
364        println!(
365            "{:<width$} {}",
366            format!("{k}:").green(),
367            v,
368            width = max_key_len
369        );
370    }
371
372    Ok(())
373}
374
375async fn handle_versions(s3: &S3, key: &str, json: bool) -> Result<()> {
376    let action = actions::ListObjectVersions::new(key);
377    let result = action.request(s3).await?;
378
379    if json {
380        println!(
381            "{}",
382            serde_json::to_string_pretty(&VersionsJsonOutput {
383                bucket: s3.bucket().map(str::to_string),
384                key_prefix: key.to_string(),
385                versions: result
386                    .versions
387                    .into_iter()
388                    .map(|version| VersionJsonEntry {
389                        key: version.key,
390                        version_id: version.version_id,
391                        is_latest: version.is_latest,
392                        last_modified: version.last_modified,
393                        size_bytes: version.size,
394                        etag: version.e_tag,
395                        storage_class: version.storage_class,
396                    })
397                    .collect(),
398            })?
399        );
400        return Ok(());
401    }
402
403    if result.versions.is_empty() {
404        println!("No versions found for key: {key}");
405        return Ok(());
406    }
407
408    for version in result.versions {
409        let dt = DateTime::parse_from_rfc3339(&version.last_modified)?;
410        let last_modified: DateTime<Utc> = DateTime::from(dt);
411        println!(
412            "{} {:>10} {:<} ID: {}",
413            format!("[{}]", last_modified.format("%F %T %Z")).green(),
414            ByteSize(version.size).to_string().yellow(),
415            if version.is_latest {
416                format!("{} (latest)", version.key)
417            } else {
418                version.key.clone()
419            },
420            version.version_id
421        );
422    }
423
424    Ok(())
425}
426
/// When the object will be decrypted locally, drop a trailing `.enc`
/// extension from the stored name; otherwise return the name unchanged.
fn determine_final_filename(file_name: &OsStr, can_decrypt: bool) -> OsString {
    if can_decrypt {
        // Lossy conversion: names are expected to be valid UTF-8 here.
        if let Some(stripped) = file_name.to_string_lossy().strip_suffix(".enc") {
            return OsString::from(stripped);
        }
    }
    file_name.to_os_string()
}
439
440async fn create_output_file(path: &Path, force: bool) -> Result<tokio::fs::File> {
441    let mut options = OpenOptions::new();
442    options.write(true).create(true);
443
444    if force {
445        options.truncate(true);
446    }
447
448    options
449        .open(path)
450        .await
451        .with_context(|| format!("could not open {}", path.display()))
452}
453
/// Build a `ChaCha20Poly1305` cipher from the globally configured key when
/// decryption was requested; returns `Ok(None)` otherwise.
///
/// # Errors
/// Fails if `can_decrypt` is true but no encryption key is configured.
fn create_cipher_if_needed(
    globals: &GlobalArgs,
    can_decrypt: bool,
) -> Result<Option<ChaCha20Poly1305>> {
    if can_decrypt {
        // NOTE(review): `.into()` converts the `&[u8]` secret into the
        // cipher's fixed-size key reference and will panic at runtime if the
        // secret is not exactly the key length (32 bytes) — presumably the
        // key is validated upstream; confirm.
        let key_bytes = globals
            .enc_key
            .as_ref()
            .context("Encryption key is required but not provided")?
            .expose_secret()
            .as_bytes()
            .into();
        Ok(Some(ChaCha20Poly1305::new(key_bytes)))
    } else {
        Ok(None)
    }
}
471
472fn get_dest(dest: Option<String>, file_name: &OsStr) -> Result<PathBuf> {
473    if let Some(d) = dest {
474        let mut path_buf = PathBuf::from(&d);
475
476        // Check if the provided path is a directory
477        if path_buf.is_dir() {
478            path_buf.push(file_name);
479            return Ok(path_buf);
480        }
481
482        // If it's a file, check if the parent directory exists
483        if let Some(parent) = path_buf.parent() {
484            if parent.exists() {
485                return Ok(path_buf);
486            } else if path_buf.components().count() > 1 {
487                return Err(anyhow!(
488                    "parent directory {} does not exist",
489                    parent.display()
490                ));
491            }
492            return Ok(Path::new(".").join(path_buf));
493        }
494    }
495
496    // Use default path if dest is None
497    Ok(Path::new(".").join(file_name))
498}
499
500/// Returns `true` if the Content-Type is `application/vnd.s3m.encrypted`
501/// or starts with that (e.g., `application/vnd.s3m.encrypted`)
502pub fn is_s3m_encrypted(headers: &HeaderMap) -> bool {
503    headers
504        .get(CONTENT_TYPE)
505        .and_then(|v| v.to_str().ok())
506        .is_some_and(|ct| ct.starts_with("application/vnd.s3m.encrypted"))
507}
508
509fn json_metadata_output(
510    s3: &S3,
511    key: &str,
512    version: Option<String>,
513    headers: BTreeMap<String, String>,
514) -> Result<MetadataJsonOutput> {
515    let metadata = headers
516        .iter()
517        .filter_map(|(header, value)| {
518            header
519                .strip_prefix("x-amz-meta-")
520                .map(|metadata_key| (metadata_key.to_string(), value.clone()))
521        })
522        .collect();
523
524    Ok(MetadataJsonOutput {
525        bucket: s3.bucket().map(str::to_string),
526        key: key.to_string(),
527        version_id: version,
528        content_length: headers
529            .get("content-length")
530            .and_then(|value| value.parse::<u64>().ok()),
531        content_type: headers.get("content-type").cloned(),
532        etag: headers.get("etag").cloned(),
533        last_modified: headers
534            .get("last-modified")
535            .map(|value| normalize_json_timestamp(value))
536            .transpose()?,
537        storage_class: headers.get("x-amz-storage-class").cloned(),
538        checksum_crc32: headers.get("x-amz-checksum-crc32").cloned(),
539        checksum_crc32c: headers.get("x-amz-checksum-crc32c").cloned(),
540        checksum_sha1: headers.get("x-amz-checksum-sha1").cloned(),
541        checksum_sha256: headers.get("x-amz-checksum-sha256").cloned(),
542        metadata,
543        headers,
544    })
545}
546
547fn normalize_json_timestamp(value: &str) -> Result<String> {
548    DateTime::parse_from_rfc2822(value)
549        .or_else(|_| DateTime::parse_from_rfc3339(value))
550        .map(|timestamp| timestamp.with_timezone(&Utc).to_rfc3339())
551        .map_err(|error| anyhow!("Failed to parse Last-Modified header '{value}': {error}"))
552}
553
#[cfg(test)]
#[allow(
    clippy::unwrap_used,
    clippy::expect_used,
    clippy::panic,
    clippy::indexing_slicing,
    clippy::unnecessary_wraps
)]
mod tests {
    use super::*;
    use crate::s3::{Credentials, Region, S3};
    use anyhow::Result;
    use mockito::{Matcher, Server};
    use secrecy::SecretString;

    /// One table-driven case for `get_dest`.
    struct Test {
        // Optional --dest argument as the user would pass it.
        dest: Option<String>,
        // File name derived from the object key.
        file_name: &'static OsStr,
        // Expected resolved path; None when an error is expected instead.
        expected: Option<PathBuf>,
        error_expected: bool,
    }

    // Covers the default path, explicit file, directory dest, absolute
    // path, and missing-parent-directory error cases of `get_dest`.
    #[tokio::test]
    async fn test_get_dest() -> Result<()> {
        let tests = vec![
            Test {
                dest: None,
                file_name: OsStr::new("key.json"),
                expected: Some(Path::new(".").join("key.json")),
                error_expected: false,
            },
            Test {
                dest: Some("./file.txt".to_string()),
                file_name: OsStr::new("key.json"),
                expected: Some(Path::new(".").join("file.txt")),
                error_expected: false,
            },
            Test {
                dest: Some(".".to_string()),
                file_name: OsStr::new("key.json"),
                expected: Some(Path::new(".").join("key.json")),
                error_expected: false,
            },
            Test {
                dest: Some("file.txt".to_string()),
                file_name: OsStr::new("key.json"),
                expected: Some(Path::new(".").join("file.txt")),
                error_expected: false,
            },
            Test {
                dest: Some("/file.txt".to_string()),
                file_name: OsStr::new("key.json"),
                expected: Some(Path::new("/").join("file.txt")),
                error_expected: false,
            },
            Test {
                dest: Some("tmp/file.txt".to_string()),
                file_name: OsStr::new("key.json"),
                expected: None,
                error_expected: true,
            },
            Test {
                dest: Some("a/b/cfile.txt".to_string()),
                file_name: OsStr::new("key.json"),
                expected: None,
                error_expected: true,
            },
        ];

        for test in tests {
            match get_dest(test.dest, test.file_name) {
                Ok(res) => {
                    if test.error_expected {
                        // An error was expected but the call succeeded.
                        panic!("Expected an error, but got: {res:?}");
                    } else {
                        assert_eq!(res, test.expected.unwrap());
                    }
                }
                Err(_) => {
                    // The call failed; that is only OK when this case
                    // expected an error.
                    assert!(test.error_expected, "Unexpected error");
                }
            }
        }

        Ok(())
    }

    // Verifies field promotion: typed fields, x-amz-meta-* extraction, and
    // RFC 2822 -> RFC 3339 normalization of Last-Modified.
    #[test]
    fn test_json_metadata_output_extracts_fields() {
        let mut headers = BTreeMap::new();
        headers.insert("content-length".to_string(), "42".to_string());
        headers.insert("content-type".to_string(), "text/plain".to_string());
        headers.insert("etag".to_string(), "\"etag\"".to_string());
        headers.insert(
            "last-modified".to_string(),
            "Sat, 14 Mar 2026 08:00:00 GMT".to_string(),
        );
        headers.insert("x-amz-meta-owner".to_string(), "alice".to_string());
        let s3 = S3::new(
            &crate::s3::Credentials::new("AKIA", &secrecy::SecretString::new("secret".into())),
            &"us-east-1".parse::<crate::s3::Region>().unwrap(),
            Some("bucket".to_string()),
            false,
        );

        let output =
            json_metadata_output(&s3, "path/file.txt", Some("v1".to_string()), headers).unwrap();
        let rendered = serde_json::to_value(output).unwrap();

        assert_eq!(rendered["bucket"], "bucket");
        assert_eq!(rendered["key"], "path/file.txt");
        assert_eq!(rendered["version_id"], "v1");
        assert_eq!(rendered["content_length"], 42);
        assert_eq!(rendered["last_modified"], "2026-03-14T08:00:00+00:00");
        assert_eq!(rendered["metadata"]["owner"], "alice");
    }

    // An unparseable Last-Modified header must surface as an error.
    #[test]
    fn test_json_metadata_output_rejects_invalid_last_modified() {
        let mut headers = BTreeMap::new();
        headers.insert("last-modified".to_string(), "not-a-date".to_string());
        let s3 = S3::new(
            &crate::s3::Credentials::new("AKIA", &secrecy::SecretString::new("secret".into())),
            &"us-east-1".parse::<crate::s3::Region>().unwrap(),
            Some("bucket".to_string()),
            false,
        );

        let err = json_metadata_output(&s3, "path/file.txt", None, headers)
            .unwrap_err()
            .to_string();
        assert!(err.contains("Failed to parse Last-Modified header"));
    }

    // Builds an S3 client pointed at a local mockito server.
    fn test_s3(endpoint: String) -> S3 {
        S3::new(
            &Credentials::new(
                "AKIAIOSFODNN7EXAMPLE",
                &SecretString::new("wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY".into()),
            ),
            &Region::custom("us-west-1", endpoint),
            Some("bucket".to_string()),
            false,
        )
    }

    // End-to-end through `handle`: --metadata --json issues a HEAD request
    // and prints the JSON payload without error.
    #[tokio::test]
    async fn test_handle_metadata_json_branch() {
        let mut server = Server::new_async().await;
        let _head = server
            .mock("HEAD", "/bucket/file.txt")
            .with_status(200)
            .with_header("content-length", "42")
            .with_header("content-type", "text/plain")
            .with_header("etag", "\"head-etag\"")
            .with_header("x-amz-meta-owner", "alice")
            .create_async()
            .await;

        handle(
            &test_s3(server.url()),
            Action::GetObject {
                dest: None,
                metadata: true,
                key: "file.txt".to_string(),
                quiet: false,
                force: false,
                json: true,
                versions: false,
                version: None,
            },
            GlobalArgs::new(),
        )
        .await
        .unwrap();
    }

    // End-to-end through `handle`: --versions --json lists versions from a
    // mocked ListObjectVersions XML response without error.
    #[tokio::test]
    async fn test_handle_versions_json_branch() {
        let mut server = Server::new_async().await;
        let _versions = server
            .mock("GET", "/bucket")
            .match_query(Matcher::AllOf(vec![
                Matcher::UrlEncoded("prefix".into(), "prefix".into()),
                Matcher::UrlEncoded("versions".into(), String::new()),
            ]))
            .with_status(200)
            .with_header("content-type", "application/xml")
            .with_body(
                r#"<?xml version="1.0" encoding="UTF-8"?><ListVersionsResult><Name>bucket</Name><Prefix>prefix</Prefix><KeyMarker></KeyMarker><MaxKeys>1000</MaxKeys><IsTruncated>false</IsTruncated><Version><Key>prefix</Key><VersionId>v1</VersionId><IsLatest>true</IsLatest><LastModified>2026-03-14T00:00:00.000Z</LastModified><ETag>"etag"</ETag><Size>5</Size><Owner><ID>owner</ID></Owner><StorageClass>STANDARD</StorageClass></Version></ListVersionsResult>"#,
            )
            .create_async()
            .await;

        handle(
            &test_s3(server.url()),
            Action::GetObject {
                dest: None,
                metadata: false,
                key: "prefix".to_string(),
                quiet: false,
                force: false,
                json: true,
                versions: true,
                version: None,
            },
            GlobalArgs::new(),
        )
        .await
        .unwrap();
    }
}