s3sync/pipeline/
versioning_info_collector.rs1use std::collections::HashMap;
2
3use anyhow::{Context, Result};
4use aws_sdk_s3::operation::head_object::HeadObjectOutput;
5use aws_sdk_s3::types::ObjectVersion;
6use aws_smithy_types::DateTime;
7use aws_smithy_types_convert::date_time::DateTimeExt;
8use tracing::debug;
9
10use crate::storage::Storage;
11use crate::types::SyncStatistics::SyncSkip;
12use crate::types::event_callback::{EventData, EventType};
13use crate::types::{ObjectVersions, S3SYNC_ORIGIN_VERSION_ID_METADATA_KEY, S3syncObject};
14use crate::{Config, types};
15
16pub struct VersioningInfoCollector {
17 config: Config,
18 worker_index: u16,
19 target: Storage,
20}
21
22type HeadObjectOutputMap = HashMap<String, HeadObjectOutput>;
23
24impl VersioningInfoCollector {
25 pub fn new(config: Config, target: Storage, worker_index: u16) -> Self {
26 Self {
27 config,
28 target,
29 worker_index,
30 }
31 }
32
33 pub async fn collect_object_versions_to_sync(
34 &self,
35 source_packed_object_versions: &S3syncObject,
36 ) -> Result<ObjectVersions> {
37 let source_object_versions = types::unpack_object_versions(source_packed_object_versions);
38
39 if self.config.point_in_time.is_some() {
41 let mut object_versions_to_sync = ObjectVersions::new();
42 for source_object in source_object_versions {
43 object_versions_to_sync.push(source_object);
44 }
45
46 return Ok(object_versions_to_sync);
47 }
48
49 let key = source_packed_object_versions.key();
50 let target_object_versions = self
51 .target
52 .get_object_versions(key, self.config.max_keys)
53 .await?;
54
55 let target_latest_version_deleted = is_latest_version_deleted(&target_object_versions);
56
57 let target_head_object_output_map = self
58 .build_head_object_output_map(&target_object_versions)
59 .await?;
60
61 let mut object_versions_to_sync = ObjectVersions::new();
62 for source_object in source_object_versions {
63 if let S3syncObject::DeleteMarker(marker) = &source_object {
64 if !target_latest_version_deleted || !object_versions_to_sync.is_empty() {
66 object_versions_to_sync.push(source_object);
67 } else {
68 let source_version_id = marker.version_id().unwrap();
69 let source_last_modified =
70 DateTime::from_millis(source_object.last_modified().to_millis()?)
71 .to_chrono_utc()?
72 .to_rfc3339();
73 debug!(
74 worker_index = self.worker_index,
75 key = key,
76 source_version_id = source_version_id,
77 source_last_modified = source_last_modified,
78 "latest version in the target storage has already been deleted."
79 );
80 }
81 continue;
82 }
83
84 let source_version_id = source_object.version_id().unwrap();
85 if does_not_contain_version_id(&target_head_object_output_map, source_version_id) {
86 object_versions_to_sync.push(source_object);
87 } else {
88 let source_last_modified =
89 DateTime::from_millis(source_object.last_modified().to_millis()?)
90 .to_chrono_utc()?
91 .to_rfc3339();
92
93 debug!(
94 worker_index = self.worker_index,
95 key = key,
96 source_version_id = source_version_id,
97 source_last_modified = source_last_modified,
98 "already synced."
99 );
100
101 let target_head_object_output = target_head_object_output_map
102 .get(source_version_id)
103 .expect("The version id must exist in the map.");
104
105 let mut event_data = EventData::new(EventType::SYNC_FILTERED);
106 event_data.key = Some(key.to_string());
107 event_data.source_version_id = source_object.version_id().map(|v| v.to_string());
109 event_data.source_last_modified = Some(*source_object.last_modified());
110 event_data.source_size = Some(source_object.size() as u64);
111 event_data.source_etag = source_object.e_tag().map(|s| s.to_string());
113 event_data.target_version_id = target_head_object_output.version_id.clone();
115 event_data.target_last_modified = target_head_object_output.last_modified;
116 event_data.target_size = target_head_object_output.content_length.map(|c| c as u64);
118 event_data.target_etag = target_head_object_output.e_tag.clone();
120 event_data.message =
121 Some("Object filtered. This version already synced.".to_string());
122 self.config.event_manager.trigger_event(event_data).await;
123
124 let _ = self
125 .target
126 .get_stats_sender()
127 .send(SyncSkip {
128 key: key.to_string(),
129 })
130 .await;
131 }
132 }
133
134 Ok(object_versions_to_sync)
135 }
136
137 async fn build_head_object_output_map(
138 &self,
139 target_object_versions: &Vec<ObjectVersion>,
140 ) -> Result<HeadObjectOutputMap> {
141 let mut head_object_output_map = HashMap::new();
142 for object in target_object_versions {
143 let target_version_id = object.version_id().unwrap().to_string();
144
145 let head_object_output = self
146 .target
147 .head_object(
148 object.key().unwrap(),
149 Some(target_version_id),
150 None,
151 None,
152 self.config.target_sse_c.clone(),
153 self.config.target_sse_c_key.clone(),
154 self.config.target_sse_c_key_md5.clone(),
155 )
156 .await
157 .context("head_object() failed.")?;
158 if let Some(metadata) = head_object_output.metadata() {
159 if let Some(source_version_id) = metadata.get(S3SYNC_ORIGIN_VERSION_ID_METADATA_KEY)
160 {
161 head_object_output_map
162 .insert(source_version_id.to_string(), head_object_output);
163 }
164 }
165 }
166
167 Ok(head_object_output_map)
168 }
169}
170
171fn is_latest_version_deleted(object_versions: &Vec<ObjectVersion>) -> bool {
172 for object in object_versions {
173 if object.is_latest().unwrap() {
174 return false;
175 }
176 }
177 true
178}
179
/// Returns `true` if `head_object_output_map` has no entry keyed by `version_id`.
///
/// Generic over the map's value type so it accepts a `HeadObjectOutputMap` as well as any
/// other `HashMap` keyed by version id.
fn does_not_contain_version_id<V>(
    head_object_output_map: &HashMap<String, V>,
    version_id: &str,
) -> bool {
    !head_object_output_map.contains_key(version_id)
}
186
#[cfg(test)]
mod tests {
    use super::*;
    use aws_sdk_s3::primitives::DateTime;
    use tracing_subscriber::EnvFilter;

