copernica_broker/broker.rs
1use {
2 crate::{
3 bloom_filter::Blooms,
4 router::Router,
5 Bayes,
6 },
7 copernica_packets::{LinkId, InterLinkPacket, NarrowWaistPacket },
8 copernica_common::{ constants, Operations},
9 anyhow::{anyhow, Result},
10 crossbeam_channel::{bounded, Receiver, Sender},
11 uluru::LRUCache,
12 std::{
13 collections::HashMap,
14 },
15 log::{
16 error, trace,
17 //debug
18 },
19};
20
21/*
22 s = Service, l = Link, b = Broker, r = Router, 2 = to: e.g. l2b = "link to copernica_broker"
23 link::{udp, mpsc_channel, mpsc_corruptor, etc}
24 +----------------------------+
25 +-----------+s2l_tx s2l_rx+-----------+l2b_tx l2b_rx| b2r_tx b2r_rx | +-----------+ +-----------+
26 | +-------------->+ +-------------->-------------------------+ +-->+ +-->+ |
27 | Service |l2s_rx l2s_tx| Link |b2l_rx b2l_tx| r2b_rx r2b_tx | | | Link | | Service |
28 | +<--------------+ +<---------------<-------------------+ | +<--+ +<--+ |
29 +-----------+ +-----------+ | | v | +-----------+ +-----------+
30 | +---+---+-+ |
31 +-----------+s2l_tx s2l_rx+-----------+l2b_tx l2b_rx| b2r_tx b2r_rx| | | +-----------+ +-----------+
32 | +-------------->+ +-------------->---------------->+ | +-->+ +-->+ |
33 | Service |l2s_rx l2s_tx| Link |b2l_rx b2l_tx| r2b_rx r2b_tx| Router | | | Link | | Broker |
34 | +<--------------+ +<---------------<---------------+ | +<--+ +<--+ |
35 +-----------+ +-----------+ | | | | +-----------+ +-----------+
36 | +---+---+-+ |
37 +-----------+b2l_tx b2l_rx+-----------+l2b_tx l2b_rx| b2r_tx b2r_rx ^ | | +-----------+ +-----------+
38 | +-------------->+ +-------------->---------------------+ | +-->+ +-->+ |
39 | Broker |l2b_rx l2b_tx| Link |b2l_rx b2l_tx| r2b_rx r2b_tx | | | Link | | Service |
40 | +<--------------+ +<---------------<-----------------------+ +<--+ +<--+ |
41 +-----------+ +-----------+ | Broker | +-----------+ +-----------+
42 +----------------------------+
43*/
44pub type ResponseStore = LRUCache<NarrowWaistPacket, { constants::RESPONSE_STORE_SIZE }>;
45pub struct Broker {
46 label: String,
47 ops: Operations,
48 rs: ResponseStore,
49 l2b_tx: Sender<InterLinkPacket>, // give to link
50 l2b_rx: Receiver<InterLinkPacket>, // keep in broker
51 b2l: HashMap<u32, Sender<InterLinkPacket>>, // keep in broker
52 r2b_tx: Sender<InterLinkPacket>, // give to router
53 r2b_rx: Receiver<InterLinkPacket>, // keep in broker
54 blooms: HashMap<LinkId, Blooms>,
55}
56impl Broker {
57 pub fn new((label, ops): (String, Operations)) -> Self {
58 let (l2b_tx, l2b_rx) = bounded::<InterLinkPacket>(constants::BOUNDED_BUFFER_SIZE);
59 let (r2b_tx, r2b_rx) = bounded::<InterLinkPacket>(constants::BOUNDED_BUFFER_SIZE);
60 let b2l = HashMap::new();
61 let blooms = HashMap::new();
62 let rs = ResponseStore::default();
63 ops.register_router(label.clone());
64 Self {
65 label,
66 rs,
67 l2b_tx,
68 l2b_rx,
69 r2b_tx,
70 r2b_rx,
71 b2l,
72 blooms,
73 ops,
74 }
75 }
76 pub fn peer_with_link(
77 &mut self,
78 link_id: LinkId,
79 ) -> Result<(Sender<InterLinkPacket>, Receiver<InterLinkPacket>)> {
80 match self.blooms.get(&link_id) {
81 Some(_) => Err(anyhow!("Channel already initialized")),
82 None => {
83 let (b2l_tx, b2l_rx) = bounded::<InterLinkPacket>(constants::BOUNDED_BUFFER_SIZE);
84 self.b2l.insert(link_id.lookup_id()?, b2l_tx.clone());
85 self.blooms.insert(link_id, Blooms::new());
86 Ok((self.l2b_tx.clone(), b2l_rx))
87 }
88 }
89 }
90 #[allow(unreachable_code)]
91 pub fn run(&mut self) -> Result<()> {
92 let l2b_rx = self.l2b_rx.clone();
93 let mut blooms = self.blooms.clone();
94 let choke = LinkId::choke();
95 let mut b2l = self.b2l.clone();
96 let r2b_tx = self.r2b_tx.clone();
97 let r2b_rx = self.r2b_rx.clone();
98 let mut bayes = Bayes::new();
99 for (link_id, _) in &blooms {
100 bayes.add_link(&link_id);
101 }
102 let rs = self.rs.clone();
103 let ops = self.ops.clone();
104 let label = self.label.clone();
105 std::thread::spawn(move || {
106 loop {
107 match l2b_rx.recv() {
108 Ok(ilp) => {
109 trace!("\t\t| | | broker-to-router");
110 ops.message_from(label.clone());
111 if !blooms.contains_key(&ilp.link_id()) {
112 trace!("ADDING {:?} to BLOOMS", ilp);
113 blooms.insert(ilp.link_id(), Blooms::new());
114 bayes.add_link(&ilp.link_id());
115 }
116 Router::handle_packet(&label, &ops, &ilp, r2b_tx.clone(), &mut rs.clone(), &mut blooms, &mut bayes, &choke)?;
117 }
118 Err(error) => error!("{}", error),
119 }
120 }
121 Ok::<(), anyhow::Error>(())
122 });
123 let ops = self.ops.clone();
124 let label = self.label.clone();
125 std::thread::spawn(move || {
126 loop {
127 if let Ok(ilp) = r2b_rx.recv() {
128 match &ilp.link_id().lookup_id() {
129 Ok(id) => {
130 match b2l.get_mut(id) {
131 Some(b2l_tx) => {
132 trace!("\t\t| | | router-to-broker");
133 ops.message_from(label.clone());
134 match b2l_tx.send(ilp) {
135 Ok(_) => {},
136 Err(e) => error!("broker {:?}", e),
137 }
138 },
139 None => { continue }
140 }
141 },
142 Err(_e) => { continue },
143 };
144 }
145 }
146 });
147 Ok(())
148 }
149}