1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
use std::error::Error;
use std::io::Read;
use std::marker::PhantomData;
use std::time::Duration;
use aws_lambda_events::event::s3::S3Event;
use futures::compat::Future01CompatExt;
use log::*;
use rusoto_s3::{GetObjectRequest, S3};
use rusoto_sqs::Message as SqsMessage;
use tokio::prelude::*;
use async_trait::async_trait;
use crate::event_decoder::PayloadDecoder;
use rusoto_core::RusotoError;
#[async_trait]
pub trait PayloadRetriever<T> {
type Message;
async fn retrieve_event(&mut self, msg: &Self::Message) -> Result<T, Box<dyn Error>>;
}
#[derive(Clone)]
pub struct S3PayloadRetriever<S, D, E>
where S: S3 + Clone + Send + Sync + 'static,
D: PayloadDecoder<E> + Clone + Send + 'static,
E: Send + 'static
{
s3: S,
decoder: D,
phantom: PhantomData<E>,
}
impl<S, D, E> S3PayloadRetriever<S, D, E>
where S: S3 + Clone + Send + Sync + 'static,
D: PayloadDecoder<E> + Clone + Send + 'static,
E: Send + 'static
{
pub fn new(s3: S, decoder: D) -> Self {
Self { s3, decoder, phantom: PhantomData }
}
}
#[async_trait]
impl<S, D, E> PayloadRetriever<E> for S3PayloadRetriever<S, D, E>
where S: S3 + Clone + Send + Sync + 'static,
D: PayloadDecoder<E> + Clone + Send + 'static,
E: Send + 'static
{
type Message = SqsMessage;
async fn retrieve_event(&mut self, msg: &Self::Message) -> Result<E, Box<dyn Error>> {
let body = msg.body.as_ref().unwrap();
info!("Got body from message: {}", body);
let event: serde_json::Value = serde_json::from_str(body)?;
let record = &event["Records"][0]["s3"];
let bucket = record["bucket"]["name"].as_str().expect("bucket name");
let key = record["object"]["key"].as_str().expect("object key");
println!("{}/{}", bucket, key);
let s3_data = self.s3.get_object(
GetObjectRequest {
bucket: bucket.to_string(),
key: key.to_string(),
..Default::default()
}
);
let s3_data = tokio::time::timeout(
Duration::from_secs(5),
s3_data,
).await??;
let object_size = record["object"]["size"].as_u64().unwrap_or_default();
let prealloc = if object_size < 1024 {
1024
} else {
object_size as usize
};
info!("Retrieved s3 payload with size : {:?}", prealloc);
let mut body = Vec::with_capacity(prealloc);
s3_data.body.expect("Missing S3 body").into_async_read().read_to_end(&mut body).await?;
info!("Read s3 payload body");
self.decoder.decode(body)
}
}