1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
//! An extensible rpc message broker for the libqaul ecosystem.

#[macro_use]
extern crate tracing;

mod protocol;

use async_std::{net::TcpStream, sync::RwLock, task};
use identity::Identity;
use qrpc_sdk::{
    builders, default_socket_path,
    error::RpcResult,
    io::{self, Message},
    RpcSocket,
};
use std::{collections::BTreeMap, sync::Arc};

// NOTE(review): `allow(unused)` presumably silences dead-code warnings for
// fields that are stored but not read inside this file — confirm, and narrow
// the lint to the affected fields if possible.
/// One entry of the broker's connection map (`ConnMap`).
#[allow(unused)]
pub(crate) struct ServiceEntry {
    /// Identity associated with this entry.
    pub(crate) id: Identity,
    /// Address string of the entry — presumably the same value used as its
    /// key in `ConnMap`; verify against the code that inserts entries.
    pub(crate) addr: String,
    /// Stream belonging to this entry; `handle_packet` clones it to relay
    /// messages addressed to the entry's key.
    pub(crate) io: TcpStream,
}

/// Reference-counted, `RwLock`-guarded map from a `String` key to a
/// `ServiceEntry`.
///
/// `handle_packet` looks entries up by the `to` field of incoming messages.
type ConnMap = Arc<RwLock<BTreeMap<String, ServiceEntry>>>;

/// Address of the broker itself.
///
/// Messages whose `to` field equals this value are handled by the broker
/// instead of being relayed, and the broker uses it as the `from` field of
/// the replies it builds itself.
const ADDRESS: &str = "org.qaul._broker";

/// Hold the main broker state
///
/// Instances are created by `Broker::new`, which returns the broker wrapped
/// in an `Arc`.
// NOTE(review): the leading underscores presumably mark the fields as
// intentionally never read after construction — confirm.
pub struct Broker {
    /// Server socket handle returned by `RpcSocket::server`.
    _sock: Arc<RpcSocket>,
    /// Connection map; a clone of this handle is passed to the socket server
    /// and from there to `reader_loop`.
    _conns: ConnMap,
}

impl Broker {
    pub async fn new() -> RpcResult<Arc<Self>> {
        let (addr, port) = default_socket_path();
        let _conns = Default::default();

        let _sock = {
            let con = Arc::clone(&_conns);
            RpcSocket::server(addr, port, |stream, data| reader_loop(stream, data), con).await?
        };

        Ok(Arc::new(Self { _sock, _conns }))
    }
}

/// Process packets from `stream` until one of them fails.
///
/// Blocks the calling thread on a loop that calls `handle_packet` for the
/// stream. The first error ends the loop and is logged as a warning; the
/// stream is dropped when this function returns.
fn reader_loop(mut stream: TcpStream, data: ConnMap) {
    task::block_on(async {
        // The loop only ever exits by yielding the error that stopped it.
        let failure = loop {
            match handle_packet(&mut stream, &data).await {
                Ok(()) => continue,
                Err(e) => break e,
            }
        };

        warn!(
            "Error occured while accepting packet: {}; dropping stream",
            failure
        );
    });
}

/// Receive one message from `s` and either handle it or relay it.
///
/// * If the message is addressed to the broker (`to == ADDRESS`), it is
///   passed to `protocol::broker_command` and the resulting message is sent
///   back on `s`.
/// * Otherwise `to` is looked up in `conns`. If an entry exists, the message
///   is forwarded unchanged on a clone of that entry's stream. If not, a
///   reply carrying `builders::resp_bool(false)` is sent back on `s`.
///
/// # Errors
///
/// Propagates any error from `io::recv`, `io::send` or
/// `protocol::broker_command`. A missing relay target is *not* an error: it
/// is reported to the sender and `Ok(())` is returned.
async fn handle_packet(s: &mut TcpStream, conns: &ConnMap) -> RpcResult<()> {
    let Message { id, to, from, data } = io::recv(s).await?;
    match to.as_str() {
        ADDRESS => {
            debug!("Message addressed to broker; handling!");
            let msg = protocol::broker_command(id, &s, data, &conns).await?;
            io::send(s, msg).await?;
            Ok(())
        }
        _ => {
            debug!("Message addressed to bus component; looking up stream!");
            debug!("Looking up stream: {}", to);

            // Bind the lookup result with `let` so the read guard (a temporary
            // of this statement) is released here. Using the lookup directly
            // as a `match` scrutinee would keep the lock held for the whole
            // match, including across the network send in the "not found"
            // path, blocking writers of the connection map in the meantime.
            let target = conns.read().await.get(&to).map(|entry| entry.io.clone());

            match target {
                Some(mut t_stream) => {
                    // Relay the message unchanged to the addressed component.
                    io::send(&mut t_stream, Message { id, to, from, data }).await?;
                }
                None => {
                    warn!("Requested component does not exist on qrpc bus!");
                    // NOTE(review): the reply is addressed to "<unknown>"
                    // rather than to `from` — confirm this is what clients
                    // expect.
                    io::send(
                        s,
                        Message {
                            id,
                            to: "<unknown>".into(),
                            from: ADDRESS.into(),
                            data: builders::resp_bool(false),
                        },
                    )
                    .await?;
                }
            }
            Ok(())
        }
    }
}