1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
use std::{collections::BTreeMap, io};
use tokio::sync::{broadcast, mpsc, oneshot, watch};
use crate::{
conn::{frame::Frame, Connection, MessageKind},
types::status::{CurrentStatus, StatusChange},
};
/// Reply channels for requests that have been written to the connection and
/// are still awaiting a response, keyed by the `(msg_id, msg_type)` pair of
/// the request frame.
type Outstanding = BTreeMap<(u8, u8), oneshot::Sender<io::Result<Frame>>>;
/// A request handed to the IO loop: the frame to send, plus the `oneshot`
/// sender on which the reply frame (or the send error) is returned.
pub(super) type Request = (Frame, oneshot::Sender<io::Result<Frame>>);
/// State owned by the background IO task: the connection itself plus the
/// channels that link the task to the rest of the module.
struct IoLoop {
// Used both as a stream of incoming frames and as a sink for outgoing ones.
connection: Connection,
// Resolves when shutdown of the loop is requested.
channel_kill: oneshot::Receiver<()>,
// If present, receives the `io::Error` when the connection fails or is
// closed by the remote end; taken on first use, so it fires at most once.
channel_die: Option<oneshot::Sender<io::Error>>,
// Requests from callers, each paired with the channel for its reply.
channel_req: mpsc::Receiver<Request>,
// Every decoded `StatusChange` is broadcast here.
channel_change: broadcast::Sender<StatusChange>,
// Holds the `CurrentStatus`, updated by applying each `StatusChange`.
channel_status: watch::Sender<CurrentStatus>,
// Requests that were sent successfully and are waiting for their reply.
outstanding: Outstanding,
}
impl IoLoop {
/// Main IO Loop: select for
/// - `channel_kill`: shutdown requested, clean up and die
/// - `connection` as `Stream`: forward status messages to `broadcast_status()`,
/// and handle replies to control requests or extended messages by sending
/// them back to caller on its `oneshot`
/// - `channel_req`: forward request to `connection` as `Sink` and register caller
/// as waiting for reply
async fn run(&mut self) {
use futures_util::sink::SinkExt;
use tokio_stream::StreamExt;
let mut running = true;
while running {
tokio::select! {
// frame from the connection
next = self.connection.next() => {
if let Some(Ok(frame)) = next {
// response or status frame
if let Some(tx) = self.outstanding.remove(&(frame.msg_id, frame.msg_type)) {
// a response, send back to caller
if frame.kind == MessageKind::StatusResponse {
// and also broadcast as a status update
self.broadcast_status(frame.clone());
}
let _ = tx.send(Ok(frame));
} else if frame.kind == MessageKind::StatusResponse {
// a status update to broadcast
self.broadcast_status(frame);
} else {
log::debug!("ignoring unexpected {:?} message", frame.kind);
}
} else {
// IO error or remote closed the connection
let e = match next {
Some(Err(e)) => e,
None => io::Error::from(io::ErrorKind::UnexpectedEof),
_ => panic!("unexpected next value"),
};
log::error!("{}", e);
if let Some(die_tx) = self.channel_die.take() {
let _ = die_tx.send(e);
}
}
},
Some((frame, ret_tx)) = self.channel_req.recv() => {
let (msg_id, msg_type) = (frame.msg_id, frame.msg_type);
match self.connection.send(frame).await {
Ok(_) => {
// register that we're expecting a response
self.outstanding.insert((msg_id, msg_type), ret_tx);
},
Err(e) => {
// failed to send, send error back to caller
// (ignore send failure if the caller has stopped waiting)
let _ = ret_tx.send(Err(e));
},
}
}
// shutdown request
_ = &mut self.channel_kill => { running = false; },
}
}
}
fn broadcast_status(&mut self, frame: Frame) {
if let Ok(change) = StatusChange::try_from(frame) {
// TODO: detect whether actually modified and use `send_if_modified()`
self.channel_status
.send_modify(|status| status.apply(&change));
let _ = self.channel_change.send(change);
}
}
}
/// Builds an `IoLoop` from the given connection and channels, starts it as a
/// tokio task and returns the task's join handle. The task finishes when
/// `IoLoop::run` returns.
pub(super) fn spawn(
    connection: Connection,
    channel_kill: oneshot::Receiver<()>,
    channel_req: mpsc::Receiver<Request>,
    channel_change: broadcast::Sender<StatusChange>,
    channel_status: watch::Sender<CurrentStatus>,
) -> tokio::task::JoinHandle<()> {
    let mut worker = IoLoop {
        connection,
        channel_kill,
        // FIXME: no death-notification channel is wired up yet
        channel_die: None,
        channel_req,
        channel_change,
        channel_status,
        // nothing is awaiting a reply at start-up
        outstanding: BTreeMap::new(),
    };
    tokio::spawn(async move { worker.run().await })
}