1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
#[macro_use]
extern crate tracing;
mod protocol;
use async_std::{net::TcpStream, sync::RwLock, task};
use identity::Identity;
use qrpc_sdk::{
builders, default_socket_path,
error::RpcResult,
io::{self, Message},
RpcSocket,
};
use std::{collections::BTreeMap, sync::Arc};
/// One entry in the broker's connection map ([`ConnMap`]).
///
/// `handle_packet` looks entries up by a message's `to` field and clones
/// `io` in order to forward the message to that stream.
// NOTE(review): `allow(unused)` presumably silences warnings for fields that
// are not read yet (`id` and `addr` are not read in this file) — confirm.
#[allow(unused)]
pub(crate) struct ServiceEntry {
/// Identity associated with this entry.
pub(crate) id: Identity,
/// Address string of the entry — presumably the same value used as the
/// key in the connection map; TODO confirm against the code that inserts.
pub(crate) addr: String,
/// Stream that forwarded messages for this entry are written to.
pub(crate) io: TcpStream,
}
/// Shared, lock-protected map from a `String` key to its [`ServiceEntry`].
///
/// The map is wrapped in an `Arc<RwLock<..>>` so it can be handed to the
/// socket's reader callback while the `Broker` keeps its own handle.
type ConnMap = Arc<RwLock<BTreeMap<String, ServiceEntry>>>;
/// Address of the broker itself.
///
/// Messages whose `to` field equals this value are handled by the broker
/// instead of being forwarded to another stream, and the broker uses it as
/// the `from` field of replies it generates.
const ADDRESS: &str = "org.qaul._broker";
/// The broker: owns the RPC server socket and the shared connection map.
///
/// Both fields are underscore-prefixed and are not read anywhere in this
/// file after construction; they are stored so that they live as long as
/// the `Broker` does.
pub struct Broker {
/// Server socket returned by `RpcSocket::server`.
_sock: Arc<RpcSocket>,
/// Connection map; a clone of this `Arc` is passed to the socket server.
_conns: ConnMap,
}
impl Broker {
pub async fn new() -> RpcResult<Arc<Self>> {
let (addr, port) = default_socket_path();
let _conns = Default::default();
let _sock = {
let con = Arc::clone(&_conns);
RpcSocket::server(addr, port, |stream, data| reader_loop(stream, data), con).await?
};
Ok(Arc::new(Self { _sock, _conns }))
}
}
/// Process packets from `stream` until one fails, blocking the caller.
///
/// Runs `handle_packet` repeatedly on the stream via `task::block_on`. The
/// first error ends the loop and is logged as a warning; the function then
/// returns, which drops `stream`.
fn reader_loop(mut stream: TcpStream, data: ConnMap) {
    let session = async {
        // The loop only ever exits by yielding the error that stopped it.
        let failure = loop {
            match handle_packet(&mut stream, &data).await {
                Ok(()) => continue,
                Err(e) => break e,
            }
        };
        warn!(
            "Error occured while accepting packet: {}; dropping stream",
            failure
        );
    };
    task::block_on(session);
}
/// Receive one message from `s` and either handle it or forward it.
///
/// * If the message's `to` field equals [`ADDRESS`], it is passed to
///   `protocol::broker_command` and the resulting message is sent back on `s`.
/// * Otherwise `to` is looked up in `conns`. When an entry exists, the
///   message is forwarded unchanged to a clone of that entry's stream. When
///   it does not, a `resp_bool(false)` reply from the broker is sent back on
///   `s` and the call still returns `Ok(())`.
///
/// # Errors
///
/// Propagates any error from `io::recv`, `io::send` or
/// `protocol::broker_command`.
async fn handle_packet(s: &mut TcpStream, conns: &ConnMap) -> RpcResult<()> {
    let Message { id, to, from, data } = io::recv(s).await?;
    match to.as_str() {
        ADDRESS => {
            debug!("Message addressed to broker; handling!");
            let msg = protocol::broker_command(id, &s, data, &conns).await?;
            io::send(s, msg).await?;
            Ok(())
        }
        _ => {
            debug!("Message addressed to bus component; looking up stream!");
            debug!("Looking up stream: {}", to);

            // Complete the lookup in its own statement: the read guard is a
            // temporary and is dropped at the end of this `let`, so the lock
            // is not held across the network sends below.
            let target = conns.read().await.get(&to).map(|entry| entry.io.clone());

            let mut t_stream = match target {
                Some(stream) => stream,
                None => {
                    warn!("Requested component does not exist on qrpc bus!");
                    io::send(
                        s,
                        Message {
                            id,
                            to: "<unknown>".into(),
                            from: ADDRESS.into(),
                            data: builders::resp_bool(false),
                        },
                    )
                    .await?;
                    return Ok(());
                }
            };

            io::send(&mut t_stream, Message { id, to, from, data }).await?;
            Ok(())
        }
    }
}