s3sync/pipeline/
versioning_info_collector.rs

1use std::collections::HashMap;
2
3use anyhow::{Context, Result};
4use aws_sdk_s3::operation::head_object::HeadObjectOutput;
5use aws_sdk_s3::types::ObjectVersion;
6use aws_smithy_types::DateTime;
7use aws_smithy_types_convert::date_time::DateTimeExt;
8use tracing::debug;
9
10use crate::storage::Storage;
11use crate::types::SyncStatistics::SyncSkip;
12use crate::types::event_callback::{EventData, EventType};
13use crate::types::{ObjectVersions, S3SYNC_ORIGIN_VERSION_ID_METADATA_KEY, S3syncObject};
14use crate::{Config, types};
15
16pub struct VersioningInfoCollector {
17    config: Config,
18    worker_index: u16,
19    target: Storage,
20}
21
22type HeadObjectOutputMap = HashMap<String, HeadObjectOutput>;
23
24impl VersioningInfoCollector {
25    pub fn new(config: Config, target: Storage, worker_index: u16) -> Self {
26        Self {
27            config,
28            target,
29            worker_index,
30        }
31    }
32
33    pub async fn collect_object_versions_to_sync(
34        &self,
35        source_packed_object_versions: &S3syncObject,
36    ) -> Result<ObjectVersions> {
37        let source_object_versions = types::unpack_object_versions(source_packed_object_versions);
38
39        // If point-in-time is set, we do not need to check the target storage.
40        if self.config.point_in_time.is_some() {
41            let mut object_versions_to_sync = ObjectVersions::new();
42            for source_object in source_object_versions {
43                object_versions_to_sync.push(source_object);
44            }
45
46            return Ok(object_versions_to_sync);
47        }
48
49        let key = source_packed_object_versions.key();
50        let target_object_versions = self
51            .target
52            .get_object_versions(key, self.config.max_keys)
53            .await?;
54
55        let target_latest_version_deleted = is_latest_version_deleted(&target_object_versions);
56
57        let target_head_object_output_map = self
58            .build_head_object_output_map(&target_object_versions)
59            .await?;
60
61        let mut object_versions_to_sync = ObjectVersions::new();
62        for source_object in source_object_versions {
63            if let S3syncObject::DeleteMarker(marker) = &source_object {
64                // delete marker is always at the end of the array
65                if !target_latest_version_deleted || !object_versions_to_sync.is_empty() {
66                    object_versions_to_sync.push(source_object);
67                } else {
68                    let source_version_id = marker.version_id().unwrap();
69                    let source_last_modified =
70                        DateTime::from_millis(source_object.last_modified().to_millis()?)
71                            .to_chrono_utc()?
72                            .to_rfc3339();
73                    debug!(
74                        worker_index = self.worker_index,
75                        key = key,
76                        source_version_id = source_version_id,
77                        source_last_modified = source_last_modified,
78                        "latest version in the target storage has already been deleted."
79                    );
80                }
81                continue;
82            }
83
84            let source_version_id = source_object.version_id().unwrap();
85            if does_not_contain_version_id(&target_head_object_output_map, source_version_id) {
86                object_versions_to_sync.push(source_object);
87            } else {
88                let source_last_modified =
89                    DateTime::from_millis(source_object.last_modified().to_millis()?)
90                        .to_chrono_utc()?
91                        .to_rfc3339();
92
93                debug!(
94                    worker_index = self.worker_index,
95                    key = key,
96                    source_version_id = source_version_id,
97                    source_last_modified = source_last_modified,
98                    "already synced."
99                );
100
101                let target_head_object_output = target_head_object_output_map
102                    .get(source_version_id)
103                    .expect("The version id must exist in the map.");
104
105                let mut event_data = EventData::new(EventType::SYNC_FILTERED);
106                event_data.key = Some(key.to_string());
107                // skipcq: RS-W1070
108                event_data.source_version_id = source_object.version_id().map(|v| v.to_string());
109                event_data.source_last_modified = Some(*source_object.last_modified());
110                event_data.source_size = Some(source_object.size() as u64);
111                // skipcq: RS-W1070
112                event_data.source_etag = source_object.e_tag().map(|s| s.to_string());
113                // skipcq: RS-W1070
114                event_data.target_version_id = target_head_object_output.version_id.clone();
115                event_data.target_last_modified = target_head_object_output.last_modified;
116                // skipcq: RS-W1070
117                event_data.target_size = target_head_object_output.content_length.map(|c| c as u64);
118                // skipcq: RS-W1070
119                event_data.target_etag = target_head_object_output.e_tag.clone();
120                event_data.message =
121                    Some("Object filtered. This version already synced.".to_string());
122                self.config.event_manager.trigger_event(event_data).await;
123
124                let _ = self
125                    .target
126                    .get_stats_sender()
127                    .send(SyncSkip {
128                        key: key.to_string(),
129                    })
130                    .await;
131            }
132        }
133
134        Ok(object_versions_to_sync)
135    }
136
137    async fn build_head_object_output_map(
138        &self,
139        target_object_versions: &Vec<ObjectVersion>,
140    ) -> Result<HeadObjectOutputMap> {
141        let mut head_object_output_map = HashMap::new();
142        for object in target_object_versions {
143            let target_version_id = object.version_id().unwrap().to_string();
144
145            let head_object_output = self
146                .target
147                .head_object(
148                    object.key().unwrap(),
149                    Some(target_version_id),
150                    None,
151                    None,
152                    self.config.target_sse_c.clone(),
153                    self.config.target_sse_c_key.clone(),
154                    self.config.target_sse_c_key_md5.clone(),
155                )
156                .await
157                .context("head_object() failed.")?;
158            if let Some(metadata) = head_object_output.metadata() {
159                if let Some(source_version_id) = metadata.get(S3SYNC_ORIGIN_VERSION_ID_METADATA_KEY)
160                {
161                    head_object_output_map
162                        .insert(source_version_id.to_string(), head_object_output);
163                }
164            }
165        }
166
167        Ok(head_object_output_map)
168    }
169}
170
171fn is_latest_version_deleted(object_versions: &Vec<ObjectVersion>) -> bool {
172    for object in object_versions {
173        if object.is_latest().unwrap() {
174            return false;
175        }
176    }
177    true
178}
179
180fn does_not_contain_version_id(
181    head_object_output_map: &HeadObjectOutputMap,
182    version_id: &str,
183) -> bool {
184    head_object_output_map.get(version_id).is_none()
185}
186
#[cfg(test)]
mod tests {
    use super::*;
    use aws_sdk_s3::primitives::DateTime;
    use tracing_subscriber::EnvFilter;

