1extern crate futures;
2extern crate rusoto_s3;
3extern crate rusoto_sqs;
4extern crate sqs_lambda;
5extern crate tokio;
6
7use std::error::Error;
8use std::io::Cursor;
9use std::time::{SystemTime, UNIX_EPOCH};
10
11use prost::Message;
12use rusoto_s3::S3Client;
13use rusoto_sqs::SqsClient;
14use serde::Deserialize;
15
16use async_trait::async_trait;
17use aws_lambda_events::event::s3::{
18 S3Bucket, S3Entity, S3Event, S3EventRecord, S3Object, S3RequestParameters, S3UserIdentity,
19};
20use chrono::Utc;
21use lambda_runtime::Context;
22use prost::bytes::Bytes;
23use rusoto_core::Region;
24use sqs_lambda::cache::{Cache, NopCache};
25use sqs_lambda::completion_event_serializer::CompletionEventSerializer;
26use sqs_lambda::error::Error as SqsLambdaError;
27use sqs_lambda::event_decoder::PayloadDecoder;
28use sqs_lambda::event_handler::{Completion, EventHandler, OutputEvent};
29use sqs_lambda::local_sqs_service::local_sqs_service;
30use std::fmt::Debug;
31use tracing_subscriber::EnvFilter;
32
33struct MyService<C>
34where
35 C: Cache + Clone + Send + Sync + 'static,
36{
37 cache: C,
38}
39
40impl<C> Clone for MyService<C>
41where
42 C: Cache + Clone + Send + Sync + 'static,
43{
44 fn clone(&self) -> MyService<C> {
45 Self {
46 cache: self.cache.clone(),
47 }
48 }
49}
50
51impl<C> MyService<C>
52where
53 C: Cache + Clone + Send + Sync + 'static,
54{
55 pub fn new(cache: C) -> Self {
56 Self { cache }
57 }
58}
59
60#[async_trait]
61impl<C> EventHandler for MyService<C>
62where
63 C: Cache + Clone + Send + Sync + 'static,
64{
65 type InputEvent = Vec<u8>;
66 type OutputEvent = Subgraph;
67 type Error = SqsLambdaError;
68
69 async fn handle_event(
70 &mut self,
71 _input: Self::InputEvent,
72 ) -> OutputEvent<Self::OutputEvent, Self::Error> {
73 let completed = OutputEvent::new(Completion::Total(Subgraph {}));
75
76 completed
81 }
82}
83
/// Placeholder output event type; it currently has no fields.
#[derive(Debug, Clone)]
pub struct Subgraph {}

impl Subgraph {
    /// Merges `_other` into `self`.
    ///
    /// # Panics
    ///
    /// Always panics: this is a stub that has not been implemented.
    fn merge(&mut self, _other: &Self) {
        unimplemented!()
    }

