crypto_auditing/event_broker/
client.rs

1// SPDX-License-Identifier: GPL-3.0-or-later
2// Copyright (C) 2022-2023 The crypto-auditing developers.
3
4use crate::event_broker::{SOCKET_PATH, error::Result};
5use crate::types::EventGroup;
6use futures::{
7    SinkExt, TryStreamExt,
8    future::{self, AbortHandle},
9    stream::Stream,
10};
11use std::path::{Path, PathBuf};
12use tokio::net::UnixStream;
13use tokio::sync::mpsc::{self, Receiver, Sender};
14use tokio_serde::{SymmetricallyFramed, formats::SymmetricalCbor};
15use tokio_stream::wrappers::ReceiverStream;
16use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
17use tracing::info;
18
19#[derive(Clone, Debug)]
20struct ClientInner {
21    scopes: Vec<String>,
22    sender: Sender<EventGroup>,
23}
24
25/// A client to the event broker service
26///
27/// # Examples
28///
29/// ```no_run
30/// use crypto_auditing::event_broker::Client;
31/// use futures::stream::StreamExt;
32///
33/// #[tokio::main]
34/// async fn main() -> anyhow::Result<()> {
35///     let client = Client::new().scopes(&vec!["tcp".to_string()]);
36///
37///     let (_handle, mut reader) = client.start().await?;
38///
39///     tokio::spawn(async move {
40///         while let Some(event) = reader.next().await {
41///             println!("{:?}", &event);
42///         }
43///     });
44///
45///     tokio::signal::ctrl_c().await?;
46///
47///     Ok(())
48/// }
49/// ```
50pub struct Client {
51    inner: ClientInner,
52    address: PathBuf,
53    receiver: Receiver<EventGroup>,
54}
55
56/// A handle for the client connection, which will be aborted once
57/// the ownership is dropped
58pub struct ClientHandle(AbortHandle);
59
60impl Drop for ClientHandle {
61    fn drop(&mut self) {
62        self.0.abort();
63    }
64}
65
66impl Client {
67    /// Returns a new [`Client`]
68    pub fn new() -> Self {
69        let (tx, rx) = mpsc::channel::<EventGroup>(10);
70
71        Self {
72            inner: ClientInner {
73                scopes: Vec::new(),
74                sender: tx,
75            },
76            address: SOCKET_PATH.into(),
77            receiver: rx,
78        }
79    }
80
81    /// Sets the Unix domain address of event broker
82    pub fn address(mut self, address: impl AsRef<Path>) -> Self {
83        self.address = address.as_ref().to_owned();
84        self
85    }
86
87    /// Sets the scopes to restrict matches of events
88    pub fn scopes(mut self, scopes: &Vec<String>) -> Self {
89        self.inner.scopes = scopes.to_owned();
90        self
91    }
92
93    /// Starts driving the client connection.
94    ///
95    /// This returns a tuple consisting a [`ClientHandle`] and a [`Stream`]
96    /// which generates a sequence of event groups.
97    pub async fn start(self) -> Result<(ClientHandle, impl Stream<Item = EventGroup>)> {
98        let stream = UnixStream::connect(&self.address).await?;
99        let local_addr = stream.local_addr()?;
100
101        let (de, ser) = stream.into_split();
102
103        let ser = FramedWrite::new(ser, LengthDelimitedCodec::new());
104        let de = FramedRead::new(de, LengthDelimitedCodec::new());
105
106        let mut ser = SymmetricallyFramed::new(ser, SymmetricalCbor::<Vec<String>>::default());
107        let mut de = SymmetricallyFramed::new(de, SymmetricalCbor::<EventGroup>::default());
108
109        let inner = self.inner.clone();
110        let (handler, abort_handle) = future::abortable(async move {
111            if let Err(e) = ser.send(inner.scopes).await {
112                info!(error = %e,
113                          "unable to send subscription request");
114            }
115            loop {
116                let group = match de.try_next().await {
117                    Ok(group) => group,
118                    Err(e) => {
119                        info!(error = %e,
120                                  "unable to deserialize event");
121                        break;
122                    }
123                };
124
125                if let Some(group) = group
126                    && let Err(e) = inner.sender.send(group).await
127                {
128                    info!(error = %e,
129                                  "unable to send event");
130                    break;
131                }
132            }
133        });
134        tokio::spawn(async move {
135            match handler.await {
136                Ok(()) | Err(future::Aborted) => info!(?local_addr, "client shutdown."),
137            }
138        });
139        Ok((
140            ClientHandle(abort_handle),
141            ReceiverStream::new(self.receiver),
142        ))
143    }
144}
145
146impl Default for Client {
147    fn default() -> Self {
148        Self::new()
149    }
150}