    /// Installs a tracing subscriber for the test run; a failed install
    /// (for example because one is already set) is ignored.
    fn init_dummy_tracing_subscriber() {
        let filter = EnvFilter::try_from_default_env()
            .or_else(|_| EnvFilter::try_new("dummy=trace"))
            .unwrap();

        let _ = tracing_subscriber::fmt()
            .with_env_filter(filter)
            .try_init();
    }

    /// Builds one `ObjectVersion` per flag, in order.
    fn versions_with_latest_flags(flags: &[bool]) -> Vec<ObjectVersion> {
        flags
            .iter()
            .map(|&flag| ObjectVersion::builder().is_latest(flag).build())
            .collect()
    }

    /// Builds a map holding the entries `version1`, `version2` and `version3`.
    fn sample_head_object_output_map() -> HeadObjectOutputMap {
        let mut map = HeadObjectOutputMap::new();
        for (version_id, epoch_secs) in [("version1", 10), ("version2", 11), ("version3", 11)] {
            map.insert(
                version_id.to_string(),
                HeadObjectOutput::builder()
                    .version_id(version_id)
                    .last_modified(DateTime::from_secs(epoch_secs))
                    .build(),
            );
        }
        map
    }

    #[test]
    fn is_latest_version_deleted_true() {
        init_dummy_tracing_subscriber();

        let object_versions = versions_with_latest_flags(&[false, false, false, false]);

        assert!(is_latest_version_deleted(&object_versions));
    }

    #[test]
    fn is_latest_version_deleted_false() {
        init_dummy_tracing_subscriber();

        let object_versions = versions_with_latest_flags(&[true, false, false, false]);

        assert!(!is_latest_version_deleted(&object_versions));
    }

    #[test]
    fn does_not_contain_version_id_true() {
        init_dummy_tracing_subscriber();

        let object_versions_map = sample_head_object_output_map();

        for missing_version_id in ["version4", "version5", "version6"] {
            assert!(does_not_contain_version_id(
                &object_versions_map,
                missing_version_id
            ));
        }
    }

    #[test]
    fn does_not_contain_version_id_false() {
        init_dummy_tracing_subscriber();

        let object_versions_map = sample_head_object_output_map();

        for present_version_id in ["version1", "version2", "version3"] {
            assert!(!does_not_contain_version_id(
                &object_versions_map,
                present_version_id
            ));
        }
    }
}