    #[test]
    fn is_latest_version_deleted_true() {
        init_dummy_tracing_subscriber();

        let object_versions = vec![
            ObjectVersion::builder().is_latest(false).build(),
            ObjectVersion::builder().is_latest(false).build(),
            ObjectVersion::builder().is_latest(false).build(),
            ObjectVersion::builder().is_latest(false).build(),
        ];

        assert!(is_latest_version_deleted(&object_versions));
    }

    #[test]
    fn is_latest_version_deleted_false() {
        init_dummy_tracing_subscriber();

        let object_versions = vec![
            ObjectVersion::builder().is_latest(true).build(),
            ObjectVersion::builder().is_latest(false).build(),
            ObjectVersion::builder().is_latest(false).build(),
            ObjectVersion::builder().is_latest(false).build(),
        ];

        assert!(!is_latest_version_deleted(&object_versions));
    }

    #[test]
    fn is_latest_version_deleted_empty() {
        init_dummy_tracing_subscriber();

        // A key with no object versions in the target counts as "deleted".
        let object_versions: Vec<ObjectVersion> = Vec::new();

        assert!(is_latest_version_deleted(&object_versions));
    }

    #[test]
    fn does_not_contain_version_id_true() {
        init_dummy_tracing_subscriber();

        let object_versions_map = head_object_output_map_fixture();

        for version_id in ["version4", "version5", "version6"] {
            assert!(does_not_contain_version_id(&object_versions_map, version_id));
        }
    }

    #[test]
    fn does_not_contain_version_id_false() {
        init_dummy_tracing_subscriber();

        let object_versions_map = head_object_output_map_fixture();

        for version_id in ["version1", "version2", "version3"] {
            assert!(!does_not_contain_version_id(&object_versions_map, version_id));
        }
    }

    #[test]
    fn does_not_contain_version_id_empty_map() {
        init_dummy_tracing_subscriber();

        let object_versions_map = HeadObjectOutputMap::new();

        assert!(does_not_contain_version_id(&object_versions_map, "version1"));
    }

    /// Builds a map with entries for `version1`..`version3`, each keyed by its own version id.
    fn head_object_output_map_fixture() -> HeadObjectOutputMap {
        [("version1", 10), ("version2", 11), ("version3", 11)]
            .into_iter()
            .map(|(version_id, last_modified_secs)| {
                (
                    version_id.to_string(),
                    HeadObjectOutput::builder()
                        .version_id(version_id)
                        .last_modified(DateTime::from_secs(last_modified_secs))
                        .build(),
                )
            })
            .collect()
    }

    fn init_dummy_tracing_subscriber() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter(
                EnvFilter::try_from_default_env()
                    .or_else(|_| EnvFilter::try_new("dummy=trace"))
                    .unwrap(),
            )
            .try_init();
    }
}