time/server/
tcp.rs

1//! # TCP binder
2//!
3//! This module contains the implementation of the TCP server binder,
4//! based on [`tokio::net::TcpStream`].
5
6use async_trait::async_trait;
7use log::debug;
8use std::io;
9use tokio::{
10    io::{AsyncBufReadExt, AsyncWriteExt},
11    net::TcpListener,
12};
13
14use crate::{
15    request::{Request, RequestReader},
16    response::{Response, ResponseWriter},
17    tcp::TcpHandler,
18    timer::ThreadSafeTimer,
19};
20
21use super::{ServerBind, ServerStream};
22
/// Binder for the TCP flavour of the server.
///
/// This [`ServerBind`] implementation relies on the TCP protocol to
/// bind a listener, read requests and write responses.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TcpBind {
    /// Host the TCP listener binds to.
    pub host: String,

    /// Port the TCP listener binds to.
    pub port: u16,
}
35
36impl TcpBind {
37    /// Create a new TCP binder using the given host and port.
38    pub fn new(host: impl ToString, port: u16) -> Box<dyn ServerBind> {
39        Box::new(Self {
40            host: host.to_string(),
41            port,
42        })
43    }
44}
45
46#[async_trait]
47impl ServerBind for TcpBind {
48    async fn bind(&self, timer: ThreadSafeTimer) -> io::Result<()> {
49        let listener = TcpListener::bind((self.host.as_str(), self.port)).await?;
50
51        loop {
52            match listener.accept().await {
53                Ok((stream, _)) => {
54                    let mut handler = TcpHandler::from(stream);
55                    if let Err(err) = handler.handle(timer.clone()).await {
56                        debug!("cannot handle request");
57                        debug!("{err:?}");
58                    }
59                }
60                Err(err) => {
61                    debug!("cannot get stream from client");
62                    debug!("{err:?}");
63                }
64            }
65        }
66    }
67}
68
69#[async_trait]
70impl RequestReader for TcpHandler {
71    async fn read(&mut self) -> io::Result<Request> {
72        let mut req = String::new();
73        self.reader.read_line(&mut req).await?;
74
75        let mut tokens = req.split_whitespace();
76        match tokens.next() {
77            Some("start") => Ok(Request::Start),
78            Some("get") => Ok(Request::Get),
79            Some("set") => match tokens.next().map(|duration| duration.parse::<usize>()) {
80                Some(Ok(duration)) => Ok(Request::Set(duration)),
81                Some(Err(err)) => Err(io::Error::new(
82                    io::ErrorKind::InvalidInput,
83                    format!("invalid duration: {err}"),
84                )),
85                None => Err(io::Error::new(
86                    io::ErrorKind::InvalidInput,
87                    "missing duration".to_owned(),
88                )),
89            },
90            Some("pause") => Ok(Request::Pause),
91            Some("resume") => Ok(Request::Resume),
92            Some("stop") => Ok(Request::Stop),
93            Some(req) => Err(io::Error::new(
94                io::ErrorKind::InvalidInput,
95                format!("invalid request: {req}"),
96            )),
97            None => Err(io::Error::new(
98                io::ErrorKind::InvalidInput,
99                "missing request".to_owned(),
100            )),
101        }
102    }
103}
104
105#[async_trait]
106impl ResponseWriter for TcpHandler {
107    async fn write(&mut self, res: Response) -> io::Result<()> {
108        let res = match res {
109            Response::Ok => format!("ok\n"),
110            Response::Timer(timer) => {
111                format!("timer {}\n", serde_json::to_string(&timer).unwrap())
112            }
113        };
114
115        self.writer.write_all(res.as_bytes()).await?;
116
117        Ok(())
118    }
119}