    /// Consumes the subgraph and converts it to bytes.
    ///
    /// # Panics
    ///
    /// Always panics: this is a stub that has not been implemented.
    fn into_bytes(self) -> Vec<u8> {
        unimplemented!()
    }
}
96
97#[derive(Clone, Debug)]
98pub struct SubgraphSerializer {}
99
100impl CompletionEventSerializer for SubgraphSerializer {
101 type CompletedEvent = Subgraph;
102 type Output = Vec<u8>;
103 type Error = SqsLambdaError;
104
105 fn serialize_completed_events(
106 &mut self,
107 completed_events: &[Self::CompletedEvent],
108 ) -> Result<Vec<Self::Output>, Self::Error> {
109 let mut subgraph = Subgraph {};
110 for sg in completed_events {
111 subgraph.merge(sg);
112 }
113
114 Ok(vec![])
116 }
117}
118
119#[derive(Clone)]
120pub struct ZstdProtoDecoder;
121
122impl<E> PayloadDecoder<E> for ZstdProtoDecoder
123where
124 E: Message + Default,
125{
126 fn decode(&mut self, body: Vec<u8>) -> Result<E, Box<dyn Error>>
127 where
128 E: Message + Default,
129 {
130 let mut decompressed = Vec::new();
131
132 let mut body = Cursor::new(&body);
133
134 zstd::stream::copy_decode(&mut body, &mut decompressed)?;
135
136 let buf = Bytes::from(decompressed);
137
138 Ok(E::decode(buf)?)
139 }
140}
141
142#[derive(Clone, Default)]
143pub struct ZstdDecoder {
144 pub buffer: Vec<u8>,
145}
146
147impl PayloadDecoder<Vec<u8>> for ZstdDecoder {
148 fn decode(&mut self, body: Vec<u8>) -> Result<Vec<u8>, Box<dyn Error>> {
149 self.buffer.clear();
150
151 let mut body = Cursor::new(&body);
152
153 zstd::stream::copy_decode(&mut body, &mut self.buffer)?;
154
155 Ok(self.buffer.clone())
156 }
157}
158
159#[derive(Clone, Default)]
160pub struct ZstdJsonDecoder {
161 pub buffer: Vec<u8>,
162}
163
164impl<E> PayloadDecoder<E> for ZstdJsonDecoder
165where
166 E: for<'a> Deserialize<'a>,
167{
168 fn decode(&mut self, body: Vec<u8>) -> Result<E, Box<dyn Error>> {
169 self.buffer.clear();
170
171 let mut body = Cursor::new(&body);
172
173 zstd::stream::copy_decode(&mut body, &mut self.buffer)?;
174
175 Ok(serde_json::from_slice(&self.buffer[..])?)
176 }
177}
178
179fn init_sqs_client() -> SqsClient {
180 SqsClient::new(Region::Custom {
181 name: "localsqs".to_string(),
182 endpoint: "http://localhost:9324".to_string(),
183 })
184}
185
186fn init_s3_client() -> S3Client {
187 S3Client::new(Region::Custom {
188 name: "locals3".to_string(),
189 endpoint: "http://localhost:4572".to_string(),
190 })
191}
192
193fn time_based_key_fn(_event: &[u8]) -> String {
194 let cur_ms = match SystemTime::now().duration_since(UNIX_EPOCH) {
195 Ok(n) => n.as_millis(),
196 Err(_) => panic!("SystemTime before UNIX EPOCH!"),
197 };
198
199 let cur_day = cur_ms - (cur_ms % 86400);
200
201 format!("{}/{}-{}", cur_day, cur_ms, uuid::Uuid::new_v4())
202}
203
204#[tokio::main]
219async fn main() -> Result<(), Box<dyn std::error::Error>> {
220 let filter = EnvFilter::from_default_env();
221 tracing_subscriber::fmt()
222 .with_env_filter(filter)
225 .init();
226
227 tracing::info!("Initializing service");
230 let service = MyService::new(NopCache {});
231
232 local_sqs_service(
233 "http://localhost:9324/queue/sysmon-graph-generator-queue",
234 "unid-subgraphs-generated",
235 Context {
236 deadline: Utc::now().timestamp_millis() + 300_000,
237 ..Default::default()
238 },
239 |_| init_s3_client(),
240 init_s3_client(),
241 init_sqs_client(),
242 ZstdJsonDecoder { buffer: vec![] },
243 SubgraphSerializer {},
244 service,
245 NopCache {},
246 |_, event_result| {
247 dbg!(event_result);
248 },
249 move |bucket, key| async move {
250 let _output_event = S3Event {
251 records: vec![S3EventRecord {
252 event_version: None,
253 event_source: None,
254 aws_region: None,
255 event_time: chrono::Utc::now(),
256 event_name: None,
257 principal_id: S3UserIdentity { principal_id: None },
258 request_parameters: S3RequestParameters {
259 source_ip_address: None,
260 },
261 response_elements: Default::default(),
262 s3: S3Entity {
263 schema_version: None,
264 configuration_id: None,
265 bucket: S3Bucket {
266 name: Some(bucket),
267 owner_identity: S3UserIdentity { principal_id: None },
268 arn: None,
269 },
270 object: S3Object {
271 key: Some(key),
272 size: 0,
273 url_decoded_key: None,
274 version_id: None,
275 e_tag: None,
276 sequencer: None,
277 },
278 },
279 }],
280 };
281
282 let _sqs_client = init_sqs_client();
283
284 Ok(())
295 },
296 )
297 .await;
298
299 Ok(())
300}