1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
use ControlMessage;
use bip_peer::PeerInfo;
use bip_peer::messages::ExtendedMessage;
use bip_peer::messages::builders::ExtendedMessageBuilder;
use error::UberError;
use futures::Async;
use futures::Poll;
use futures::Stream;
use futures::task;
use futures::task::Task;
use std::collections::{HashMap, VecDeque};
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum IExtendedMessage {
Control(ControlMessage),
RecievedExtendedMessage(PeerInfo, ExtendedMessage),
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum OExtendedMessage {
SendExtendedMessage(PeerInfo, ExtendedMessage),
}
pub trait ExtendedListener {
fn extend(&self, _info: &PeerInfo, _builder: ExtendedMessageBuilder) -> ExtendedMessageBuilder {
_builder
}
fn on_update(&mut self, _info: &PeerInfo, _extended: &ExtendedPeerInfo) {}
}
pub struct ExtendedPeerInfo {
ours: Option<ExtendedMessage>,
theirs: Option<ExtendedMessage>,
}
impl ExtendedPeerInfo {
pub fn new(ours: Option<ExtendedMessage>, theirs: Option<ExtendedMessage>) -> ExtendedPeerInfo {
ExtendedPeerInfo {
ours: ours,
theirs: theirs,
}
}
pub fn update_ours(&mut self, message: ExtendedMessage) {
self.ours = Some(message);
}
pub fn update_theirs(&mut self, message: ExtendedMessage) {
self.theirs = Some(message);
}
pub fn our_message(&self) -> Option<&ExtendedMessage> {
self.ours.as_ref()
}
pub fn their_message(&self) -> Option<&ExtendedMessage> {
self.theirs.as_ref()
}
}
pub struct ExtendedModule {
builder: ExtendedMessageBuilder,
peers: HashMap<PeerInfo, ExtendedPeerInfo>,
out_queue: VecDeque<OExtendedMessage>,
opt_task: Option<Task>,
}
impl ExtendedModule {
pub fn new(builder: ExtendedMessageBuilder) -> ExtendedModule {
ExtendedModule {
builder: builder,
peers: HashMap::new(),
out_queue: VecDeque::new(),
opt_task: None,
}
}
pub fn process_message<D>(&mut self, message: IExtendedMessage, d_modules: &mut [Box<D>])
where
D: ExtendedListener + ?Sized,
{
match message {
IExtendedMessage::Control(ControlMessage::PeerConnected(info)) => {
let mut builder = self.builder.clone();
for d_module in d_modules.iter() {
let temp_builder = builder;
builder = d_module.extend(&info, temp_builder);
}
let ext_message = builder.build();
let ext_peer_info = ExtendedPeerInfo::new(Some(ext_message.clone()), None);
for d_module in d_modules {
d_module.on_update(&info, &ext_peer_info);
}
self.peers.insert(info, ext_peer_info);
self.out_queue
.push_back(OExtendedMessage::SendExtendedMessage(info, ext_message));
},
IExtendedMessage::Control(ControlMessage::PeerDisconnected(info)) => {
self.peers.remove(&info);
},
IExtendedMessage::RecievedExtendedMessage(info, ext_message) => {
let ext_peer_info = self.peers.get_mut(&info).unwrap();
ext_peer_info.update_theirs(ext_message);
for d_module in d_modules {
d_module.on_update(&info, &ext_peer_info);
}
},
_ => {
()
},
}
self.check_stream_unblock();
}
fn check_stream_unblock(&mut self) {
if !self.out_queue.is_empty() {
if let Some(task) = self.opt_task.take() {
task.notify();
}
}
}
}
impl Stream for ExtendedModule {
type Item = OExtendedMessage;
type Error = UberError;
fn poll(&mut self) -> Poll<Option<OExtendedMessage>, UberError> {
let opt_message = self.out_queue.pop_front();
if let Some(message) = opt_message {
Ok(Async::Ready(Some(message)))
} else {
self.opt_task = Some(task::current());
Ok(Async::NotReady)
}
}
}