sqs_lambda/
s3_event_emitter.rs1use std::error::Error;
2
3use crate::event_emitter::EventEmitter;
4use async_trait::async_trait;
5
6use rusoto_s3::{PutObjectRequest, S3};
7use std::future::Future;
8
9#[derive(Clone)]
10pub struct S3EventEmitter<S, F, OnEmission, EmissionResult>
11where
12 S: S3 + Send + 'static,
13 F: Fn(&[u8]) -> String,
14 EmissionResult:
15 Future<Output = Result<(), Box<dyn Error + Send + Sync + 'static>>> + Send + 'static,
16 OnEmission: Fn(String, String) -> EmissionResult + Send + Sync + 'static,
17{
18 s3: S,
19 output_bucket: String,
20 key_fn: F,
21 on_emission: OnEmission,
22}
23
24impl<S, F, OnEmission, EmissionResult> S3EventEmitter<S, F, OnEmission, EmissionResult>
25where
26 S: S3 + Send + 'static,
27 F: Fn(&[u8]) -> String,
28 EmissionResult:
29 Future<Output = Result<(), Box<dyn Error + Send + Sync + 'static>>> + Send + 'static,
30 OnEmission: Fn(String, String) -> EmissionResult + Send + Sync + 'static,
31{
32 pub fn new(
33 s3: S,
34 output_bucket: impl Into<String>,
35 key_fn: F,
36 on_emission: OnEmission,
37 ) -> Self {
38 let output_bucket = output_bucket.into();
39 Self {
40 s3,
41 output_bucket,
42 key_fn,
43 on_emission,
44 }
45 }
46}
47
48#[async_trait]
49impl<S, F, OnEmission, EmissionResult> EventEmitter
50 for S3EventEmitter<S, F, OnEmission, EmissionResult>
51where
52 S: S3 + Send + Sync + 'static,
53 F: Fn(&[u8]) -> String + Send + Sync,
54 EmissionResult:
55 Future<Output = Result<(), Box<dyn Error + Send + Sync + 'static>>> + Send + 'static,
56 OnEmission: Fn(String, String) -> EmissionResult + Send + Sync + 'static,
57{
58 type Event = Vec<u8>;
59 type Error = Box<dyn Error>;
60
61 #[tracing::instrument(skip(self, events))]
62 async fn emit_event(&mut self, events: Vec<Self::Event>) -> Result<(), Self::Error> {
63 for event in events {
64 let key = (self.key_fn)(&event);
65 self.s3
66 .put_object(PutObjectRequest {
67 body: Some(event.into()),
68 bucket: self.output_bucket.clone(),
69 key: key.clone(),
70 ..Default::default()
71 })
72 .await?;
73
74 (self.on_emission)(self.output_bucket.clone(), key.clone())
76 .await
77 .expect("on_emission failed");
78 }
79
80 Ok(())
91 }
92}