Skip to main content

ossify/ops/common/
replication.rs

1//! Replication rule types shared by `PutBucketReplication`, `GetBucketReplication`
2//! and `GetBucketReplicationProgress`.
3//!
4//! See <https://www.alibabacloud.com/help/en/oss/developer-reference/putbucketreplication>
5//! for the authoritative element reference.
6
7use serde::{Deserialize, Deserializer, Serialize};
8use serde_with::skip_serializing_none;
9
10/// Replication action(s) propagated to the destination bucket.
11#[derive(Copy, Clone, Debug, PartialEq, Eq, Default, Serialize, Deserialize)]
12pub enum ReplicationAction {
13    /// PUT / DELETE / ABORT operations are all replicated.
14    #[default]
15    #[serde(rename = "ALL")]
16    All,
17    /// Only write (PUT) operations are replicated.
18    #[serde(rename = "PUT")]
19    Put,
20}
21
22/// Data transfer link used for cross-region replication.
23///
24/// This type uses a manual `Deserialize` impl instead of the `#[derive]`
25/// version because quick-xml 0.39 treats derived enums specially when they
26/// appear as the text content of an element (`<Type>oss_acc</Type>`): it tries
27/// to interpret the element *tag* (`Type`) as the variant name rather than the
28/// text. The manual impl deserializes to a `String` first and then picks the
29/// variant.
30#[derive(Copy, Clone, Debug, PartialEq, Eq, Default, Serialize)]
31pub enum TransferType {
32    /// Default OSS transfer link.
33    #[default]
34    #[serde(rename = "internal")]
35    Internal,
36    /// Transfer acceleration (CRR only).
37    #[serde(rename = "oss_acc")]
38    OssAcc,
39}
40
41impl<'de> Deserialize<'de> for TransferType {
42    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
43    where
44        D: Deserializer<'de>,
45    {
46        let s = String::deserialize(deserializer)?;
47        match s.as_str() {
48            "internal" => Ok(TransferType::Internal),
49            "oss_acc" => Ok(TransferType::OssAcc),
50            other => Err(serde::de::Error::custom(format!(
51                "unknown TransferType `{other}`, expected `internal` or `oss_acc`"
52            ))),
53        }
54    }
55}
56
57/// Whether a replication rule also replicates historical data.
58#[derive(Copy, Clone, Debug, PartialEq, Eq, Default, Serialize, Deserialize)]
59#[serde(rename_all = "lowercase")]
60pub enum HistoricalObjectReplication {
61    #[default]
62    Enabled,
63    Disabled,
64}
65
66/// Status of a replication rule itself (returned by GetBucketReplication).
67#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
68#[serde(rename_all = "lowercase")]
69pub enum ReplicationRuleStatus {
70    /// OSS is preparing the replication tasks.
71    Starting,
72    /// Replication is in effect.
73    Doing,
74    /// The rule has been deleted and OSS is cleaning up.
75    Closing,
76}
77
78/// Enable/disable flag used by RTC and SseKmsEncryptedObjects. OSS uses
79/// both lowercase (`enabled` / `disabled`) on the RTC `<Status>` element
80/// and Pascal-case (`Enabled` / `Disabled`) on the SSE-KMS `<Status>`
81/// element. Response of GetBucketReplication may also return `enabling`
82/// for RTC transitions. We model the union conservatively.
83#[derive(Copy, Clone, Debug, PartialEq, Eq, Default, Serialize, Deserialize)]
84pub enum RtcStatus {
85    #[serde(alias = "enabled", alias = "Enabled")]
86    #[serde(rename = "enabled")]
87    Enabled,
88    #[default]
89    #[serde(alias = "disabled", alias = "Disabled")]
90    #[serde(rename = "disabled")]
91    Disabled,
92    /// Transitional state surfaced only by GetBucketReplication.
93    #[serde(alias = "Enabling")]
94    #[serde(rename = "enabling")]
95    Enabling,
96}
97
98/// `<RTC>` container.
99#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
100#[serde(rename_all = "PascalCase")]
101pub struct Rtc {
102    pub status: RtcStatus,
103}
104
105/// `<PrefixSet>` container. OSS wraps the list in a `<Prefix>` repetition.
106#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
107#[serde(rename_all = "PascalCase")]
108pub struct PrefixSet {
109    #[serde(rename = "Prefix", default, skip_serializing_if = "Vec::is_empty")]
110    pub prefixes: Vec<String>,
111}
112
113/// `<Destination>` container for a replication rule.
114#[skip_serializing_none]
115#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
116#[serde(rename_all = "PascalCase")]
117pub struct ReplicationDestination {
118    pub bucket: String,
119    pub location: String,
120    pub transfer_type: Option<TransferType>,
121}
122
123/// `<SseKmsEncryptedObjects>` container.
124#[skip_serializing_none]
125#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
126#[serde(rename_all = "PascalCase")]
127pub struct SseKmsEncryptedObjects {
128    pub status: Option<SseKmsStatus>,
129}
130
131/// The subset of `RtcStatus` allowed on SseKmsEncryptedObjects: Pascal-case
132/// `Enabled` / `Disabled`.
133#[derive(Copy, Clone, Debug, PartialEq, Eq, Default, Serialize, Deserialize)]
134pub enum SseKmsStatus {
135    #[default]
136    Enabled,
137    Disabled,
138}
139
140/// `<SourceSelectionCriteria>` container.
141#[skip_serializing_none]
142#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
143#[serde(rename_all = "PascalCase")]
144pub struct SourceSelectionCriteria {
145    pub sse_kms_encrypted_objects: Option<SseKmsEncryptedObjects>,
146}
147
148/// `<EncryptionConfiguration>` container.
149#[skip_serializing_none]
150#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
151#[serde(rename_all = "PascalCase")]
152pub struct ReplicationEncryptionConfiguration {
153    #[serde(rename = "ReplicaKmsKeyID")]
154    pub replica_kms_key_id: Option<String>,
155}
156
157/// Tag filtering policy (within `<UserTaggings>`).
158#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
159pub enum UserTaggingFilterType {
160    #[serde(rename = "AND")]
161    And,
162    #[serde(rename = "OR")]
163    Or,
164}
165
166/// Single `<UserTagging>` entry.
167#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
168#[serde(rename_all = "PascalCase")]
169pub struct UserTagging {
170    pub key: String,
171    pub value: String,
172}
173
174impl UserTagging {
175    pub fn new(key: impl Into<String>, value: impl Into<String>) -> Self {
176        Self {
177            key: key.into(),
178            value: value.into(),
179        }
180    }
181}
182
183/// `<UserTaggings>` container.
184#[skip_serializing_none]
185#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
186#[serde(rename_all = "PascalCase")]
187pub struct UserTaggings {
188    pub filter_type: Option<UserTaggingFilterType>,
189    #[serde(rename = "UserTagging", default, skip_serializing_if = "Vec::is_empty")]
190    pub user_taggings: Vec<UserTagging>,
191}
192
193/// Replication progress information (returned by GetBucketReplicationProgress).
194#[skip_serializing_none]
195#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
196#[serde(rename_all = "PascalCase")]
197pub struct ReplicationProgressInfo {
198    /// Percentage of replicated historical data as a string (e.g. "0.85").
199    pub historical_object: Option<String>,
200    /// GMT timestamp indicating replication cutoff for new objects.
201    pub new_object: Option<String>,
202}
203
204/// A single `<Rule>` inside `<ReplicationConfiguration>`.
205#[skip_serializing_none]
206#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
207#[serde(rename_all = "PascalCase")]
208pub struct ReplicationRule {
209    #[serde(rename = "ID")]
210    pub id: Option<String>,
211    #[serde(rename = "RTC")]
212    pub rtc: Option<Rtc>,
213    pub prefix_set: Option<PrefixSet>,
214    pub action: Option<ReplicationAction>,
215    pub destination: Option<ReplicationDestination>,
216    pub historical_object_replication: Option<HistoricalObjectReplication>,
217    pub sync_role: Option<String>,
218    pub source_selection_criteria: Option<SourceSelectionCriteria>,
219    pub encryption_configuration: Option<ReplicationEncryptionConfiguration>,
220    pub user_taggings: Option<UserTaggings>,
221    /// Populated by GetBucketReplication only.
222    pub status: Option<ReplicationRuleStatus>,
223    /// Populated by GetBucketReplicationProgress only.
224    pub progress: Option<ReplicationProgressInfo>,
225}
226
227/// Root `<ReplicationConfiguration>` element. Supports both
228/// PutBucketReplication (request body) and GetBucketReplication (response
229/// body).
230#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
231#[serde(rename = "ReplicationConfiguration")]
232pub struct ReplicationConfiguration {
233    #[serde(rename = "Rule", default, skip_serializing_if = "Vec::is_empty")]
234    pub rules: Vec<ReplicationRule>,
235}
236
237impl ReplicationConfiguration {
238    pub fn new() -> Self {
239        Self::default()
240    }
241
242    pub fn with_rules(rules: Vec<ReplicationRule>) -> Self {
243        Self { rules }
244    }
245}
246
247fn unwrap_locations<'de, D>(deserializer: D) -> std::result::Result<Vec<String>, D::Error>
248where
249    D: Deserializer<'de>,
250{
251    Vec::<String>::deserialize(deserializer)
252}
253
254fn unwrap_transfer_types<'de, D>(deserializer: D) -> std::result::Result<Vec<TransferType>, D::Error>
255where
256    D: Deserializer<'de>,
257{
258    #[derive(Deserialize)]
259    struct Inner {
260        #[serde(rename = "Type", default)]
261        ty: Vec<TransferType>,
262    }
263    Ok(Inner::deserialize(deserializer)?.ty)
264}
265
266/// `<LocationTransferType>` entry inside `<LocationTransferTypeConstraint>`.
267#[derive(Debug, Clone, Default, PartialEq, Eq, Deserialize)]
268#[serde(rename_all = "PascalCase")]
269pub struct LocationTransferType {
270    pub location: String,
271    /// Inner `<TransferTypes><Type>...</Type></TransferTypes>` flattened to a
272    /// `Vec<TransferType>`.
273    #[serde(default, deserialize_with = "unwrap_transfer_types")]
274    pub transfer_types: Vec<TransferType>,
275}
276
277/// `<LocationTransferTypeConstraint>` container.
278#[derive(Debug, Clone, Default, PartialEq, Eq, Deserialize)]
279#[serde(rename_all = "PascalCase")]
280pub struct LocationTransferTypeConstraint {
281    #[serde(rename = "LocationTransferType", default)]
282    pub location_transfer_types: Vec<LocationTransferType>,
283}
284
285/// Response body for `GetBucketReplicationLocation` (XML root
286/// `<ReplicationLocation>`).
287#[derive(Debug, Clone, Default, PartialEq, Eq, Deserialize)]
288#[serde(rename_all = "PascalCase")]
289pub struct ReplicationLocation {
290    #[serde(rename = "Location", default, deserialize_with = "unwrap_locations")]
291    pub locations: Vec<String>,
292    pub location_transfer_type_constraint: Option<LocationTransferTypeConstraint>,
293}
294
295#[cfg(test)]
296mod tests {
297    use super::*;
298
299    #[test]
300    fn parse_full_rule() {
301        let xml = r#"<ReplicationConfiguration>
302  <Rule>
303    <ID>test_replication_1</ID>
304    <PrefixSet>
305      <Prefix>source1</Prefix>
306      <Prefix>video</Prefix>
307    </PrefixSet>
308    <UserTaggings>
309      <FilterType>OR</FilterType>
310      <UserTagging><Key>key1</Key><Value>value1</Value></UserTagging>
311      <UserTagging><Key>key2</Key><Value>value2</Value></UserTagging>
312    </UserTaggings>
313    <Action>PUT</Action>
314    <Destination>
315      <Bucket>destbucket</Bucket>
316      <Location>oss-cn-beijing</Location>
317      <TransferType>oss_acc</TransferType>
318    </Destination>
319    <Status>doing</Status>
320    <HistoricalObjectReplication>enabled</HistoricalObjectReplication>
321    <SyncRole>aliyunramrole</SyncRole>
322    <RTC>
323      <Status>enabled</Status>
324    </RTC>
325  </Rule>
326</ReplicationConfiguration>"#;
327        let parsed: ReplicationConfiguration = quick_xml::de::from_str(xml).unwrap();
328        assert_eq!(parsed.rules.len(), 1);
329        let rule = &parsed.rules[0];
330        assert_eq!(rule.id.as_deref(), Some("test_replication_1"));
331        assert_eq!(rule.action, Some(ReplicationAction::Put));
332        let dest = rule.destination.as_ref().unwrap();
333        assert_eq!(dest.bucket, "destbucket");
334        assert_eq!(dest.transfer_type, Some(TransferType::OssAcc));
335        assert_eq!(rule.status, Some(ReplicationRuleStatus::Doing));
336        assert_eq!(rule.rtc.as_ref().unwrap().status, RtcStatus::Enabled);
337        let prefixes = &rule.prefix_set.as_ref().unwrap().prefixes;
338        assert_eq!(prefixes, &vec!["source1".to_string(), "video".to_string()]);
339        let tags = rule.user_taggings.as_ref().unwrap();
340        assert_eq!(tags.filter_type, Some(UserTaggingFilterType::Or));
341        assert_eq!(tags.user_taggings.len(), 2);
342    }
343
344    #[test]
345    fn serialize_minimal_rule_round_trip() {
346        let cfg = ReplicationConfiguration::with_rules(vec![ReplicationRule {
347            prefix_set: Some(PrefixSet {
348                prefixes: vec!["source1".to_string()],
349            }),
350            action: Some(ReplicationAction::Put),
351            destination: Some(ReplicationDestination {
352                bucket: "destbucket".to_string(),
353                location: "oss-cn-beijing".to_string(),
354                transfer_type: Some(TransferType::OssAcc),
355            }),
356            historical_object_replication: Some(HistoricalObjectReplication::Enabled),
357            sync_role: Some("aliyunramrole".to_string()),
358            ..Default::default()
359        }]);
360        let xml = quick_xml::se::to_string(&cfg).unwrap();
361        assert!(xml.contains("<ReplicationConfiguration>"));
362        assert!(xml.contains("<Action>PUT</Action>"));
363        assert!(xml.contains("<TransferType>oss_acc</TransferType>"));
364        let back: ReplicationConfiguration = quick_xml::de::from_str(&xml).unwrap();
365        assert_eq!(back, cfg);
366    }
367
368    #[test]
369    fn parse_replication_location() {
370        let xml = r#"<ReplicationLocation>
371  <Location>oss-cn-beijing</Location>
372  <Location>oss-cn-qingdao</Location>
373  <LocationTransferTypeConstraint>
374    <LocationTransferType>
375      <Location>oss-cn-hongkong</Location>
376      <TransferTypes><Type>oss_acc</Type></TransferTypes>
377    </LocationTransferType>
378  </LocationTransferTypeConstraint>
379</ReplicationLocation>"#;
380        let parsed: ReplicationLocation = quick_xml::de::from_str(xml).unwrap();
381        assert_eq!(
382            parsed.locations,
383            vec!["oss-cn-beijing".to_string(), "oss-cn-qingdao".to_string()]
384        );
385        let constraint = parsed.location_transfer_type_constraint.unwrap();
386        assert_eq!(constraint.location_transfer_types.len(), 1);
387        assert_eq!(constraint.location_transfer_types[0].location, "oss-cn-hongkong");
388        assert_eq!(constraint.location_transfer_types[0].transfer_types, vec![TransferType::OssAcc]);
389    }
390
391    #[test]
392    fn parse_progress() {
393        let xml = r#"<ReplicationProgress>
394  <Rule>
395    <ID>test_replication_1</ID>
396    <PrefixSet><Prefix>video</Prefix></PrefixSet>
397    <Action>PUT</Action>
398    <Destination>
399      <Bucket>target</Bucket>
400      <Location>oss-cn-beijing</Location>
401      <TransferType>oss_acc</TransferType>
402    </Destination>
403    <Status>doing</Status>
404    <HistoricalObjectReplication>enabled</HistoricalObjectReplication>
405    <Progress>
406      <HistoricalObject>0.85</HistoricalObject>
407      <NewObject>2015-09-24T15:28:14.000Z</NewObject>
408    </Progress>
409  </Rule>
410</ReplicationProgress>"#;
411        // The wrapper type is identical to ReplicationConfiguration (just a
412        // different root name). We reuse the same struct here.
413        #[derive(Deserialize)]
414        #[serde(rename = "ReplicationProgress", rename_all = "PascalCase")]
415        struct ReplicationProgress {
416            #[serde(rename = "Rule", default)]
417            rules: Vec<ReplicationRule>,
418        }
419        let parsed: ReplicationProgress = quick_xml::de::from_str(xml).unwrap();
420        assert_eq!(parsed.rules.len(), 1);
421        let progress = parsed.rules[0].progress.as_ref().unwrap();
422        assert_eq!(progress.historical_object.as_deref(), Some("0.85"));
423    }
424}