Skip to main content

zelos_trace/
sink.rs

1use std::sync::Arc;
2
3use anyhow::Result;
4use async_trait::async_trait;
5use tokio::sync::RwLock;
6use zelos_trace_types::ipc::{IpcMessageWithId, Receiver, Sender};
7
8use crate::{filter::Filter, router::DEFAULT_CHANNEL_SIZE};
9
10#[async_trait]
11pub(crate) trait TraceSinkHandle: Send + Sync {
12    async fn send_async(&self, msg: &IpcMessageWithId) -> Result<()>;
13}
14
15/// The handle for a trace sink that has filters
16pub(crate) struct TraceSinkHandleFiltered {
17    pub sender: Sender,
18    pub filters: Arc<RwLock<Vec<Filter>>>,
19}
20
21#[async_trait]
22impl TraceSinkHandle for TraceSinkHandleFiltered {
23    async fn send_async(&self, msg: &IpcMessageWithId) -> Result<()> {
24        for filter in self.filters.read().await.iter() {
25            if filter.matches(msg) {
26                self.sender.try_send(msg.clone())?;
27                continue;
28            }
29        }
30        Ok(())
31    }
32}
33
34pub(crate) struct TraceSinkHandleAllBlocking {
35    pub sender: Sender,
36}
37
38impl TraceSinkHandleAllBlocking {
39    pub(crate) fn new() -> (Self, Receiver) {
40        let (sender, receiver) = flume::bounded::<IpcMessageWithId>(DEFAULT_CHANNEL_SIZE);
41        (Self { sender }, receiver)
42    }
43}
44
45#[async_trait]
46impl TraceSinkHandle for TraceSinkHandleAllBlocking {
47    async fn send_async(&self, msg: &IpcMessageWithId) -> Result<()> {
48        self.sender.send_async(msg.clone()).await?;
49        Ok(())
50    }
51}
52
53/// A trace sink is a client connection for the trace router. It hold state about what data the client has seen and is
54/// subscribed to.
55#[derive(Debug)]
56pub struct TraceSink {
57    /// The list of filters for this sink
58    filters: Arc<RwLock<Vec<Filter>>>,
59}
60
61impl TraceSink {
62    /// Create a new TraceSink and TraceSinkHandle pair that share a set of filters
63    pub(crate) fn new() -> (Self, Receiver, TraceSinkHandleFiltered) {
64        let (sender, receiver) = flume::bounded::<IpcMessageWithId>(1024);
65        let filters = Arc::new(RwLock::new(Vec::new()));
66        (
67            Self {
68                filters: filters.clone(),
69            },
70            receiver,
71            TraceSinkHandleFiltered { sender, filters },
72        )
73    }
74
75    /// Add `filter` to the list of filters for this sink
76    pub async fn subscribe(&self, filter: Filter) {
77        let mut filters = self.filters.write().await;
78        filters.push(filter);
79    }
80
81    /// Remove `filter` from the list of filters for this sink
82    pub async fn unsubscribe(&self, filter: Filter) {
83        let mut filters = self.filters.write().await;
84        filters.retain(|f| f != &filter);
85    }
86}