1use stakker::{
26 actor, actor_in_slab, after, fail, fwd_to, ret_shutdown, ret_some_to, stop, timer_max,
27 ActorOwnSlab, MaxTimerKey, Stakker, StopCause, CX,
28};
29use stakker_mio::mio::net::{TcpListener, TcpStream};
30use stakker_mio::mio::{Events, Interest, Poll};
31use stakker_mio::{MioPoll, MioSource, ReadStatus, Ready, TcpStreamBuf};
32
33use std::error::Error;
34use std::fmt;
35use std::io::ErrorKind;
36use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
37use std::time::{Duration, Instant};
38
39const PORT: u16 = 7777;
40
41fn main() -> Result<(), Box<dyn Error>> {
44 let mut stakker = Stakker::new(Instant::now());
45 let s = &mut stakker;
46 let miopoll = MioPoll::new(s, Poll::new()?, Events::with_capacity(1024), 0)?;
47
48 let _listener = actor!(s, Listener::init(), ret_shutdown!(s));
49
50 s.run(Instant::now(), false);
52 while s.not_shutdown() {
53 let maxdur = s.next_wait_max(Instant::now(), Duration::from_secs(60), false);
54 miopoll.poll(maxdur)?;
55 s.run(Instant::now(), false);
56 }
57
58 println!("Shutdown: {}", s.shutdown_reason().unwrap());
59 Ok(())
60}
61
62struct Listener {
64 children: ActorOwnSlab<Echoer>,
65 listener: MioSource<TcpListener>,
66 inactivity: MaxTimerKey,
67}
68
69impl Listener {
70 fn init(cx: CX![]) -> Option<Self> {
71 match Self::setup(cx) {
72 Err(e) => {
73 fail!(cx, "Listening socket setup failed on port {}: {}", PORT, e);
74 None
75 }
76 Ok(this) => Some(this),
77 }
78 }
79
80 fn setup(cx: CX![]) -> std::io::Result<Self> {
81 let addr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, PORT);
82 let listen = TcpListener::bind(SocketAddr::V4(addr))?;
83 let miopoll = cx.anymap_get::<MioPoll>();
84 let listener = miopoll.add(
85 listen,
86 Interest::READABLE,
87 10,
88 fwd_to!([cx], connect() as (Ready)),
89 )?;
90 println!("Listening on port 7777 for incoming telnet connections ...");
91
92 let mut this = Self {
93 listener,
94 children: ActorOwnSlab::new(),
95 inactivity: MaxTimerKey::default(),
96 };
97 this.activity(cx);
98
99 Ok(this)
100 }
101
102 fn activity(&mut self, cx: CX![]) {
104 timer_max!(
105 &mut self.inactivity,
106 cx.now() + Duration::from_secs(60),
107 [cx],
108 |_this, cx| {
109 fail!(cx, "Timed out waiting for connection");
110 }
111 );
112 }
113
114 fn connect(&mut self, cx: CX![], _: Ready) {
115 loop {
116 match self.listener.accept() {
117 Ok((stream, addr)) => {
118 println!("New connection from {}", addr);
119 actor_in_slab!(
120 self.children,
121 cx,
122 Echoer::init(stream),
123 ret_some_to!([cx], |_this, cx, cause: StopCause| {
124 println!("Child actor terminated: {}", cause);
128
129 if let StopCause::Failed(e) = cause {
130 if e.downcast::<AbortError>().is_ok() {
131 fail!(cx, "Aborted");
132 }
133 }
134 })
135 );
136 self.activity(cx);
137 }
138 Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
139 Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
140 Err(e) => {
141 fail!(cx, "TCP listen socket failure on accept: {}", e);
142 return;
143 }
144 }
145 }
146 }
147}
148
149struct Echoer {
151 tcp: TcpStreamBuf,
152}
153
154impl Echoer {
155 fn init(cx: CX![], stream: TcpStream) -> Option<Self> {
156 match Self::setup(cx, stream) {
157 Err(e) => {
158 fail!(cx, "Failed to set up a new TCP stream: {}", e);
159 None
160 }
161 Ok(this) => Some(this),
162 }
163 }
164
165 fn setup(cx: CX![], stream: TcpStream) -> std::io::Result<Self> {
166 let miopoll = cx.anymap_get::<MioPoll>();
167 let source = miopoll.add(
168 stream,
169 Interest::READABLE | Interest::WRITABLE,
170 10,
171 fwd_to!([cx], ready() as (Ready)),
172 )?;
173
174 let mut tcp = TcpStreamBuf::new();
175 tcp.init(source);
176
177 Ok(Self { tcp })
178 }
179
180 fn ready(&mut self, cx: CX![], ready: Ready) {
181 if ready.is_readable() {
182 loop {
183 match self.tcp.read(8192) {
184 ReadStatus::NewData => {
185 let data = self.tcp.inp[self.tcp.rd..self.tcp.wr].to_vec();
186 self.tcp.rd = self.tcp.wr;
187 self.check_special_chars(cx, &data);
188 after!(Duration::from_secs(1), [cx], send_data(data));
189 continue;
190 }
191 ReadStatus::WouldBlock => (),
192 ReadStatus::EndOfStream => {
193 after!(Duration::from_secs(1), [cx], send_eof());
194 }
195 ReadStatus::Error(e) => {
196 fail!(cx, "Read failure on TCP stream: {}", e);
197 }
198 }
199 break;
200 }
201 }
202
203 if ready.is_writable() {
204 self.flush(cx);
205 }
206 }
207
208 fn send_data(&mut self, cx: CX![], data: Vec<u8>) {
209 self.tcp.out.extend_from_slice(&data);
210 self.flush(cx);
211 }
212
213 fn send_eof(&mut self, cx: CX![]) {
214 self.tcp.out_eof = true;
215 self.flush(cx);
216 }
217
218 fn flush(&mut self, cx: CX![]) {
219 if let Err(e) = self.tcp.flush() {
220 fail!(cx, "Write failure on TCP stream: {}", e);
221 }
222 if self.tcp.out_eof && self.tcp.out.is_empty() {
223 stop!(cx); }
225 }
226
227 fn check_special_chars(&mut self, cx: CX![], data: &[u8]) {
228 if data.contains(&b'!') {
229 self.tcp.out_eof = true;
230 self.flush(cx);
231 }
232 if data.contains(&b'%') {
233 fail!(cx, AbortError);
234 }
235 }
236}
237
/// Marker error used to fail an actor with a cause that can be recognised
/// later by downcasting.
#[derive(Debug)]
struct AbortError;

impl fmt::Display for AbortError {
    /// Writes the fixed text `AbortError`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("AbortError")
    }
}

impl Error for AbortError {}