1use std::sync::Arc;
2
3use anyhow::Result;
4use async_trait::async_trait;
5use tokio::sync::RwLock;
6use zelos_trace_types::ipc::{IpcMessageWithId, Receiver, Sender};
7
8use crate::{filter::Filter, router::DEFAULT_CHANNEL_SIZE};
9
10#[async_trait]
11pub(crate) trait TraceSinkHandle: Send + Sync {
12 async fn send_async(&self, msg: &IpcMessageWithId) -> Result<()>;
13}
14
15pub(crate) struct TraceSinkHandleFiltered {
17 pub sender: Sender,
18 pub filters: Arc<RwLock<Vec<Filter>>>,
19}
20
21#[async_trait]
22impl TraceSinkHandle for TraceSinkHandleFiltered {
23 async fn send_async(&self, msg: &IpcMessageWithId) -> Result<()> {
24 for filter in self.filters.read().await.iter() {
25 if filter.matches(msg) {
26 self.sender.try_send(msg.clone())?;
27 continue;
28 }
29 }
30 Ok(())
31 }
32}
33
34pub(crate) struct TraceSinkHandleAllBlocking {
35 pub sender: Sender,
36}
37
38impl TraceSinkHandleAllBlocking {
39 pub(crate) fn new() -> (Self, Receiver) {
40 let (sender, receiver) = flume::bounded::<IpcMessageWithId>(DEFAULT_CHANNEL_SIZE);
41 (Self { sender }, receiver)
42 }
43}
44
45#[async_trait]
46impl TraceSinkHandle for TraceSinkHandleAllBlocking {
47 async fn send_async(&self, msg: &IpcMessageWithId) -> Result<()> {
48 self.sender.send_async(msg.clone()).await?;
49 Ok(())
50 }
51}
52
53#[derive(Debug)]
56pub struct TraceSink {
57 filters: Arc<RwLock<Vec<Filter>>>,
59}
60
61impl TraceSink {
62 pub(crate) fn new() -> (Self, Receiver, TraceSinkHandleFiltered) {
64 let (sender, receiver) = flume::bounded::<IpcMessageWithId>(1024);
65 let filters = Arc::new(RwLock::new(Vec::new()));
66 (
67 Self {
68 filters: filters.clone(),
69 },
70 receiver,
71 TraceSinkHandleFiltered { sender, filters },
72 )
73 }
74
75 pub async fn subscribe(&self, filter: Filter) {
77 let mut filters = self.filters.write().await;
78 filters.push(filter);
79 }
80
81 pub async fn unsubscribe(&self, filter: Filter) {
83 let mut filters = self.filters.write().await;
84 filters.retain(|f| f != &filter);
85 }
86}