sqs_lambda/
s3_event_emitter.rs

1use std::error::Error;
2
3use crate::event_emitter::EventEmitter;
4use async_trait::async_trait;
5
6use rusoto_s3::{PutObjectRequest, S3};
7use std::future::Future;
8
9#[derive(Clone)]
10pub struct S3EventEmitter<S, F, OnEmission, EmissionResult>
11where
12    S: S3 + Send + 'static,
13    F: Fn(&[u8]) -> String,
14    EmissionResult:
15        Future<Output = Result<(), Box<dyn Error + Send + Sync + 'static>>> + Send + 'static,
16    OnEmission: Fn(String, String) -> EmissionResult + Send + Sync + 'static,
17{
18    s3: S,
19    output_bucket: String,
20    key_fn: F,
21    on_emission: OnEmission,
22}
23
24impl<S, F, OnEmission, EmissionResult> S3EventEmitter<S, F, OnEmission, EmissionResult>
25where
26    S: S3 + Send + 'static,
27    F: Fn(&[u8]) -> String,
28    EmissionResult:
29        Future<Output = Result<(), Box<dyn Error + Send + Sync + 'static>>> + Send + 'static,
30    OnEmission: Fn(String, String) -> EmissionResult + Send + Sync + 'static,
31{
32    pub fn new(
33        s3: S,
34        output_bucket: impl Into<String>,
35        key_fn: F,
36        on_emission: OnEmission,
37    ) -> Self {
38        let output_bucket = output_bucket.into();
39        Self {
40            s3,
41            output_bucket,
42            key_fn,
43            on_emission,
44        }
45    }
46}
47
48#[async_trait]
49impl<S, F, OnEmission, EmissionResult> EventEmitter
50    for S3EventEmitter<S, F, OnEmission, EmissionResult>
51where
52    S: S3 + Send + Sync + 'static,
53    F: Fn(&[u8]) -> String + Send + Sync,
54    EmissionResult:
55        Future<Output = Result<(), Box<dyn Error + Send + Sync + 'static>>> + Send + 'static,
56    OnEmission: Fn(String, String) -> EmissionResult + Send + Sync + 'static,
57{
58    type Event = Vec<u8>;
59    type Error = Box<dyn Error>;
60
61    #[tracing::instrument(skip(self, events))]
62    async fn emit_event(&mut self, events: Vec<Self::Event>) -> Result<(), Self::Error> {
63        for event in events {
64            let key = (self.key_fn)(&event);
65            self.s3
66                .put_object(PutObjectRequest {
67                    body: Some(event.into()),
68                    bucket: self.output_bucket.clone(),
69                    key: key.clone(),
70                    ..Default::default()
71                })
72                .await?;
73
74            // TODO: We shouldn't panic when this happens, we should retry or move on to the next event
75            (self.on_emission)(self.output_bucket.clone(), key.clone())
76                .await
77                .expect("on_emission failed");
78        }
79
80        // let event_uploads = tokio::time::timeout(
81        //     Duration::from_secs(5),
82        //     futures::future::join_all(event_uploads)
83        // ).await?;
84
85        // let mut err = None;
86        // for upload in event_uploads {
87        //     // upload?;
88        // }
89
90        Ok(())
91    }
92}