1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
// SPDX-License-Identifier: GPL-3.0-or-later
// Copyright (C) 2022-2023 The crypto-auditing developers.
use crate::event_broker::{error::Result, SOCKET_PATH};
use crate::types::EventGroup;
use futures::{
future::{self, AbortHandle},
stream::Stream,
SinkExt, TryStreamExt,
};
use std::path::{Path, PathBuf};
use tokio::net::UnixStream;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio_serde::{formats::SymmetricalCbor, SymmetricallyFramed};
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
use tracing::info;
#[derive(Clone, Debug)]
struct ClientInner {
scopes: Vec<String>,
sender: Sender<EventGroup>,
}
/// A client to the event broker service
///
/// # Examples
///
/// ```no_run
/// use crypto_auditing::event_broker::Client;
/// use futures::stream::StreamExt;
///
/// #[tokio::main]
/// async fn main() -> anyhow::Result<()> {
/// let client = Client::new().scopes(&vec!["tcp".to_string()]);
///
/// let (_handle, mut reader) = client.start().await?;
///
/// tokio::spawn(async move {
/// while let Some(event) = reader.next().await {
/// println!("{:?}", &event);
/// }
/// });
///
/// tokio::signal::ctrl_c().await?;
///
/// Ok(())
/// }
/// ```
pub struct Client {
inner: ClientInner,
address: PathBuf,
receiver: Receiver<EventGroup>,
}
/// A handle for the client connection, which will be aborted once
/// the ownership is dropped
pub struct ClientHandle(AbortHandle);
impl Drop for ClientHandle {
fn drop(&mut self) {
self.0.abort();
}
}
impl Client {
/// Returns a new [`Client`]
pub fn new() -> Self {
let (tx, rx) = mpsc::channel::<EventGroup>(10);
Self {
inner: ClientInner {
scopes: Vec::new(),
sender: tx,
},
address: SOCKET_PATH.into(),
receiver: rx,
}
}
/// Sets the Unix domain address of event broker
pub fn address(mut self, address: impl AsRef<Path>) -> Self {
self.address = address.as_ref().to_owned();
self
}
/// Sets the scopes to restrict matches of events
pub fn scopes(mut self, scopes: &Vec<String>) -> Self {
self.inner.scopes = scopes.to_owned();
self
}
/// Starts driving the client connection.
///
/// This returns a tuple consisting a [`ClientHandle`] and a [`Stream`]
/// which generates a sequence of event groups.
pub async fn start(self) -> Result<(ClientHandle, impl Stream<Item = EventGroup>)> {
let stream = UnixStream::connect(&self.address).await?;
let local_addr = stream.local_addr()?;
let (de, ser) = stream.into_split();
let ser = FramedWrite::new(ser, LengthDelimitedCodec::new());
let de = FramedRead::new(de, LengthDelimitedCodec::new());
let mut ser = SymmetricallyFramed::new(ser, SymmetricalCbor::<Vec<String>>::default());
let mut de = SymmetricallyFramed::new(de, SymmetricalCbor::<EventGroup>::default());
let inner = self.inner.clone();
let (handler, abort_handle) = future::abortable(async move {
if let Err(e) = ser.send(inner.scopes).await {
info!(error = %e,
"unable to send subscription request");
}
loop {
let group = match de.try_next().await {
Ok(group) => group,
Err(e) => {
info!(error = %e,
"unable to deserialize event");
break;
}
};
if let Some(group) = group {
if let Err(e) = inner.sender.send(group).await {
info!(error = %e,
"unable to send event");
break;
}
}
}
});
tokio::spawn(async move {
match handler.await {
Ok(()) | Err(future::Aborted) => info!(?local_addr, "client shutdown."),
}
});
Ok((
ClientHandle(abort_handle),
ReceiverStream::new(self.receiver),
))
}
}
impl Default for Client {
fn default() -> Self {
Self::new()
}
}