// embassy_socket/tcp_client/mod.rs
use core::net::Ipv4Addr;
use embassy_net::Stack;
use embassy_net::tcp::State;
use embassy_time::{Duration, Timer};
use crate::channel::WriteChannel;
use crate::connection::socket_state::SocketState;
use crate::connection::TcpConnection;
use crate::err::SocketResult;
use crate::tcp_client::callback::TcpClientCallBack;

pub mod callback;

13pub struct TcpClient<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize, const BUF_SIZE: usize, CB: TcpClientCallBack> {
15 stack: Stack<'d>,
17 state: &'d SocketState<N, TX_SZ, RX_SZ, BUF_SIZE>,
19 socket_timeout: Option<Duration>,
21 read_timeout: Duration,
23 ip: Ipv4Addr,
25 port: u16,
27 cb: CB,
29}
31impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize, const BUF_SIZE: usize, CB: TcpClientCallBack> TcpClient<'d, N, TX_SZ, RX_SZ, BUF_SIZE, CB> {
33 #[inline]
35 pub fn new(stack: Stack<'d>, ip: Ipv4Addr, port: u16, cb: CB, state: &'d SocketState<N, TX_SZ, RX_SZ, BUF_SIZE>) -> Self {
36 Self { stack, state, socket_timeout: None, read_timeout: Duration::from_millis(100), ip, port, cb }
37 }
38
39 #[inline]
43 pub fn socket_timeout(&mut self, timeout: Option<Duration>) {
44 self.socket_timeout = timeout;
45 }
46
47 #[inline]
49 pub fn read_timeout(&mut self, timeout: Duration) {
50 self.read_timeout = timeout;
51 }
52
53 pub async fn run<const CH_N: usize>(&self, wch: &WriteChannel<'_, CH_N>) {
56 loop {
57 self.run_logic(wch).await;
58 self.cb.dis_conn().await;
59 }
60 }
61
62 async fn run_logic<const CH_N: usize>(&self, wch: &WriteChannel<'_, CH_N>) {
64 self.stack.wait_link_up().await;
66 self.stack.wait_config_up().await;
67
68 let mut conn = match self.try_conn().await {
69 Ok(conn) => conn,
70 Err(e) => {
71 self.cb.err(e).await;
72 return;
73 }
74 };
75
76 wch.enable().await;
77 self.cb.conn().await;
78 while !self.read_logic(&mut conn, wch).await {}
79 wch.disable().await;
80 }
81
82 async fn read_logic<const CH_N: usize>(&self, conn: &mut TcpConnection<'d, N, TX_SZ, RX_SZ, BUF_SIZE>, wch: &WriteChannel<'_, CH_N>) -> bool {
84 if !conn.socket.can_recv() {
85 if let Err(e) = self.write_logic(conn, wch).await { self.cb.err(e).await }
86 return matches!(conn.socket.state(), State::CloseWait|State::Closed);
87 }
88
89 match conn.try_read().await {
90 Ok(bytes) => {
91 self.cb.recv(bytes).await;
92 false
93 }
94 Err(e) => {
95 self.cb.err(e.into()).await;
96 true
97 }
98 }
99 }
100
101 async fn write_logic<const CH_N: usize>(
103 &self,
104 conn: &mut TcpConnection<'d, N, TX_SZ, RX_SZ, BUF_SIZE>,
105 wch: &WriteChannel<'_, CH_N>) -> SocketResult<()> {
106 if wch.is_empty().await {
108 Timer::after(self.read_timeout).await;
109 return Ok(());
110 }
111
112 wch.try_tcp_write(conn).await?;
113 Ok(())
114 }
115
116 async fn try_conn(&self) -> SocketResult<TcpConnection<'d, N, TX_SZ, RX_SZ, BUF_SIZE>> {
118 let mut socket = TcpConnection::new(self.stack, self.state)?;
119 socket.socket.set_timeout(self.socket_timeout);
120
121 socket.socket.connect((self.ip, self.port)).await?;
122 Ok(socket)
123 }
124}