1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
#![warn(missing_docs)]
#[macro_use]
extern crate bytecodec;
#[macro_use]
extern crate slog;
#[macro_use]
extern crate trackable;
pub use error::{Error, ErrorKind};
mod codec;
mod error;
mod node_id;
mod node_id_generator;
mod rpc;
pub mod message;
pub mod metrics;
pub mod misc;
pub mod node;
pub mod service;
pub type Result<T> = std::result::Result<T, Error>;
#[cfg(test)]
mod tests {
use crate::node::{Node, SerialLocalNodeIdGenerator};
use crate::service::Service;
use fibers::Spawn;
use futures::{Future, Stream};
#[test]
fn it_works() {
let server_addr = "127.0.0.1:12121".parse().unwrap();
let service = Service::<String>::new(
server_addr,
fibers_global::handle(),
SerialLocalNodeIdGenerator::new(),
);
let service_handle = service.handle();
fibers_global::spawn(service.map_err(|e| panic!("{}", e)));
let mut fibers = Vec::new();
let mut first_node_id = None;
for i in 0..100 {
let mut node = Node::new(service_handle.clone());
if let Some(id) = first_node_id {
node.join(id);
} else {
first_node_id = Some(node.id());
}
if i == 99 {
node.broadcast("hello".to_owned());
}
let spawner = fibers_global::handle();
let fiber = fibers_global::spawn_monitor(
node.into_future()
.map(move |(message, stream)| {
spawner.spawn(stream.for_each(|_| Ok(())).map_err(|_| ()));
message.map(|m| m.into_payload())
})
.map_err(|(e, _)| e),
);
fibers.push(fiber);
}
for fiber in fibers {
match fibers_global::execute(fiber) {
Err(e) => panic!("{}", e),
Ok(message) => {
assert_eq!(message, Some("hello".to_owned()));
}
}
}
}
}