embassy_socket/tcp_server/
runner.rs

1use embassy_net::{IpEndpoint, Stack};
2use embassy_net::tcp::State;
3use embassy_time::{Duration, Timer};
4use crate::channel::WriteChannel;
5use crate::connection::socket_state::SocketState;
6use crate::connection::TcpConnection;
7use crate::err::{SocketErr, SocketResult};
8use crate::tcp_server::callback::TcpServerCallBack;
9
10/// single tcp server runner
11pub struct TcpServerRunner<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize, const BUF_SIZE: usize, CB: TcpServerCallBack> {
12    /// tcp stack
13    stack: Stack<'d>,
14    /// socket state, memory pool
15    state: &'d SocketState<N, TX_SZ, RX_SZ, BUF_SIZE>,
16    /// listener port
17    port: u16,
18    /// socket timeout
19    socket_timeout: Option<Duration>,
20    /// read data timeout, default is 100 milliseconds
21    read_timeout: Duration,
22    /// tcp callback
23    pub cb: &'d CB,
24}
25
26/// runner support Clone
27impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize, const BUF_SIZE: usize, CB: TcpServerCallBack> Clone for
28TcpServerRunner<'d, N, TX_SZ, RX_SZ, BUF_SIZE, CB> {
29    #[inline]
30    fn clone(&self) -> Self {
31        Self {
32            stack: self.stack,
33            state: self.state,
34            port: self.port,
35            socket_timeout: self.socket_timeout,
36            read_timeout: self.read_timeout,
37            cb: self.cb,
38        }
39    }
40}
41
42/// runner support Copy
43impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize, const BUF_SIZE: usize, CB: TcpServerCallBack> Copy for
44TcpServerRunner<'d, N, TX_SZ, RX_SZ, BUF_SIZE, CB> {}
45
46/// custom method
47impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize, const BUF_SIZE: usize, CB: TcpServerCallBack>
48TcpServerRunner<'d, N, TX_SZ, RX_SZ, BUF_SIZE, CB> {
49    /// create one runner
50    #[inline]
51    pub fn new(stack: Stack<'d>, state: &'d SocketState<N, TX_SZ, RX_SZ, BUF_SIZE>, port: u16, cb: &'d CB) -> Self {
52        Self { stack, state, port, socket_timeout: None, read_timeout: Duration::from_millis(100), cb }
53    }
54
55    /// set socket timeout<br />
56    /// connection timeout and etc. <br />
57    /// tt is recommended not to set or set to None
58    #[inline]
59    pub fn socket_timeout(&mut self, timeout: Option<Duration>) {
60        self.socket_timeout = timeout;
61    }
62
63    /// set read timeout
64    #[inline]
65    pub fn read_timeout(&mut self, timeout: Duration) {
66        self.read_timeout = timeout;
67    }
68
69    /// run tcp server
70    pub async fn run<const CN: usize>(&self, wch: &WriteChannel<'_, CN>, t: &mut CB::T) {
71        loop {
72            if let Err(e) = self.run_logic(wch, t).await {
73                self.cb.err(e, t).await;
74            }
75        }
76    }
77
78    /// run logic
79    async fn run_logic<const CN: usize>(&self, wch: &WriteChannel<'_, CN>, t: &mut CB::T) -> SocketResult<()> {
80        let mut conn = self.try_accept().await?;
81        let endpoint = conn.socket.remote_endpoint().ok_or_else(SocketErr::no_route)?;
82
83        wch.enable().await;
84        self.cb.conn(endpoint, wch, t).await;
85        while !self.read_logic(&mut conn, endpoint, wch, t).await {}
86        wch.disable().await;
87        self.cb.dis_conn(endpoint, t).await;
88        Ok(())
89    }
90
91    /// read logic
92    async fn read_logic<const CN: usize>(
93        &self,
94        conn: &mut TcpConnection<'d, N, TX_SZ, RX_SZ, BUF_SIZE>,
95        endpoint: IpEndpoint,
96        wch: &WriteChannel<'_, CN>,
97        t: &mut CB::T) -> bool {
98        if !conn.socket.can_recv() {
99            if let Err(e) = self.write_logic(conn, wch).await { self.cb.err(e, t).await; }
100            return matches!(conn.socket.state(), State::CloseWait|State::Closed);
101        }
102
103        match conn.try_read().await {
104            Ok(bytes) => {
105                self.cb.recv(endpoint, bytes, wch, t).await;
106                false
107            }
108            Err(e) => {
109                self.cb.err(e.into(), t).await;
110                true
111            }
112        }
113    }
114
115    /// write logic
116    async fn write_logic<const CN: usize>(
117        &self,
118        conn: &mut TcpConnection<'d, N, TX_SZ, RX_SZ, BUF_SIZE>,
119        wch: &WriteChannel<'_, CN>) -> SocketResult<()> {
120        if wch.is_empty().await {
121            Timer::after(self.read_timeout).await;
122            return Ok(());
123        }
124
125        wch.try_tcp_write(conn).await?;
126        Ok(())
127    }
128
129    /// try accept connection
130    async fn try_accept(&self) -> SocketResult<TcpConnection<'d, N, TX_SZ, RX_SZ, BUF_SIZE>> {
131        self.stack.wait_link_up().await;
132        self.stack.wait_config_up().await;
133
134        let mut conn = TcpConnection::new(self.stack, self.state)?;
135        conn.socket.set_timeout(self.socket_timeout);
136        conn.socket.accept(self.port).await?;
137        Ok(conn)
138    }
139}