1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
pub mod client;
pub mod config;
pub mod resolve;
pub mod service;
use std::cell::RefCell;
use std::rc::Rc;
use futures::Future;
use futures::future::{err, join_all};
use notify::Error as NotifyError;
use tokio_core::reactor::Handle;
use client::messages::ClientBrokerNegotiation;
use common::messages::{Identifier, ProtocolVersion, SoftwareVersion};
use self::config::Config;
use self::resolve::Cache;
use self::service::{Service, ServiceConnectError, ServiceConnectErrorKind};
use service::messages::ServiceBrokerNegotiation;
pub struct Broker {
cache: Rc<RefCell<Cache>>,
config: Config,
handle: Handle,
services: Vec<Service>,
}
impl Broker {
pub fn new(config: Config, handle: Handle) -> Box<Future<Item=Broker, Error=NewBrokerError>> {
let cache = match Cache::new(&handle) {
Ok(cache) => cache,
Err(e) => return Box::new(err(e.into())),
};
let futures = config.service.clone()
.into_iter()
.map(|s| Service::connect(config.clone(), s, &handle)
.map_err(NewBrokerError::from))
.collect::<Vec<_>>();
Box::new(join_all(futures).map(|services| {
info!("Connected to all services: {:?}", services);
Broker { cache, config, handle, services }
}))
}
pub fn find_service(&self, id: &Identifier) -> Option<&Service> {
for service in self.services.iter() {
if &service.negotiation.service.id == id {
return Some(service);
}
}
None
}
pub fn client_negotiation(&self) -> ClientBrokerNegotiation {
ClientBrokerNegotiation {
monto: ProtocolVersion {
major: 3,
minor: 0,
patch: 0,
},
broker: self.version(),
extensions: self.config.extensions.client.clone(),
services: self.services.iter().map(|s| s.negotiation.clone()).collect(),
}
}
pub fn service_negotiation(&self) -> ServiceBrokerNegotiation {
ServiceBrokerNegotiation {
monto: ProtocolVersion {
major: 3,
minor: 0,
patch: 0,
},
extensions: self.config.extensions.service.clone(),
broker: self.version(),
}
}
pub fn version(&self) -> SoftwareVersion {
self.config.version.clone().into()
}
}
error_chain! {
types {
NewBrokerError, NewBrokerErrorKind, NewBrokerResultExt;
}
foreign_links {
Notify(NotifyError)
#[doc = "An error setting up the notifier."];
}
links {
ServiceConnect(ServiceConnectError, ServiceConnectErrorKind)
#[doc = "An error connecting to a service."];
}
}