// S3-backed implementation of the `EventEmitter` trait.
use async_trait::async_trait;
use futures::compat::Future01CompatExt;
use rusoto_s3::{PutObjectRequest, S3};
use std::error::Error;

/// A sink to which events can be emitted asynchronously.
#[async_trait]
pub trait EventEmitter {
    /// The event payload accepted by this emitter.
    type Event;
    /// The error returned when emitting an event fails.
    type Error: std::fmt::Debug;

    /// Emits `completed_events`.
    ///
    /// # Errors
    ///
    /// Returns `Self::Error` if the implementation fails to emit the event.
    async fn emit_event(&mut self, completed_events: Self::Event) -> Result<(), Self::Error>;
}

/// An [`EventEmitter`] that stores each event as an object in an S3 bucket.
///
/// The object key for an event is derived from the event's bytes by `key_fn`.
#[derive(Clone)]
pub struct S3EventEmitter<S, F>
where
    S: S3 + Send + 'static,
    F: Fn(&[u8]) -> String,
{
    /// Client used to issue `put_object` requests.
    s3: S,
    /// Name of the bucket every event is written to.
    output_bucket: String,
    /// Maps an event's bytes to the object key it is stored under.
    key_fn: F,
}

impl<S, F> S3EventEmitter<S, F>
where
    S: S3 + Send + 'static,
    F: Fn(&[u8]) -> String,
{
    /// Creates an emitter that writes to `output_bucket` through `s3`, naming
    /// each object with the key returned by `key_fn`.
    pub fn new(s3: S, output_bucket: impl Into<String>, key_fn: F) -> Self {
        Self {
            s3,
            output_bucket: output_bucket.into(),
            key_fn,
        }
    }
}

#[async_trait]
impl<S, F> EventEmitter for S3EventEmitter<S, F>
where
    S: S3 + Send + Sync + 'static,
    F: Fn(&[u8]) -> String + Send,
{
    type Event = Vec<u8>;
    type Error = Box<dyn Error>;

    /// Puts `event` into the configured output bucket under the key computed
    /// by `key_fn`.
    ///
    /// # Errors
    ///
    /// Returns the boxed error from the `put_object` call if it fails.
    async fn emit_event(&mut self, event: Self::Event) -> Result<(), Self::Error> {
        // The key must be computed before `event` is moved into the request body.
        let key = (self.key_fn)(&event);

        let request = PutObjectRequest {
            body: Some(event.into()),
            // Target the bucket supplied to `new`; an empty bucket name here
            // would ignore the configured destination entirely.
            bucket: self.output_bucket.clone(),
            key,
            ..Default::default()
        };

        self.s3.put_object(request).compat().await?;
        Ok(())
    }
}