merging_example/
merging_example.rs

1extern crate futures;
2extern crate rusoto_s3;
3extern crate rusoto_sqs;
4extern crate sqs_lambda;
5extern crate tokio;
6
7use std::error::Error;
8use std::io::Cursor;
9use std::time::{SystemTime, UNIX_EPOCH};
10
11use prost::Message;
12use rusoto_s3::S3Client;
13use rusoto_sqs::SqsClient;
14use serde::Deserialize;
15
16use async_trait::async_trait;
17use aws_lambda_events::event::s3::{
18    S3Bucket, S3Entity, S3Event, S3EventRecord, S3Object, S3RequestParameters, S3UserIdentity,
19};
20use chrono::Utc;
21use lambda_runtime::Context;
22use prost::bytes::Bytes;
23use rusoto_core::Region;
24use sqs_lambda::cache::{Cache, NopCache};
25use sqs_lambda::completion_event_serializer::CompletionEventSerializer;
26use sqs_lambda::error::Error as SqsLambdaError;
27use sqs_lambda::event_decoder::PayloadDecoder;
28use sqs_lambda::event_handler::{Completion, EventHandler, OutputEvent};
29use sqs_lambda::local_sqs_service::local_sqs_service;
30use std::fmt::Debug;
31use tracing_subscriber::EnvFilter;
32
33struct MyService<C>
34where
35    C: Cache + Clone + Send + Sync + 'static,
36{
37    cache: C,
38}
39
40impl<C> Clone for MyService<C>
41where
42    C: Cache + Clone + Send + Sync + 'static,
43{
44    fn clone(&self) -> MyService<C> {
45        Self {
46            cache: self.cache.clone(),
47        }
48    }
49}
50
51impl<C> MyService<C>
52where
53    C: Cache + Clone + Send + Sync + 'static,
54{
55    pub fn new(cache: C) -> Self {
56        Self { cache }
57    }
58}
59
60#[async_trait]
61impl<C> EventHandler for MyService<C>
62where
63    C: Cache + Clone + Send + Sync + 'static,
64{
65    type InputEvent = Vec<u8>;
66    type OutputEvent = Subgraph;
67    type Error = SqsLambdaError;
68
69    async fn handle_event(
70        &mut self,
71        _input: Self::InputEvent,
72    ) -> OutputEvent<Self::OutputEvent, Self::Error> {
73        // do some work
74        let completed = OutputEvent::new(Completion::Total(Subgraph {}));
75
76        // for input in _input.keys() {
77        //     completed.add_identity(input);
78        // }
79
80        completed
81    }
82}
83
84#[derive(Clone, Debug)]
85pub struct Subgraph {}
86
87impl Subgraph {
88    fn merge(&mut self, _other: &Self) {
89        unimplemented!()
90    }
91
92    fn into_bytes(self) -> Vec<u8> {
93        unimplemented!()
94    }
95}
96
97#[derive(Clone, Debug)]
98pub struct SubgraphSerializer {}
99
100impl CompletionEventSerializer for SubgraphSerializer {
101    type CompletedEvent = Subgraph;
102    type Output = Vec<u8>;
103    type Error = SqsLambdaError;
104
105    fn serialize_completed_events(
106        &mut self,
107        completed_events: &[Self::CompletedEvent],
108    ) -> Result<Vec<Self::Output>, Self::Error> {
109        let mut subgraph = Subgraph {};
110        for sg in completed_events {
111            subgraph.merge(sg);
112        }
113
114        //        subgraph.into_bytes()
115        Ok(vec![])
116    }
117}
118
119#[derive(Clone)]
120pub struct ZstdProtoDecoder;
121
122impl<E> PayloadDecoder<E> for ZstdProtoDecoder
123where
124    E: Message + Default,
125{
126    fn decode(&mut self, body: Vec<u8>) -> Result<E, Box<dyn Error>>
127    where
128        E: Message + Default,
129    {
130        let mut decompressed = Vec::new();
131
132        let mut body = Cursor::new(&body);
133
134        zstd::stream::copy_decode(&mut body, &mut decompressed)?;
135
136        let buf = Bytes::from(decompressed);
137
138        Ok(E::decode(buf)?)
139    }
140}
141
142#[derive(Clone, Default)]
143pub struct ZstdDecoder {
144    pub buffer: Vec<u8>,
145}
146
147impl PayloadDecoder<Vec<u8>> for ZstdDecoder {
148    fn decode(&mut self, body: Vec<u8>) -> Result<Vec<u8>, Box<dyn Error>> {
149        self.buffer.clear();
150
151        let mut body = Cursor::new(&body);
152
153        zstd::stream::copy_decode(&mut body, &mut self.buffer)?;
154
155        Ok(self.buffer.clone())
156    }
157}
158
159#[derive(Clone, Default)]
160pub struct ZstdJsonDecoder {
161    pub buffer: Vec<u8>,
162}
163
164impl<E> PayloadDecoder<E> for ZstdJsonDecoder
165where
166    E: for<'a> Deserialize<'a>,
167{
168    fn decode(&mut self, body: Vec<u8>) -> Result<E, Box<dyn Error>> {
169        self.buffer.clear();
170
171        let mut body = Cursor::new(&body);
172
173        zstd::stream::copy_decode(&mut body, &mut self.buffer)?;
174
175        Ok(serde_json::from_slice(&self.buffer[..])?)
176    }
177}
178
179fn init_sqs_client() -> SqsClient {
180    SqsClient::new(Region::Custom {
181        name: "localsqs".to_string(),
182        endpoint: "http://localhost:9324".to_string(),
183    })
184}
185
186fn init_s3_client() -> S3Client {
187    S3Client::new(Region::Custom {
188        name: "locals3".to_string(),
189        endpoint: "http://localhost:4572".to_string(),
190    })
191}
192
193fn time_based_key_fn(_event: &[u8]) -> String {
194    let cur_ms = match SystemTime::now().duration_since(UNIX_EPOCH) {
195        Ok(n) => n.as_millis(),
196        Err(_) => panic!("SystemTime before UNIX EPOCH!"),
197    };
198
199    let cur_day = cur_ms - (cur_ms % 86400);
200
201    format!("{}/{}-{}", cur_day, cur_ms, uuid::Uuid::new_v4())
202}
203
204// #[tokio::main]
205// async fn main() -> Result<(), Box<dyn std::error::Error>> {
206//     simple_logger::init().unwrap();
207//     let service: MyService<_, SqsLambdaError<()>> = MyService::new(NopCache {});
208//
209//     local_service(
210//         "input-dir",
211//         "output-dir",
212//         SubgraphSerializer {},
213//         ZstdJsonDecoder { buffer: vec![] },
214//         service,
215//     ).await
216// }
217
218#[tokio::main]
219async fn main() -> Result<(), Box<dyn std::error::Error>> {
220    let filter = EnvFilter::from_default_env();
221    tracing_subscriber::fmt()
222        // .json()
223        // .with_max_level(Level::DEBUG)
224        .with_env_filter(filter)
225        .init();
226
227    // simple_logger::init_with_level(Level::Info).unwrap();
228    // let cache = RedisCache::new("address".to_owned()).await.expect("Could not create redis client");
229    tracing::info!("Initializing service");
230    let service = MyService::new(NopCache {});
231
232    local_sqs_service(
233        "http://localhost:9324/queue/sysmon-graph-generator-queue",
234        "unid-subgraphs-generated",
235        Context {
236            deadline: Utc::now().timestamp_millis() + 300_000,
237            ..Default::default()
238        },
239        |_| init_s3_client(),
240        init_s3_client(),
241        init_sqs_client(),
242        ZstdJsonDecoder { buffer: vec![] },
243        SubgraphSerializer {},
244        service,
245        NopCache {},
246        |_, event_result| {
247            dbg!(event_result);
248        },
249        move |bucket, key| async move {
250            let _output_event = S3Event {
251                records: vec![S3EventRecord {
252                    event_version: None,
253                    event_source: None,
254                    aws_region: None,
255                    event_time: chrono::Utc::now(),
256                    event_name: None,
257                    principal_id: S3UserIdentity { principal_id: None },
258                    request_parameters: S3RequestParameters {
259                        source_ip_address: None,
260                    },
261                    response_elements: Default::default(),
262                    s3: S3Entity {
263                        schema_version: None,
264                        configuration_id: None,
265                        bucket: S3Bucket {
266                            name: Some(bucket),
267                            owner_identity: S3UserIdentity { principal_id: None },
268                            arn: None,
269                        },
270                        object: S3Object {
271                            key: Some(key),
272                            size: 0,
273                            url_decoded_key: None,
274                            version_id: None,
275                            e_tag: None,
276                            sequencer: None,
277                        },
278                    },
279                }],
280            };
281
282            let _sqs_client = init_sqs_client();
283
284            // publish to SQS
285            // sqs_client.send_message(
286            //     SendMessageRequest {
287            //         message_body: serde_json::to_string(&output_event)
288            //             .expect("failed to encode s3 event"),
289            //         queue_url: "http://localhost:9324/queue/node-identifier-retry-queue".to_string(),
290            //         ..Default::default()
291            //     }
292            // ).await;
293
294            Ok(())
295        },
296    )
297    .await;
298
299    Ok(())
300}