crypto_auditing/event_broker/
client.rs1use crate::event_broker::{SOCKET_PATH, error::Result};
5use crate::types::EventGroup;
6use futures::{
7 SinkExt, TryStreamExt,
8 future::{self, AbortHandle},
9 stream::Stream,
10};
11use std::path::{Path, PathBuf};
12use tokio::net::UnixStream;
13use tokio::sync::mpsc::{self, Receiver, Sender};
14use tokio_serde::{SymmetricallyFramed, formats::SymmetricalCbor};
15use tokio_stream::wrappers::ReceiverStream;
16use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
17use tracing::info;
18
19#[derive(Clone, Debug)]
20struct ClientInner {
21 scopes: Vec<String>,
22 sender: Sender<EventGroup>,
23}
24
25pub struct Client {
51 inner: ClientInner,
52 address: PathBuf,
53 receiver: Receiver<EventGroup>,
54}
55
56pub struct ClientHandle(AbortHandle);
59
60impl Drop for ClientHandle {
61 fn drop(&mut self) {
62 self.0.abort();
63 }
64}
65
66impl Client {
67 pub fn new() -> Self {
69 let (tx, rx) = mpsc::channel::<EventGroup>(10);
70
71 Self {
72 inner: ClientInner {
73 scopes: Vec::new(),
74 sender: tx,
75 },
76 address: SOCKET_PATH.into(),
77 receiver: rx,
78 }
79 }
80
81 pub fn address(mut self, address: impl AsRef<Path>) -> Self {
83 self.address = address.as_ref().to_owned();
84 self
85 }
86
87 pub fn scopes(mut self, scopes: &Vec<String>) -> Self {
89 self.inner.scopes = scopes.to_owned();
90 self
91 }
92
93 pub async fn start(self) -> Result<(ClientHandle, impl Stream<Item = EventGroup>)> {
98 let stream = UnixStream::connect(&self.address).await?;
99 let local_addr = stream.local_addr()?;
100
101 let (de, ser) = stream.into_split();
102
103 let ser = FramedWrite::new(ser, LengthDelimitedCodec::new());
104 let de = FramedRead::new(de, LengthDelimitedCodec::new());
105
106 let mut ser = SymmetricallyFramed::new(ser, SymmetricalCbor::<Vec<String>>::default());
107 let mut de = SymmetricallyFramed::new(de, SymmetricalCbor::<EventGroup>::default());
108
109 let inner = self.inner.clone();
110 let (handler, abort_handle) = future::abortable(async move {
111 if let Err(e) = ser.send(inner.scopes).await {
112 info!(error = %e,
113 "unable to send subscription request");
114 }
115 loop {
116 let group = match de.try_next().await {
117 Ok(group) => group,
118 Err(e) => {
119 info!(error = %e,
120 "unable to deserialize event");
121 break;
122 }
123 };
124
125 if let Some(group) = group
126 && let Err(e) = inner.sender.send(group).await
127 {
128 info!(error = %e,
129 "unable to send event");
130 break;
131 }
132 }
133 });
134 tokio::spawn(async move {
135 match handler.await {
136 Ok(()) | Err(future::Aborted) => info!(?local_addr, "client shutdown."),
137 }
138 });
139 Ok((
140 ClientHandle(abort_handle),
141 ReceiverStream::new(self.receiver),
142 ))
143 }
144}
145
146impl Default for Client {
147 fn default() -> Self {
148 Self::new()
149 }
150}