1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
use std::{
collections::HashMap,
future::Future,
pin::Pin,
sync::{self, Arc},
task::{Context, Poll},
};
use tracing::{debug, instrument, trace};
use crate::{
async_lock::Mutex, raw::Connection as RawConnection, Executor, Message, MsgBroadcaster,
OwnedMatchRule, Result, Socket, Task,
};
/// Reads incoming messages from the raw connection and forwards them to the registered
/// broadcasters.
///
/// The reader is consumed by `spawn`, which runs its receive loop as a task.
#[derive(Debug)]
pub(crate) struct SocketReader {
/// Raw connection the messages are read from, shared behind a `std::sync::Mutex`.
raw_conn: Arc<sync::Mutex<RawConnection<Box<dyn Socket>>>>,
/// Broadcasters keyed by an optional match rule, shared behind an async mutex.
///
/// An entry keyed by `None` is sent every message without any rule check; an entry keyed by
/// `Some(rule)` is only sent the messages that `rule` matches.
senders: Arc<Mutex<HashMap<Option<OwnedMatchRule>, MsgBroadcaster>>>,
}
impl SocketReader {
pub fn new(
raw_conn: Arc<sync::Mutex<RawConnection<Box<dyn Socket>>>>,
senders: Arc<Mutex<HashMap<Option<OwnedMatchRule>, MsgBroadcaster>>>,
) -> Self {
Self { raw_conn, senders }
}
pub fn spawn(self, executor: &Executor<'_>) -> Task<()> {
executor.spawn(self.receive_msg(), "socket reader")
}
#[instrument(name = "socket reader", skip(self))]
async fn receive_msg(self) {
loop {
trace!("Waiting for message on the socket..");
let receive_msg = ReceiveMessage {
raw_conn: &self.raw_conn,
};
let msg = receive_msg.await.map(Arc::new);
match &msg {
Ok(msg) => trace!("Message received on the socket: {:?}", msg),
Err(e) => trace!("Error reading from the socket: {:?}", e),
};
let mut senders = self.senders.lock().await;
for (rule, sender) in &*senders {
if let Ok(msg) = &msg {
if let Some(rule) = rule.as_ref() {
match rule.matches(msg) {
Ok(true) => (),
Ok(false) => continue,
Err(e) => {
debug!("Error matching message against rule: {:?}", e);
continue;
}
}
}
}
if let Err(e) = sender.broadcast(msg.clone()).await {
trace!(
"Error broadcasting message to stream for `{:?}`: {:?}",
rule,
e
);
}
}
trace!("Broadcasted to all streams: {:?}", msg);
if msg.is_err() {
senders.clear();
trace!("Socket reading task stopped");
return;
}
}
}
}
/// Future that yields the result of receiving one message from a raw connection.
///
/// Each poll locks the connection and calls `try_receive_message` on it.
struct ReceiveMessage<'r> {
/// Borrowed, mutex-guarded raw connection to receive from.
raw_conn: &'r sync::Mutex<RawConnection<Box<dyn Socket>>>,
}
impl<'r> Future for ReceiveMessage<'r> {
    type Output = Result<Message>;

    /// Locks the raw connection and delegates to its `try_receive_message`, passing `cx`
    /// through.
    ///
    /// # Panics
    ///
    /// Panics with "poisoned lock" if the connection mutex is poisoned.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // The field is a shared reference, so it can simply be copied out of the pinned future.
        let conn_mutex = self.raw_conn;
        let mut conn = conn_mutex.lock().expect("poisoned lock");
        conn.try_receive_message(cx)
    }
}