waypoint/processor/
print.rs1use crate::{
2 hub::subscriber::{PostProcessHandler, PreProcessHandler},
3 processor::{consumer::EventProcessor, format::format_message},
4 proto::{HubEvent, HubEventType, hub_event::Body},
5};
6use async_trait::async_trait;
7use futures::future::BoxFuture;
8use std::sync::Arc;
9use tracing::{debug, info};
10
11#[derive(Clone)]
12pub struct PrintProcessor {
13 _resources: Arc<super::AppResources>, }
16
17impl PrintProcessor {
18 pub fn new(resources: Arc<super::AppResources>) -> Self {
19 Self { _resources: resources }
20 }
21
22 pub fn create_handlers(
23 _processor: Arc<Self>,
24 ) -> (Option<PreProcessHandler>, Option<PostProcessHandler>) {
25 let pre_process = Arc::new(move |events: &[HubEvent], _: &[Vec<u8>]| {
26 let events = events.to_owned();
27 Box::pin(async move {
28 let mut results = Vec::with_capacity(events.len());
29 for event in events {
30 let msg = match TryFrom::try_from(event.r#type).ok() {
31 Some(HubEventType::MergeMessage) => {
32 if let Some(Body::MergeMessageBody(body)) = event.body {
33 body.message
34 } else {
35 None
36 }
37 },
38 Some(HubEventType::PruneMessage) => {
39 if let Some(Body::PruneMessageBody(body)) = event.body {
40 body.message
41 } else {
42 None
43 }
44 },
45 Some(HubEventType::RevokeMessage) => {
46 if let Some(Body::RevokeMessageBody(body)) = event.body {
47 body.message
48 } else {
49 None
50 }
51 },
52 _ => None,
53 };
54
55 if let Some(msg) = msg {
56 debug!("Pre-processing message: {}", format_message(&msg));
57 results.push(false);
58 } else {
59 results.push(true);
60 }
61 }
62 results
63 }) as BoxFuture<'static, Vec<bool>>
64 });
65
66 let post_process = Arc::new(move |events: &[HubEvent], _: &[Vec<u8>]| {
67 let events_len = events.len();
68 Box::pin(async move {
69 info!("Processed batch of {} messages", events_len);
70 }) as BoxFuture<'static, ()>
71 });
72
73 (Some(pre_process), Some(post_process))
74 }
75}
76
77#[async_trait]
78impl EventProcessor for PrintProcessor {
79 async fn process_event(
80 &self,
81 event: HubEvent,
82 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
83 match &event.body {
84 Some(Body::MergeMessageBody(body)) => {
85 if let Some(msg) = &body.message {
86 info!("merge: {}", format_message(msg));
87 }
88 },
89 Some(Body::PruneMessageBody(body)) => {
90 if let Some(msg) = &body.message {
91 info!("prune: {}", format_message(msg));
92 }
93 },
94 Some(Body::RevokeMessageBody(body)) => {
95 if let Some(msg) = &body.message {
96 info!("revoke: {}", format_message(msg));
97 }
98 },
99 _ => {},
100 }
101 Ok(())
102 }
103}