Skip to main content

waypoint/processor/
print.rs

1use crate::{
2    hub::subscriber::{PostProcessHandler, PreProcessHandler},
3    processor::{consumer::EventProcessor, format::format_message},
4    proto::{HubEvent, HubEventType, hub_event::Body},
5};
6use async_trait::async_trait;
7use futures::future::BoxFuture;
8use std::sync::Arc;
9use tracing::{debug, info};
10
11#[derive(Clone)]
12pub struct PrintProcessor {
13    _resources: Arc<super::AppResources>, /* Prefixed with underscore to indicate intentionally
14                                           * unused */
15}
16
17impl PrintProcessor {
18    pub fn new(resources: Arc<super::AppResources>) -> Self {
19        Self { _resources: resources }
20    }
21
22    pub fn create_handlers(
23        _processor: Arc<Self>,
24    ) -> (Option<PreProcessHandler>, Option<PostProcessHandler>) {
25        let pre_process = Arc::new(move |events: &[HubEvent], _: &[Vec<u8>]| {
26            let events = events.to_owned();
27            Box::pin(async move {
28                let mut results = Vec::with_capacity(events.len());
29                for event in events {
30                    let msg = match TryFrom::try_from(event.r#type).ok() {
31                        Some(HubEventType::MergeMessage) => {
32                            if let Some(Body::MergeMessageBody(body)) = event.body {
33                                body.message
34                            } else {
35                                None
36                            }
37                        },
38                        Some(HubEventType::PruneMessage) => {
39                            if let Some(Body::PruneMessageBody(body)) = event.body {
40                                body.message
41                            } else {
42                                None
43                            }
44                        },
45                        Some(HubEventType::RevokeMessage) => {
46                            if let Some(Body::RevokeMessageBody(body)) = event.body {
47                                body.message
48                            } else {
49                                None
50                            }
51                        },
52                        _ => None,
53                    };
54
55                    if let Some(msg) = msg {
56                        debug!("Pre-processing message: {}", format_message(&msg));
57                        results.push(false);
58                    } else {
59                        results.push(true);
60                    }
61                }
62                results
63            }) as BoxFuture<'static, Vec<bool>>
64        });
65
66        let post_process = Arc::new(move |events: &[HubEvent], _: &[Vec<u8>]| {
67            let events_len = events.len();
68            Box::pin(async move {
69                info!("Processed batch of {} messages", events_len);
70            }) as BoxFuture<'static, ()>
71        });
72
73        (Some(pre_process), Some(post_process))
74    }
75}
76
77#[async_trait]
78impl EventProcessor for PrintProcessor {
79    async fn process_event(
80        &self,
81        event: HubEvent,
82    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
83        match &event.body {
84            Some(Body::MergeMessageBody(body)) => {
85                if let Some(msg) = &body.message {
86                    info!("merge: {}", format_message(msg));
87                }
88            },
89            Some(Body::PruneMessageBody(body)) => {
90                if let Some(msg) = &body.message {
91                    info!("prune: {}", format_message(msg));
92                }
93            },
94            Some(Body::RevokeMessageBody(body)) => {
95                if let Some(msg) = &body.message {
96                    info!("revoke: {}", format_message(msg));
97                }
98            },
99            _ => {},
100        }
101        Ok(())
102    }
103}