copernica_broker/
broker.rs

1use {
2    crate::{
3        bloom_filter::Blooms,
4        router::Router,
5        Bayes,
6    },
7    copernica_packets::{LinkId, InterLinkPacket, NarrowWaistPacket },
8    copernica_common::{ constants, Operations},
9    anyhow::{anyhow, Result},
10    crossbeam_channel::{bounded, Receiver, Sender},
11    uluru::LRUCache,
12    std::{
13        collections::HashMap,
14    },
15    log::{
16        error, trace,
17        //debug
18    },
19};
20
21/*
22     s = Service, l = Link, b = Broker, r = Router, 2 = to: e.g. l2b = "link to copernica_broker"
23     link::{udp, mpsc_channel, mpsc_corruptor, etc}
24                                                            +----------------------------+
25    +-----------+s2l_tx   s2l_rx+-----------+l2b_tx   l2b_rx| b2r_tx           b2r_rx    |   +-----------+   +-----------+
26    |           +-------------->+           +-------------->-------------------------+   +-->+           +-->+           |
27    | Service   |l2s_rx   l2s_tx|   Link    |b2l_rx   b2l_tx| r2b_rx       r2b_tx    |   |   |   Link    |   | Service   |
28    |           +<--------------+           +<---------------<-------------------+   |   +<--+           +<--+           |
29    +-----------+               +-----------+               |                    |   v   |   +-----------+   +-----------+
30                                                            |                +---+---+-+ |
31    +-----------+s2l_tx   s2l_rx+-----------+l2b_tx   l2b_rx| b2r_tx   b2r_rx|         | |   +-----------+   +-----------+
32    |           +-------------->+           +-------------->---------------->+         | +-->+           +-->+           |
33    | Service   |l2s_rx   l2s_tx|   Link    |b2l_rx   b2l_tx| r2b_rx   r2b_tx|  Router | |   |   Link    |   |  Broker   |
34    |           +<--------------+           +<---------------<---------------+         | +<--+           +<--+           |
35    +-----------+               +-----------+               |                |         | |   +-----------+   +-----------+
36                                                            |                +---+---+-+ |
37    +-----------+b2l_tx   b2l_rx+-----------+l2b_tx   l2b_rx| b2r_tx      b2r_rx ^   |   |   +-----------+   +-----------+
38    |           +-------------->+           +-------------->---------------------+   |   +-->+           +-->+           |
39    |  Broker   |l2b_rx   l2b_tx|   Link    |b2l_rx   b2l_tx| r2b_rx          r2b_tx |   |   |   Link    |   | Service   |
40    |           +<--------------+           +<---------------<-----------------------+   +<--+           +<--+           |
41    +-----------+               +-----------+               |           Broker           |   +-----------+   +-----------+
42                                                            +----------------------------+
43*/
44pub type ResponseStore = LRUCache<NarrowWaistPacket, { constants::RESPONSE_STORE_SIZE }>;
45pub struct Broker {
46    label:  String,
47    ops: Operations,
48    rs:     ResponseStore,
49    l2b_tx: Sender<InterLinkPacket>,                         // give to link
50    l2b_rx: Receiver<InterLinkPacket>,                       // keep in broker
51    b2l:    HashMap<u32, Sender<InterLinkPacket>>,           // keep in broker
52    r2b_tx: Sender<InterLinkPacket>,                // give to router
53    r2b_rx: Receiver<InterLinkPacket>,  // keep in broker
54    blooms: HashMap<LinkId, Blooms>,
55}
56impl Broker {
57    pub fn new((label, ops): (String, Operations)) -> Self {
58        let (l2b_tx, l2b_rx) = bounded::<InterLinkPacket>(constants::BOUNDED_BUFFER_SIZE);
59        let (r2b_tx, r2b_rx) = bounded::<InterLinkPacket>(constants::BOUNDED_BUFFER_SIZE);
60        let b2l = HashMap::new();
61        let blooms = HashMap::new();
62        let rs = ResponseStore::default();
63        ops.register_router(label.clone());
64        Self {
65            label,
66            rs,
67            l2b_tx,
68            l2b_rx,
69            r2b_tx,
70            r2b_rx,
71            b2l,
72            blooms,
73            ops,
74        }
75    }
76    pub fn peer_with_link(
77        &mut self,
78        link_id: LinkId,
79    ) -> Result<(Sender<InterLinkPacket>, Receiver<InterLinkPacket>)> {
80        match self.blooms.get(&link_id) {
81            Some(_) => Err(anyhow!("Channel already initialized")),
82            None => {
83                let (b2l_tx, b2l_rx) = bounded::<InterLinkPacket>(constants::BOUNDED_BUFFER_SIZE);
84                self.b2l.insert(link_id.lookup_id()?, b2l_tx.clone());
85                self.blooms.insert(link_id, Blooms::new());
86                Ok((self.l2b_tx.clone(), b2l_rx))
87            }
88        }
89    }
90    #[allow(unreachable_code)]
91    pub fn run(&mut self) -> Result<()> {
92        let l2b_rx = self.l2b_rx.clone();
93        let mut blooms = self.blooms.clone();
94        let choke = LinkId::choke();
95        let mut b2l = self.b2l.clone();
96        let r2b_tx = self.r2b_tx.clone();
97        let r2b_rx = self.r2b_rx.clone();
98        let mut bayes = Bayes::new();
99        for (link_id, _) in &blooms {
100            bayes.add_link(&link_id);
101        }
102        let rs = self.rs.clone();
103        let ops = self.ops.clone();
104        let label = self.label.clone();
105        std::thread::spawn(move || {
106            loop {
107                match l2b_rx.recv() {
108                    Ok(ilp) => {
109                        trace!("\t\t|  |  |  broker-to-router");
110                        ops.message_from(label.clone());
111                        if !blooms.contains_key(&ilp.link_id()) {
112                            trace!("ADDING {:?} to BLOOMS", ilp);
113                            blooms.insert(ilp.link_id(), Blooms::new());
114                            bayes.add_link(&ilp.link_id());
115                        }
116                        Router::handle_packet(&label, &ops, &ilp, r2b_tx.clone(), &mut rs.clone(), &mut blooms, &mut bayes, &choke)?;
117                    }
118                    Err(error) => error!("{}", error),
119                }
120            }
121            Ok::<(), anyhow::Error>(())
122        });
123        let ops = self.ops.clone();
124        let label = self.label.clone();
125        std::thread::spawn(move || {
126            loop {
127                if let Ok(ilp) = r2b_rx.recv() {
128                    match &ilp.link_id().lookup_id() {
129                        Ok(id) => {
130                            match b2l.get_mut(id) {
131                                Some(b2l_tx) => {
132                                    trace!("\t\t|  |  |  router-to-broker");
133                                    ops.message_from(label.clone());
134                                    match b2l_tx.send(ilp) {
135                                        Ok(_) => {},
136                                        Err(e) => error!("broker {:?}", e),
137                                    }
138                                },
139                                None => { continue }
140                            }
141                        },
142                        Err(_e) => { continue },
143                    };
144                }
145            }
146        });
147        Ok(())
148    }
149}