embassy_socket/tcp_server/
runner.rs

1use embassy_net::{IpEndpoint, Stack};
2use embassy_net::tcp::State;
3use embassy_time::{Duration, Timer};
4use crate::channel::WriteChannel;
5use crate::connection::socket_state::SocketState;
6use crate::connection::TcpConnection;
7use crate::err::{SocketErr, SocketResult};
8use crate::tcp_server::callback::TcpServerCallBack;
9
10/// single tcp server runner
11#[derive(Copy, Clone)]
12pub struct TcpServerRunner<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize, const BUF_SIZE: usize, CB: TcpServerCallBack> {
13    /// tcp stack
14    stack: Stack<'d>,
15    /// socket state, memory pool
16    state: &'d SocketState<N, TX_SZ, RX_SZ, BUF_SIZE>,
17    /// listener port
18    port: u16,
19    /// socket timeout
20    socket_timeout: Option<Duration>,
21    /// read data timeout, default is 100 milliseconds
22    read_timeout: Duration,
23    /// tcp callback
24    pub cb: &'d CB,
25}
26
27/// custom method
28impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize, const BUF_SIZE: usize, CB: TcpServerCallBack> TcpServerRunner<'d, N, TX_SZ, RX_SZ, BUF_SIZE, CB> {
29    /// create one runner
30    #[inline]
31    pub fn new(stack: Stack<'d>, state: &'d SocketState<N, TX_SZ, RX_SZ, BUF_SIZE>, port: u16, cb: &'d CB) -> Self {
32        Self { stack, state, port, socket_timeout: None, read_timeout: Duration::from_millis(100), cb }
33    }
34
35    /// set socket timeout<br />
36    /// connection timeout and etc. <br />
37    /// tt is recommended not to set or set to None
38    #[inline]
39    pub fn socket_timeout(&mut self, timeout: Option<Duration>) {
40        self.socket_timeout = timeout;
41    }
42
43    /// set read timeout
44    #[inline]
45    pub fn read_timeout(&mut self, timeout: Duration) {
46        self.read_timeout = timeout;
47    }
48
49    /// run tcp server
50    pub async fn run<const CN: usize>(&self, wch: &WriteChannel<'_, CN>) {
51        loop {
52            if let Err(e) = self.run_logic(wch).await {
53                self.cb.err(e).await;
54            }
55        }
56    }
57
58    /// run logic
59    async fn run_logic<const CN: usize>(&self, wch: &WriteChannel<'_, CN>) -> SocketResult<()> {
60        let mut conn = self.try_accept().await?;
61        let endpoint = conn.socket.remote_endpoint().ok_or_else(SocketErr::no_route)?;
62
63        wch.enable().await;
64        self.cb.conn(endpoint, wch).await;
65        while !self.read_logic(&mut conn, endpoint, wch).await {}
66        wch.disable().await;
67        self.cb.dis_conn(endpoint).await;
68        Ok(())
69    }
70
71    /// read logic
72    async fn read_logic<const CN: usize>(&self, conn: &mut TcpConnection<'d, N, TX_SZ, RX_SZ, BUF_SIZE>, endpoint: IpEndpoint, wch: &WriteChannel<'_, CN>) -> bool {
73        if !conn.socket.can_recv() {
74            if let Err(e) = self.write_logic(conn, wch).await { self.cb.err(e).await; }
75            return matches!(conn.socket.state(), State::CloseWait|State::Closed);
76        }
77
78        match conn.try_read().await {
79            Ok(bytes) => {
80                self.cb.recv(endpoint, bytes, wch).await;
81                false
82            }
83            Err(e) => {
84                self.cb.err(e.into()).await;
85                true
86            }
87        }
88    }
89
90    /// write logic
91    async fn write_logic<const CN: usize>(
92        &self,
93        conn: &mut TcpConnection<'d, N, TX_SZ, RX_SZ, BUF_SIZE>,
94        wch: &WriteChannel<'_, CN>) -> SocketResult<()> {
95        if wch.is_empty().await {
96            Timer::after(self.read_timeout).await;
97            return Ok(());
98        }
99
100        wch.try_tcp_write(conn).await?;
101        Ok(())
102    }
103
104    /// try accept connection
105    async fn try_accept(&self) -> SocketResult<TcpConnection<'d, N, TX_SZ, RX_SZ, BUF_SIZE>> {
106        self.stack.wait_link_up().await;
107        self.stack.wait_config_up().await;
108
109        let mut conn = TcpConnection::new(self.stack, self.state)?;
110        conn.socket.set_timeout(self.socket_timeout);
111        conn.socket.accept(self.port).await?;
112        Ok(conn)
113    }
114}