1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
use std::error::Error;
use std::io::Read;
use std::marker::PhantomData;
use std::time::Duration;

use aws_lambda_events::event::s3::S3Event;
use futures::compat::Future01CompatExt;
use log::*;
use rusoto_s3::{GetObjectRequest, S3};
use rusoto_sqs::Message as SqsMessage;
use tokio::prelude::*;

use async_trait::async_trait;

use crate::event_decoder::PayloadDecoder;
use rusoto_core::RusotoError;

/// Asynchronously turns an incoming message into a payload of type `T`.
#[async_trait]
pub trait PayloadRetriever<T> {
    /// The message type this retriever consumes.
    type Message;

    /// Produces the `T` that corresponds to `msg`.
    ///
    /// # Errors
    ///
    /// Implementations report any failure as a boxed error.
    async fn retrieve_event(&mut self, msg: &Self::Message) -> Result<T, Box<dyn Error>>;
}

/// A [`PayloadRetriever`] that downloads an object through the S3 client `S`
/// and hands the downloaded bytes to the decoder `D` to produce an `E`.
#[derive(Clone)]
pub struct S3PayloadRetriever<S, D, E>
where
    S: S3 + Clone + Send + Sync + 'static,
    D: PayloadDecoder<E> + Clone + Send + 'static,
    E: Send + 'static,
{
    // Client used to issue the `get_object` request.
    s3: S,
    // Decoder applied to the downloaded object body.
    decoder: D,
    // No `E` value is stored; this only ties the decoded type to the struct.
    phantom: PhantomData<E>,
}

impl<S, D, E> S3PayloadRetriever<S, D, E>
where
    S: S3 + Clone + Send + Sync + 'static,
    D: PayloadDecoder<E> + Clone + Send + 'static,
    E: Send + 'static,
{
    pub fn new(s3: S, decoder: D) -> Self {
        Self {
            s3,
            decoder,
            phantom: PhantomData,
        }
    }
}

#[async_trait]
impl<S, D, E> PayloadRetriever<E> for S3PayloadRetriever<S, D, E>
where
    S: S3 + Clone + Send + Sync + 'static,
    D: PayloadDecoder<E> + Clone + Send + 'static,
    E: Send + 'static,
{
    type Message = SqsMessage;

    /// Downloads and decodes the S3 object referenced by an SQS message.
    ///
    /// The message body is parsed as JSON. The bucket name and object key are
    /// read from `Records[0].s3`; any further records in the same message are
    /// ignored. The object is fetched with `get_object`, read fully into
    /// memory, and passed to the decoder.
    ///
    /// # Errors
    ///
    /// Returns an error when:
    /// * the message has no body or the body is not valid JSON,
    /// * the first record has no string bucket name or object key,
    /// * the `get_object` call fails or does not complete within the timeout,
    /// * the response carries no body or reading it fails,
    /// * the decoder rejects the downloaded bytes.
    async fn retrieve_event(&mut self, msg: &Self::Message) -> Result<E, Box<dyn Error>> {
        // Upper bound on how long the `get_object` call itself may take.
        const S3_GET_TIMEOUT: Duration = Duration::from_secs(5);
        // Smallest buffer reserved up front, used when the size is small or absent.
        const MIN_PREALLOC: usize = 1024;

        let body = msg.body.as_ref().ok_or("SQS message has no body")?;
        info!("Got body from message: {}", body);
        let event: serde_json::Value = serde_json::from_str(body)?;

        // Indexing a `serde_json::Value` with a missing key or index yields
        // `Null` instead of panicking, so absent fields surface as the
        // errors below.
        let record = &event["Records"][0]["s3"];

        let bucket = record["bucket"]["name"]
            .as_str()
            .ok_or("S3 event record is missing the bucket name")?;
        // NOTE(review): S3 event notifications URL-encode the object key; the
        // key is used as-is here. Confirm whether producers of these messages
        // can emit keys that need decoding.
        let key = record["object"]["key"]
            .as_str()
            .ok_or("S3 event record is missing the object key")?;

        info!("Fetching s3://{}/{}", bucket, key);

        let request = self.s3.get_object(GetObjectRequest {
            bucket: bucket.to_string(),
            key: key.to_string(),
            ..Default::default()
        });

        // The first `?` handles the timeout elapsing, the second the S3 error.
        let s3_data = tokio::time::timeout(S3_GET_TIMEOUT, request).await??;

        // Size the buffer from the size advertised in the event, using a
        // checked conversion so an oversized value cannot silently truncate.
        let advertised_size = record["object"]["size"]
            .as_u64()
            .and_then(|size| <usize as std::convert::TryFrom<u64>>::try_from(size).ok())
            .unwrap_or(0);
        let prealloc = advertised_size.max(MIN_PREALLOC);

        info!("Retrieved s3 payload with size : {:?}", prealloc);

        let mut body = Vec::with_capacity(prealloc);

        let stream = s3_data
            .body
            .ok_or("S3 GetObject response has no body")?;
        stream.into_async_read().read_to_end(&mut body).await?;

        info!("Read s3 payload body");
        self.decoder.decode(body)
    }
}