master_and_slave/
master_and_slave.rs

1//! A example to show how to combine client and server into one task.
2//!
3//! 'master' is a server to control 'manager' when to stop (10s after starting
4//! in our example).
5//!
6//!'manager' run a big task including two small tasks:
7//! one for connecting to 'master' socket, when receiving a *stop* message, process exits;
8//! one for listening another socket as server, when receiving 'master' *stop*, send stop
9//! message (*0* in our example) to its client.
10//!
11//! 'driver' is a client to connect 'manager', and as a counter to send&receive u32.
12//!
13//! You can test this out by running:
14//!
15//!     cargo run --example master_and_slave  master
16//!
17//! And then fastly (within 10s) in another two windows run:
18//!
19//!     cargo run --example master_and_slave  manager
20//!
21//!     cargo run --example master_and_slave  slave
22//!
23
24extern crate msg_transmitter;
25extern crate tokio;
26extern crate futures;
27#[macro_use]
28extern crate serde_derive;
29extern crate bincode;
30
31use tokio::prelude::*;
32use msg_transmitter::*;
33
34use std::env;
35
36#[derive(Serialize, Deserialize, Debug, Clone)]
37enum Message {
38    Start(),
39    Stop(),
40}
41
42
43fn main() {
44    let a = env::args().skip(1).collect::<Vec<_>>();
45    match a.first().unwrap().as_str() {
46        "master" => master(),
47        "manager" => manager(),
48        "driver" => driver(),
49        _ => panic!("failed"),
50    };
51}
52
53fn master() {
54    use std::{thread, time};
55    let srv = create_tcp_server("127.0.0.1:7777", "master");
56    let master_task = srv.start_server(Message::Start(), |client_name, msg: Message| {
57        println!("master receive {:?} from {}", msg, client_name);
58        println!("sleep");
59        let ten_millis = time::Duration::from_millis(10000);
60        let now = time::Instant::now();
61        thread::sleep(ten_millis);
62        println!("awake");
63        vec![(client_name, Message::Stop())]
64    });
65    tokio::run(master_task);
66}
67
68fn manager() {
69    use futures::sync::mpsc;
70    use std::sync::{Arc, Mutex};
71    let future_task = future::ok(1).and_then(|_| {
72        let srv = create_tcp_server("127.0.0.1:6666", "management_server");
73        let clt = create_tcp_client("127.0.0.1:7777", "management_client");
74        let connections = srv.connections.clone();
75
76        let clt_task = clt.start_client(move |msg: Message| {
77            println!("manager receive {:?} from master", msg);
78            match msg {
79                Message::Stop() => {
80                    for (_, tx) in connections.lock().unwrap().iter_mut() {
81                        (*tx).try_send(Some(0)).unwrap();
82                    }
83                    std::process::exit(0);
84                }
85                _ =>{}
86            }
87            vec![msg]
88        });
89        tokio::spawn(clt_task);
90
91        let srv_task = srv.start_server(1, |client_name, msg: u32| {
92            println!("manager receive {} from {}", msg, client_name);
93            vec![(client_name, msg + 1)]
94        });
95        tokio::spawn(srv_task);
96        Ok(())
97    }).map_err(|e: std::io::Error| println!("{:?}", e));
98    tokio::run(future_task);
99}
100
101fn driver() {
102    let clt = create_tcp_client("127.0.0.1:6666", "driver");
103    let client_task = clt.start_client(|msg: u32| {
104        println!("{}", msg);
105        if msg != 0 {
106            vec![msg + 1]
107        } else {
108            println!("oh god");
109            std::process::exit(0);
110        }
111    });
112    tokio::run(client_task);
113}