1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
use std::error::Error;
use std::io::Read;
use std::marker::PhantomData;
use std::time::Duration;

use aws_lambda_events::event::s3::S3Event;
use futures::compat::Future01CompatExt;
use log::*;
use rusoto_s3::{GetObjectRequest, S3};
use rusoto_sqs::Message as SqsMessage;
use tokio::prelude::*;

use async_trait::async_trait;

use crate::event_decoder::PayloadDecoder;
use rusoto_core::RusotoError;

/// Asynchronously turns a message into a payload of type `T`.
#[async_trait]
pub trait PayloadRetriever<T> {
    /// The message type accepted by [`retrieve_event`](PayloadRetriever::retrieve_event).
    type Message;
    /// Produces the `T` for `msg`, or an error if it cannot be produced.
    ///
    /// Takes `&mut self`, so implementations may update internal state while retrieving.
    async fn retrieve_event(&mut self, msg: &Self::Message) -> Result<T, Box<dyn Error>>;
}


/// A [`PayloadRetriever`] that downloads an object from S3 and decodes its bytes into `E`.
///
/// * `S` - the S3 client used to fetch objects.
/// * `D` - the decoder that converts the downloaded bytes into `E`.
/// * `E` - the decoded event type.
#[derive(Clone)]
pub struct S3PayloadRetriever<S, D, E>
    where S: S3 + Clone + Send + Sync + 'static,
          D: PayloadDecoder<E> + Clone + Send + 'static,
          E: Send + 'static
{
    // Client used to issue `get_object` requests.
    s3: S,
    // Decoder applied to the raw object body.
    decoder: D,
    // Ties the otherwise unused type parameter `E` to the struct without storing a value.
    phantom: PhantomData<E>,
}

impl<S, D, E> S3PayloadRetriever<S, D, E>
    where S: S3 + Clone + Send + Sync + 'static,
          D: PayloadDecoder<E> + Clone + Send + 'static,
          E: Send + 'static
{
    pub fn new(s3: S, decoder: D) -> Self {
        Self { s3, decoder, phantom: PhantomData }
    }
}

#[async_trait]
impl<S, D, E> PayloadRetriever<E> for S3PayloadRetriever<S, D, E>
    where S: S3 + Clone + Send + Sync + 'static,
          D: PayloadDecoder<E> + Clone + Send + 'static,
          E: Send + 'static
{
    type Message = SqsMessage;

    /// Downloads the S3 object referenced by an SQS message and decodes it into `E`.
    ///
    /// The message body is parsed as JSON and the first entry of its `Records` array is
    /// used to locate the object (`s3.bucket.name` and `s3.object.key`). The object is
    /// requested with a 5 second timeout on the `get_object` call, its body is read fully
    /// into memory, and the bytes are handed to the decoder.
    ///
    /// # Errors
    ///
    /// Returns an error (instead of panicking) when the message has no body, the body is
    /// not valid JSON, the bucket name or object key is absent, the S3 request fails or
    /// times out, the response carries no body, reading the body fails, or the decoder
    /// returns an error.
    async fn retrieve_event(&mut self, msg: &Self::Message) -> Result<E, Box<dyn Error>> {
        // Smallest buffer reserved for the object body; used when the size reported in the
        // event is missing or tiny.
        const MIN_BODY_CAPACITY: usize = 1024;

        let body = msg.body.as_ref().ok_or("SQS message has no body")?;
        info!("Got body from message: {}", body);
        let event: serde_json::Value = serde_json::from_str(body)?;

        // Indexing a `serde_json::Value` with an absent key or index yields `Null`, so a
        // malformed event surfaces below as a missing bucket name / object key error.
        let record = &event["Records"][0]["s3"];

        let bucket = record["bucket"]["name"]
            .as_str()
            .ok_or("S3 event record is missing the bucket name")?;
        // NOTE(review): S3 event notifications URL-encode object keys; keys containing
        // spaces or special characters may need decoding before use — confirm against the
        // producers of these messages.
        let key = record["object"]["key"]
            .as_str()
            .ok_or("S3 event record is missing the object key")?;

        info!("{}/{}", bucket, key);

        let s3_data = self.s3.get_object(
            GetObjectRequest {
                bucket: bucket.to_string(),
                key: key.to_string(),
                ..Default::default()
            }
        );

        // First `?` handles the timeout elapsing, the second the S3 request error.
        let s3_data = tokio::time::timeout(
            Duration::from_secs(5),
            s3_data,
        ).await??;

        // The reported size is only a capacity hint. Convert it with a checked conversion
        // (fully-qualified so no extra import is required) instead of a truncating `as`
        // cast, falling back to the minimum if it does not fit in `usize`.
        let object_size = record["object"]["size"].as_u64().unwrap_or_default();
        let prealloc = <usize as std::convert::TryFrom<u64>>::try_from(object_size)
            .unwrap_or(MIN_BODY_CAPACITY)
            .max(MIN_BODY_CAPACITY);

        info!("Retrieved s3 payload with size : {:?}", prealloc);

        let mut payload = Vec::with_capacity(prealloc);

        s3_data
            .body
            .ok_or("S3 response is missing a body")?
            .into_async_read()
            .read_to_end(&mut payload)
            .await?;

        info!("Read s3 payload body");
        self.decoder.decode(payload)
    }
}