1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
use log::*;

use rusoto_sqs::Message as SqsMessage;
use rusoto_s3::{S3, GetObjectRequest};
use crate::event_decoder::PayloadDecoder;
use std::marker::PhantomData;
use aws_lambda_events::event::s3::S3Event;
use futures::compat::Future01CompatExt;
use std::error::Error;
use std::io::Read;
use async_trait::async_trait;
use std::time::Duration;

/// Produces a value of type `T` from an SQS message.
#[async_trait]
pub trait PayloadRetriever<T> {
    /// Retrieves the event associated with `msg`.
    ///
    /// # Errors
    ///
    /// Implementations return an error when the event cannot be produced.
    async fn retrieve_event(&mut self, msg: &SqsMessage) -> Result<T, Box<dyn Error>>;
}

/// A [`PayloadRetriever`] that parses the SQS message body as an S3 event,
/// downloads the object named by the first record, and hands the raw bytes to
/// a [`PayloadDecoder`].
#[derive(Clone)]
pub struct S3PayloadRetriever<S, D, E>
where
    S: S3 + Clone + Send + Sync + 'static,
    D: PayloadDecoder<E> + Clone + Send + 'static,
    E: Send + 'static,
{
    /// S3 client used to fetch the object.
    s3: S,
    /// Decoder applied to the downloaded bytes.
    decoder: D,
    /// Ties the decoded event type `E` to the struct without storing a value.
    phantom: PhantomData<E>,
}

impl<S, D, E> S3PayloadRetriever<S, D, E>
where
    S: S3 + Clone + Send + Sync + 'static,
    D: PayloadDecoder<E> + Clone + Send + 'static,
    E: Send + 'static,
{
    /// Creates a retriever from an S3 client and a payload decoder.
    pub fn new(s3: S, decoder: D) -> Self {
        Self {
            s3,
            decoder,
            phantom: PhantomData,
        }
    }
}

#[async_trait]
impl<S, D, E> PayloadRetriever<E> for S3PayloadRetriever<S, D, E>
where
    S: S3 + Clone + Send + Sync + 'static,
    D: PayloadDecoder<E> + Clone + Send + 'static,
    E: Send + 'static,
{
    /// Downloads and decodes the S3 object referenced by `msg`.
    ///
    /// The message body is parsed as an [`S3Event`]; only the first record is
    /// used. The object is requested with a 2 second timeout, read fully into
    /// memory, and passed to the decoder.
    ///
    /// # Errors
    ///
    /// Returns an error (instead of panicking) when:
    /// - the message has no body,
    /// - the body is not a valid S3 event JSON document,
    /// - the event contains no records,
    /// - the first record lacks a bucket name or an object key,
    /// - the `GetObject` call fails or times out,
    /// - the response carries no body, or reading it fails,
    /// - the decoder rejects the payload.
    async fn retrieve_event(&mut self, msg: &SqsMessage) -> Result<E, Box<dyn Error>> {
        let body = msg.body.as_ref().ok_or("SQS message has no body")?;
        info!("Got body from message: {}", body);

        let event: S3Event = serde_json::from_str(body)?;

        // An S3 event with an empty `Records` array is valid JSON; indexing
        // `[0]` would panic on it, so surface it as an error instead.
        let record = &event
            .records
            .first()
            .ok_or("S3 event contains no records")?
            .s3;

        let bucket = record
            .bucket
            .name
            .clone()
            .ok_or("S3 event record has no bucket name")?;
        // NOTE(review): S3 notifications are documented to URL-encode object
        // keys; the key is forwarded verbatim here — confirm whether decoding
        // is required for keys containing spaces or special characters.
        let key = record
            .object
            .key
            .clone()
            .ok_or("S3 event record has no object key")?;

        let s3_data = self
            .s3
            .get_object(GetObjectRequest {
                bucket,
                key,
                ..Default::default()
            })
            .with_timeout(Duration::from_secs(2))
            .compat()
            .await?;

        // Pre-size the buffer from the size reported in the event, with a
        // 1 KiB floor (also covers zero or negative reported sizes).
        let prealloc = record.object.size.max(1024) as usize;
        info!("Retrieved s3 payload with size : {:?}", prealloc);

        let mut body = Vec::with_capacity(prealloc);

        // NOTE(review): this drives the stream through the blocking
        // `std::io::Read` interface inside an async fn — presumably
        // intentional; verify it behaves under the executor in use.
        s3_data
            .body
            .ok_or("S3 GetObject response has no body")?
            .into_async_read()
            .read_to_end(&mut body)?;
        info!("Read s3 payload body");

        self.decoder.decode(body)
    }
}