echo_server/
echo_server.rs

1//! This demonstrates a TCP echo server.  It listens for connections
2//! on port 7777.  Each connection is handled by a separate actor.
3//! This actor just sends back whatever data it receives over TCP.
4//!
5//! However to make things a little more interesting, some additional
6//! processing is performed:
7//!
8//! - All echos are delayed by one second.
9//!
10//! - If the character '!' is passed, then the actor terminates the
11//!   TCP connection and shuts down.  All other TCP connections
12//!   continue as normal though.
13//!
14//! - If the character '%' is passed, this causes the actor to fail,
15//!   passing an `AbortError` back to the listener.  The listener
16//!   detects this particular kind of failure, and shuts down the
17//!   whole server.
18//!
19//! - The server will shut down if there is no incoming connection for
20//!   60 seconds
21//!
22//! Start the example, and then connect using `telnet 127.0.0.1 7777`.
23//! Many `telnet` sessions can be handled at the same time.
24
25use stakker::{
26    actor, actor_in_slab, after, fail, fwd_to, ret_shutdown, ret_some_to, stop, timer_max,
27    ActorOwnSlab, MaxTimerKey, Stakker, StopCause, CX,
28};
29use stakker_mio::mio::net::{TcpListener, TcpStream};
30use stakker_mio::mio::{Events, Interest, Poll};
31use stakker_mio::{MioPoll, MioSource, ReadStatus, Ready, TcpStreamBuf};
32
33use std::error::Error;
34use std::fmt;
35use std::io::ErrorKind;
36use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
37use std::time::{Duration, Instant};
38
39const PORT: u16 = 7777;
40
41// Here fatal top-level MIO failures are returned from main.  All
42// other I/O failures are handled as actor failure.
43fn main() -> Result<(), Box<dyn Error>> {
44    let mut stakker = Stakker::new(Instant::now());
45    let s = &mut stakker;
46    let miopoll = MioPoll::new(s, Poll::new()?, Events::with_capacity(1024), 0)?;
47
48    let _listener = actor!(s, Listener::init(), ret_shutdown!(s));
49
50    // Don't need `idle!` handling
51    s.run(Instant::now(), false);
52    while s.not_shutdown() {
53        let maxdur = s.next_wait_max(Instant::now(), Duration::from_secs(60), false);
54        miopoll.poll(maxdur)?;
55        s.run(Instant::now(), false);
56    }
57
58    println!("Shutdown: {}", s.shutdown_reason().unwrap());
59    Ok(())
60}
61
62/// Listens for incoming TCP connections
63struct Listener {
64    children: ActorOwnSlab<Echoer>,
65    listener: MioSource<TcpListener>,
66    inactivity: MaxTimerKey,
67}
68
69impl Listener {
70    fn init(cx: CX![]) -> Option<Self> {
71        match Self::setup(cx) {
72            Err(e) => {
73                fail!(cx, "Listening socket setup failed on port {}: {}", PORT, e);
74                None
75            }
76            Ok(this) => Some(this),
77        }
78    }
79
80    fn setup(cx: CX![]) -> std::io::Result<Self> {
81        let addr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, PORT);
82        let listen = TcpListener::bind(SocketAddr::V4(addr))?;
83        let miopoll = cx.anymap_get::<MioPoll>();
84        let listener = miopoll.add(
85            listen,
86            Interest::READABLE,
87            10,
88            fwd_to!([cx], connect() as (Ready)),
89        )?;
90        println!("Listening on port 7777 for incoming telnet connections ...");
91
92        let mut this = Self {
93            listener,
94            children: ActorOwnSlab::new(),
95            inactivity: MaxTimerKey::default(),
96        };
97        this.activity(cx);
98
99        Ok(this)
100    }
101
102    // Register activity, pushing back the inactivity timer
103    fn activity(&mut self, cx: CX![]) {
104        timer_max!(
105            &mut self.inactivity,
106            cx.now() + Duration::from_secs(60),
107            [cx],
108            |_this, cx| {
109                fail!(cx, "Timed out waiting for connection");
110            }
111        );
112    }
113
114    fn connect(&mut self, cx: CX![], _: Ready) {
115        loop {
116            match self.listener.accept() {
117                Ok((stream, addr)) => {
118                    println!("New connection from {}", addr);
119                    actor_in_slab!(
120                        self.children,
121                        cx,
122                        Echoer::init(stream),
123                        ret_some_to!([cx], |_this, cx, cause: StopCause| {
124                            // Mostly just report child failure, but watch out for
125                            // AbortError to terminate this actor, which in turn shuts
126                            // down the whole process
127                            println!("Child actor terminated: {}", cause);
128
129                            if let StopCause::Failed(e) = cause {
130                                if e.downcast::<AbortError>().is_ok() {
131                                    fail!(cx, "Aborted");
132                                }
133                            }
134                        })
135                    );
136                    self.activity(cx);
137                }
138                Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
139                Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
140                Err(e) => {
141                    fail!(cx, "TCP listen socket failure on accept: {}", e);
142                    return;
143                }
144            }
145        }
146    }
147}
148
149/// Echoes received data back to sender, with a delay
150struct Echoer {
151    tcp: TcpStreamBuf,
152}
153
154impl Echoer {
155    fn init(cx: CX![], stream: TcpStream) -> Option<Self> {
156        match Self::setup(cx, stream) {
157            Err(e) => {
158                fail!(cx, "Failed to set up a new TCP stream: {}", e);
159                None
160            }
161            Ok(this) => Some(this),
162        }
163    }
164
165    fn setup(cx: CX![], stream: TcpStream) -> std::io::Result<Self> {
166        let miopoll = cx.anymap_get::<MioPoll>();
167        let source = miopoll.add(
168            stream,
169            Interest::READABLE | Interest::WRITABLE,
170            10,
171            fwd_to!([cx], ready() as (Ready)),
172        )?;
173
174        let mut tcp = TcpStreamBuf::new();
175        tcp.init(source);
176
177        Ok(Self { tcp })
178    }
179
180    fn ready(&mut self, cx: CX![], ready: Ready) {
181        if ready.is_readable() {
182            loop {
183                match self.tcp.read(8192) {
184                    ReadStatus::NewData => {
185                        let data = self.tcp.inp[self.tcp.rd..self.tcp.wr].to_vec();
186                        self.tcp.rd = self.tcp.wr;
187                        self.check_special_chars(cx, &data);
188                        after!(Duration::from_secs(1), [cx], send_data(data));
189                        continue;
190                    }
191                    ReadStatus::WouldBlock => (),
192                    ReadStatus::EndOfStream => {
193                        after!(Duration::from_secs(1), [cx], send_eof());
194                    }
195                    ReadStatus::Error(e) => {
196                        fail!(cx, "Read failure on TCP stream: {}", e);
197                    }
198                }
199                break;
200            }
201        }
202
203        if ready.is_writable() {
204            self.flush(cx);
205        }
206    }
207
208    fn send_data(&mut self, cx: CX![], data: Vec<u8>) {
209        self.tcp.out.extend_from_slice(&data);
210        self.flush(cx);
211    }
212
213    fn send_eof(&mut self, cx: CX![]) {
214        self.tcp.out_eof = true;
215        self.flush(cx);
216    }
217
218    fn flush(&mut self, cx: CX![]) {
219        if let Err(e) = self.tcp.flush() {
220            fail!(cx, "Write failure on TCP stream: {}", e);
221        }
222        if self.tcp.out_eof && self.tcp.out.is_empty() {
223            stop!(cx); // Stop actor when output is complete
224        }
225    }
226
227    fn check_special_chars(&mut self, cx: CX![], data: &[u8]) {
228        if data.contains(&b'!') {
229            self.tcp.out_eof = true;
230            self.flush(cx);
231        }
232        if data.contains(&b'%') {
233            fail!(cx, AbortError);
234        }
235    }
236}
237
238#[derive(Debug)]
239struct AbortError;
240impl Error for AbortError {}
241impl fmt::Display for AbortError {
242    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
243        write!(f, "AbortError")
244    }
245}