event_notification/
event.rs

1use crate::Error;
2use chrono::{DateTime, Utc};
3use serde::{Deserialize, Serialize};
4use serde_with::{DeserializeFromStr, SerializeDisplay};
5use smallvec::{SmallVec, smallvec};
6use std::borrow::Cow;
7use std::collections::HashMap;
8use strum::{Display, EnumString};
9use uuid::Uuid;
10
11#[derive(Serialize, Deserialize, Clone, Debug)]
12pub struct Identity {
13    #[serde(rename = "principalId")]
14    pub principal_id: String,
15}
16
17#[derive(Serialize, Deserialize, Clone, Debug)]
18pub struct Bucket {
19    pub name: String,
20    #[serde(rename = "ownerIdentity")]
21    pub owner_identity: Identity,
22    pub arn: String,
23}
24
25#[derive(Serialize, Deserialize, Clone, Debug)]
26pub struct Object {
27    pub key: String,
28    #[serde(default, skip_serializing_if = "Option::is_none")]
29    pub size: Option<i64>,
30    #[serde(default, skip_serializing_if = "Option::is_none", rename = "eTag")]
31    pub etag: Option<String>,
32    #[serde(
33        default,
34        skip_serializing_if = "Option::is_none",
35        rename = "contentType"
36    )]
37    pub content_type: Option<String>,
38    #[serde(
39        default,
40        skip_serializing_if = "Option::is_none",
41        rename = "userMetadata"
42    )]
43    pub user_metadata: Option<HashMap<String, String>>,
44    #[serde(default, skip_serializing_if = "Option::is_none", rename = "versionId")]
45    pub version_id: Option<String>,
46    pub sequencer: String,
47}
48
49#[derive(Serialize, Deserialize, Clone, Debug)]
50pub struct Metadata {
51    #[serde(rename = "s3SchemaVersion")]
52    pub schema_version: String,
53    #[serde(rename = "configurationId")]
54    pub configuration_id: String,
55    pub bucket: Bucket,
56    pub object: Object,
57}
58
59#[derive(Serialize, Deserialize, Clone, Debug)]
60pub struct Source {
61    pub host: String,
62    pub port: String,
63    #[serde(rename = "userAgent")]
64    pub user_agent: String,
65}
66
67/// Builder for creating an Event.
68///
69/// This struct is used to build an Event object with various parameters.
70/// It provides methods to set each parameter and a build method to create the Event.
71#[derive(Default, Clone)]
72pub struct EventBuilder {
73    event_version: Option<String>,
74    event_source: Option<String>,
75    aws_region: Option<String>,
76    event_time: Option<String>,
77    event_name: Option<Name>,
78    user_identity: Option<Identity>,
79    request_parameters: Option<HashMap<String, String>>,
80    response_elements: Option<HashMap<String, String>>,
81    s3: Option<Metadata>,
82    source: Option<Source>,
83    channels: Option<SmallVec<[String; 2]>>,
84}
85
86impl EventBuilder {
87    /// create a builder that pre filled default values
88    pub fn new() -> Self {
89        Self {
90            event_version: Some(Cow::Borrowed("2.0").to_string()),
91            event_source: Some(Cow::Borrowed("aws:s3").to_string()),
92            aws_region: Some("us-east-1".to_string()),
93            event_time: Some(Utc::now().to_rfc3339()),
94            event_name: None,
95            user_identity: Some(Identity {
96                principal_id: "anonymous".to_string(),
97            }),
98            request_parameters: Some(HashMap::new()),
99            response_elements: Some(HashMap::new()),
100            s3: None,
101            source: None,
102            channels: Some(Vec::new().into()),
103        }
104    }
105
106    /// verify and set the event version
107    pub fn event_version(mut self, event_version: impl Into<String>) -> Self {
108        let event_version = event_version.into();
109        if !event_version.is_empty() {
110            self.event_version = Some(event_version);
111        }
112        self
113    }
114
115    /// verify and set the event source
116    pub fn event_source(mut self, event_source: impl Into<String>) -> Self {
117        let event_source = event_source.into();
118        if !event_source.is_empty() {
119            self.event_source = Some(event_source);
120        }
121        self
122    }
123
124    /// set up aws regions
125    pub fn aws_region(mut self, aws_region: impl Into<String>) -> Self {
126        self.aws_region = Some(aws_region.into());
127        self
128    }
129
130    /// set event time
131    pub fn event_time(mut self, event_time: impl Into<String>) -> Self {
132        self.event_time = Some(event_time.into());
133        self
134    }
135
136    /// set event name
137    pub fn event_name(mut self, event_name: Name) -> Self {
138        self.event_name = Some(event_name);
139        self
140    }
141
142    /// set user identity
143    pub fn user_identity(mut self, user_identity: Identity) -> Self {
144        self.user_identity = Some(user_identity);
145        self
146    }
147
148    /// set request parameters
149    pub fn request_parameters(mut self, request_parameters: HashMap<String, String>) -> Self {
150        self.request_parameters = Some(request_parameters);
151        self
152    }
153
154    /// set response elements
155    pub fn response_elements(mut self, response_elements: HashMap<String, String>) -> Self {
156        self.response_elements = Some(response_elements);
157        self
158    }
159
160    /// setting up s3 metadata
161    pub fn s3(mut self, s3: Metadata) -> Self {
162        self.s3 = Some(s3);
163        self
164    }
165
166    /// set event source information
167    pub fn source(mut self, source: Source) -> Self {
168        self.source = Some(source);
169        self
170    }
171
172    /// set up the sending channel
173    pub fn channels(mut self, channels: Vec<String>) -> Self {
174        self.channels = Some(channels.into());
175        self
176    }
177
178    /// Create a preconfigured builder for common object event scenarios
179    pub fn for_object_creation(s3: Metadata, source: Source) -> Self {
180        Self::new()
181            .event_name(Name::ObjectCreatedPut)
182            .s3(s3)
183            .source(source)
184    }
185
186    /// Create a preconfigured builder for object deletion events
187    pub fn for_object_removal(s3: Metadata, source: Source) -> Self {
188        Self::new()
189            .event_name(Name::ObjectRemovedDelete)
190            .s3(s3)
191            .source(source)
192    }
193
194    /// build event instance
195    ///
196    /// Verify the required fields and create a complete Event object
197    pub fn build(self) -> Result<Event, Error> {
198        let event_version = self
199            .event_version
200            .ok_or(Error::MissingField("event_version"))?;
201
202        let event_source = self
203            .event_source
204            .ok_or(Error::MissingField("event_source"))?;
205
206        let aws_region = self.aws_region.ok_or(Error::MissingField("aws_region"))?;
207
208        let event_time = self.event_time.ok_or(Error::MissingField("event_time"))?;
209
210        let event_name = self.event_name.ok_or(Error::MissingField("event_name"))?;
211
212        let user_identity = self
213            .user_identity
214            .ok_or(Error::MissingField("user_identity"))?;
215
216        let request_parameters = self.request_parameters.unwrap_or_default();
217        let response_elements = self.response_elements.unwrap_or_default();
218
219        let s3 = self.s3.ok_or(Error::MissingField("s3"))?;
220
221        let source = self.source.ok_or(Error::MissingField("source"))?;
222
223        let channels = self.channels.unwrap_or_else(|| smallvec![]);
224
225        Ok(Event {
226            event_version,
227            event_source,
228            aws_region,
229            event_time,
230            event_name,
231            user_identity,
232            request_parameters,
233            response_elements,
234            s3,
235            source,
236            id: Uuid::new_v4(),
237            timestamp: Utc::now(),
238            channels,
239        })
240    }
241}
242
243#[derive(Serialize, Deserialize, Clone, Debug)]
244pub struct Event {
245    #[serde(rename = "eventVersion")]
246    pub event_version: String,
247    #[serde(rename = "eventSource")]
248    pub event_source: String,
249    #[serde(rename = "awsRegion")]
250    pub aws_region: String,
251    #[serde(rename = "eventTime")]
252    pub event_time: String,
253    #[serde(rename = "eventName")]
254    pub event_name: Name,
255    #[serde(rename = "userIdentity")]
256    pub user_identity: Identity,
257    #[serde(rename = "requestParameters")]
258    pub request_parameters: HashMap<String, String>,
259    #[serde(rename = "responseElements")]
260    pub response_elements: HashMap<String, String>,
261    pub s3: Metadata,
262    pub source: Source,
263    pub id: Uuid,
264    pub timestamp: DateTime<Utc>,
265    pub channels: SmallVec<[String; 2]>,
266}
267
268impl Event {
269    /// create a new event builder
270    ///
271    /// Returns an EventBuilder instance pre-filled with default values
272    pub fn builder() -> EventBuilder {
273        EventBuilder::new()
274    }
275
276    /// Quickly create Event instances with necessary fields
277    ///
278    /// suitable for common s3 event scenarios
279    pub fn create(event_name: Name, s3: Metadata, source: Source, channels: Vec<String>) -> Self {
280        Self::builder()
281            .event_name(event_name)
282            .s3(s3)
283            .source(source)
284            .channels(channels)
285            .build()
286            .expect("Failed to create event, missing necessary parameters")
287    }
288
289    /// a convenient way to create a preconfigured builder
290    pub fn for_object_creation(s3: Metadata, source: Source) -> EventBuilder {
291        EventBuilder::for_object_creation(s3, source)
292    }
293
294    /// a convenient way to create a preconfigured builder
295    pub fn for_object_removal(s3: Metadata, source: Source) -> EventBuilder {
296        EventBuilder::for_object_removal(s3, source)
297    }
298
299    /// Determine whether an event belongs to a specific type
300    pub fn is_type(&self, event_type: Name) -> bool {
301        let mask = event_type.mask();
302        (self.event_name.mask() & mask) != 0
303    }
304
305    /// Determine whether an event needs to be sent to a specific channel
306    pub fn is_for_channel(&self, channel: &str) -> bool {
307        self.channels.iter().any(|c| c == channel)
308    }
309}
310
311#[derive(Serialize, Deserialize, Clone, Debug)]
312pub struct Log {
313    #[serde(rename = "eventName")]
314    pub event_name: Name,
315    pub key: String,
316    pub records: Vec<Event>,
317}
318
319#[derive(
320    Debug, Clone, Copy, PartialEq, Eq, SerializeDisplay, DeserializeFromStr, Display, EnumString,
321)]
322#[strum(serialize_all = "SCREAMING_SNAKE_CASE")]
323pub enum Name {
324    ObjectAccessedGet,
325    ObjectAccessedGetRetention,
326    ObjectAccessedGetLegalHold,
327    ObjectAccessedHead,
328    ObjectAccessedAttributes,
329    ObjectCreatedCompleteMultipartUpload,
330    ObjectCreatedCopy,
331    ObjectCreatedPost,
332    ObjectCreatedPut,
333    ObjectCreatedPutRetention,
334    ObjectCreatedPutLegalHold,
335    ObjectCreatedPutTagging,
336    ObjectCreatedDeleteTagging,
337    ObjectRemovedDelete,
338    ObjectRemovedDeleteMarkerCreated,
339    ObjectRemovedDeleteAllVersions,
340    ObjectRemovedNoOp,
341    BucketCreated,
342    BucketRemoved,
343    ObjectReplicationFailed,
344    ObjectReplicationComplete,
345    ObjectReplicationMissedThreshold,
346    ObjectReplicationReplicatedAfterThreshold,
347    ObjectReplicationNotTracked,
348    ObjectRestorePost,
349    ObjectRestoreCompleted,
350    ObjectTransitionFailed,
351    ObjectTransitionComplete,
352    ObjectManyVersions,
353    ObjectLargeVersions,
354    PrefixManyFolders,
355    IlmDelMarkerExpirationDelete,
356    ObjectAccessedAll,
357    ObjectCreatedAll,
358    ObjectRemovedAll,
359    ObjectReplicationAll,
360    ObjectRestoreAll,
361    ObjectTransitionAll,
362    ObjectScannerAll,
363    Everything,
364}
365
366impl Name {
367    pub fn expand(&self) -> Vec<Name> {
368        match self {
369            Name::ObjectAccessedAll => vec![
370                Name::ObjectAccessedGet,
371                Name::ObjectAccessedHead,
372                Name::ObjectAccessedGetRetention,
373                Name::ObjectAccessedGetLegalHold,
374                Name::ObjectAccessedAttributes,
375            ],
376            Name::ObjectCreatedAll => vec![
377                Name::ObjectCreatedCompleteMultipartUpload,
378                Name::ObjectCreatedCopy,
379                Name::ObjectCreatedPost,
380                Name::ObjectCreatedPut,
381                Name::ObjectCreatedPutRetention,
382                Name::ObjectCreatedPutLegalHold,
383                Name::ObjectCreatedPutTagging,
384                Name::ObjectCreatedDeleteTagging,
385            ],
386            Name::ObjectRemovedAll => vec![
387                Name::ObjectRemovedDelete,
388                Name::ObjectRemovedDeleteMarkerCreated,
389                Name::ObjectRemovedNoOp,
390                Name::ObjectRemovedDeleteAllVersions,
391            ],
392            Name::ObjectReplicationAll => vec![
393                Name::ObjectReplicationFailed,
394                Name::ObjectReplicationComplete,
395                Name::ObjectReplicationNotTracked,
396                Name::ObjectReplicationMissedThreshold,
397                Name::ObjectReplicationReplicatedAfterThreshold,
398            ],
399            Name::ObjectRestoreAll => vec![Name::ObjectRestorePost, Name::ObjectRestoreCompleted],
400            Name::ObjectTransitionAll => {
401                vec![Name::ObjectTransitionFailed, Name::ObjectTransitionComplete]
402            }
403            Name::ObjectScannerAll => vec![
404                Name::ObjectManyVersions,
405                Name::ObjectLargeVersions,
406                Name::PrefixManyFolders,
407            ],
408            Name::Everything => (1..=Name::IlmDelMarkerExpirationDelete as u32)
409                .map(|i| Name::from_repr(i).unwrap())
410                .collect(),
411            _ => vec![*self],
412        }
413    }
414
415    pub fn mask(&self) -> u64 {
416        if (*self as u32) < Name::ObjectAccessedAll as u32 {
417            1 << (*self as u32 - 1)
418        } else {
419            self.expand()
420                .iter()
421                .fold(0, |acc, n| acc | (1 << (*n as u32 - 1)))
422        }
423    }
424
425    fn from_repr(discriminant: u32) -> Option<Self> {
426        match discriminant {
427            1 => Some(Name::ObjectAccessedGet),
428            2 => Some(Name::ObjectAccessedGetRetention),
429            3 => Some(Name::ObjectAccessedGetLegalHold),
430            4 => Some(Name::ObjectAccessedHead),
431            5 => Some(Name::ObjectAccessedAttributes),
432            6 => Some(Name::ObjectCreatedCompleteMultipartUpload),
433            7 => Some(Name::ObjectCreatedCopy),
434            8 => Some(Name::ObjectCreatedPost),
435            9 => Some(Name::ObjectCreatedPut),
436            10 => Some(Name::ObjectCreatedPutRetention),
437            11 => Some(Name::ObjectCreatedPutLegalHold),
438            12 => Some(Name::ObjectCreatedPutTagging),
439            13 => Some(Name::ObjectCreatedDeleteTagging),
440            14 => Some(Name::ObjectRemovedDelete),
441            15 => Some(Name::ObjectRemovedDeleteMarkerCreated),
442            16 => Some(Name::ObjectRemovedDeleteAllVersions),
443            17 => Some(Name::ObjectRemovedNoOp),
444            18 => Some(Name::BucketCreated),
445            19 => Some(Name::BucketRemoved),
446            20 => Some(Name::ObjectReplicationFailed),
447            21 => Some(Name::ObjectReplicationComplete),
448            22 => Some(Name::ObjectReplicationMissedThreshold),
449            23 => Some(Name::ObjectReplicationReplicatedAfterThreshold),
450            24 => Some(Name::ObjectReplicationNotTracked),
451            25 => Some(Name::ObjectRestorePost),
452            26 => Some(Name::ObjectRestoreCompleted),
453            27 => Some(Name::ObjectTransitionFailed),
454            28 => Some(Name::ObjectTransitionComplete),
455            29 => Some(Name::ObjectManyVersions),
456            30 => Some(Name::ObjectLargeVersions),
457            31 => Some(Name::PrefixManyFolders),
458            32 => Some(Name::IlmDelMarkerExpirationDelete),
459            33 => Some(Name::ObjectAccessedAll),
460            34 => Some(Name::ObjectCreatedAll),
461            35 => Some(Name::ObjectRemovedAll),
462            36 => Some(Name::ObjectReplicationAll),
463            37 => Some(Name::ObjectRestoreAll),
464            38 => Some(Name::ObjectTransitionAll),
465            39 => Some(Name::ObjectScannerAll),
466            40 => Some(Name::Everything),
467            _ => None,
468        }
469    }
470}