1use std::path::{Path, PathBuf};
4
5use async_trait::async_trait;
6use aws_sdk_s3::Client;
7use chrono::{DateTime, NaiveDateTime, Utc};
8use thiserror::Error as ErrorTrait;
9
10use crate::{FileEntry, FileSource};
11
12#[derive(Debug, ErrorTrait)]
14pub enum S3Error {
15 #[error("One of the objects returned does not have a key")]
16 ObjectMissingKey,
17
18 #[error("One of the objects returned has an incorrect prefix")]
19 ObjectWrongPrefix,
20
21 #[error(transparent)]
22 ByteStreamError(#[from] aws_sdk_s3::primitives::ByteStreamError),
23
24 #[error(transparent)]
25 S3Error(#[from] aws_sdk_s3::Error),
26}
27
28pub struct S3Files {
32 client: Client,
33 bucket: String,
34 prefix: PathBuf,
35 use_etag_as_hash: bool,
36}
37
38impl S3Files {
39 pub fn new<S: AsRef<str>, P: AsRef<Path>>(
44 client: Client,
45 bucket: S,
46 prefix: P,
47 use_etag_as_hash: bool,
48 ) -> Self {
49 S3Files {
50 client,
51 bucket: bucket.as_ref().to_owned(),
52 prefix: prefix.as_ref().to_owned(),
53 use_etag_as_hash,
54 }
55 }
56}
57
58#[async_trait]
59impl FileSource for S3Files {
60 type Error = S3Error;
61
62 async fn list_files(&mut self) -> Result<Vec<FileEntry>, Self::Error> {
63 let empty_path: PathBuf = PathBuf::new();
64
65 let response = self
66 .client
67 .list_objects_v2()
68 .bucket(self.bucket.clone())
69 .prefix(self.prefix.display().to_string())
70 .send()
71 .await
72 .map_err(aws_sdk_s3::Error::from)?;
73
74 let mut files = vec![];
75
76 if let Some(contents) = response.contents {
77 for object in contents {
78 let key: PathBuf = object
79 .key
80 .as_ref()
81 .map(PathBuf::from)
82 .ok_or(S3Error::ObjectMissingKey)?
83 .strip_prefix(&self.prefix)
84 .map_err(|_| S3Error::ObjectWrongPrefix)?
85 .to_owned();
86
87 if key != empty_path {
88 let modified = object.last_modified.and_then(|date_time| {
89 NaiveDateTime::from_timestamp_opt(
90 date_time.secs(),
91 date_time.subsec_nanos(),
92 )
93 .map(|x| x.and_utc())
94 });
95
96 let md5_hash = match self.use_etag_as_hash {
97 true => object.e_tag.and_then(|etag| {
98 let digest: Option<u128> =
99 u128::from_str_radix(etag.trim_matches('"'), 16).ok();
100
101 digest
102 }),
103 false => None,
104 };
105
106 files.push(FileEntry {
107 path: key,
108 size: u64::try_from(object.size).ok(),
109 modified,
110 md5_hash,
111 });
112 }
113 }
114 }
115
116 Ok(files)
117 }
118
119 async fn read_file<P: AsRef<Path> + Send>(&mut self, path: P) -> Result<Vec<u8>, Self::Error> {
120 let mut key = self.prefix.clone();
121 key.push(path.as_ref());
122 let key = key.display().to_string();
123
124 let output = self
125 .client
126 .get_object()
127 .bucket(self.bucket.clone())
128 .key(key)
129 .send()
130 .await
131 .map_err(aws_sdk_s3::Error::from)?;
132
133 let stream = output.body.collect().await?.to_vec();
134
135 Ok(stream)
136 }
137
138 async fn write_file<P: AsRef<Path> + Send>(
139 &mut self,
140 path: P,
141 bytes: &[u8],
142 ) -> Result<(), Self::Error> {
143 let mut key = self.prefix.clone();
144 key.push(path.as_ref());
145 let key = key.display().to_string();
146
147 let stream = aws_sdk_s3::primitives::ByteStream::from(bytes.to_owned());
148
149 self.client
150 .put_object()
151 .bucket(self.bucket.clone())
152 .key(key)
153 .body(stream)
154 .send()
155 .await
156 .map_err(aws_sdk_s3::Error::from)?;
157
158 Ok(())
159 }
160
161 async fn set_modified<P: AsRef<Path> + Send>(
162 &mut self,
163 _path: P,
164 _modified: Option<DateTime<Utc>>,
165 ) -> Result<bool, Self::Error> {
166 Ok(false)
167 }
168}