embassy_socket/channel/
mod.rs1use embassy_net::tcp;
2use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
3use embassy_sync::mutex::Mutex;
4use embassy_sync::zerocopy_channel::Channel;
5use embassy_time::Timer;
6use crate::channel::socket_msg::SocketMsg;
7use crate::connection::TcpConnection;
8
9pub mod socket_msg;
10
11pub struct WriteChannel<'d, const N: usize> {
14 channel: Mutex<CriticalSectionRawMutex, Channel<'d, CriticalSectionRawMutex, SocketMsg<N>>>,
16 can_send: Mutex<CriticalSectionRawMutex, bool>,
18}
19
20impl<'d, const N: usize> WriteChannel<'d, N> {
22 #[inline]
24 pub fn new(buf: &'d mut [SocketMsg<N>]) -> Self {
25 Self { channel: Mutex::new(Channel::new(buf)), can_send: Mutex::new(false) }
26 }
27
28 #[inline]
30 pub async fn is_empty(&self) -> bool {
31 self.channel.lock().await.is_empty()
32 }
33
34 #[inline]
36 pub async fn enable(&self) {
37 *self.can_send.lock().await = true;
38 }
39
40 pub async fn disable(&self) {
42 *self.can_send.lock().await = false;
43 self.channel.lock().await.clear();
44 }
45
46 pub async fn try_tcp_write<const CN: usize, const TX_SZ: usize, const RX_SZ: usize, const BUF_SIZE: usize>(
48 &self,
49 conn: &mut TcpConnection<'_, CN, TX_SZ, RX_SZ, BUF_SIZE>) -> Result<(), tcp::Error> {
50 let mut ch = self.channel.lock().await;
51 let mut recv = ch.split().1;
52 let msg = recv.receive().await;
53
54 if let Err(e) = Self::try_conn_write(conn, msg.as_bytes()).await {
55 recv.receive_done();
56 return Err(e);
57 }
58 recv.receive_done();
59 Ok(())
60 }
61
62 async fn try_conn_write<const CN: usize, const TX_SZ: usize, const RX_SZ: usize, const BUF_SIZE: usize>(
64 conn: &mut TcpConnection<'_, CN, TX_SZ, RX_SZ, BUF_SIZE>, buf: &[u8]) -> Result<(), tcp::Error> {
65 conn.socket.write(buf).await?;
66 conn.socket.flush().await
67 }
68
69 #[inline]
70 pub async fn tcp_write<const CN: usize, const TX_SZ: usize, const RX_SZ: usize, const BUF_SIZE: usize>(
71 &self, conn: &mut TcpConnection<'_, CN, TX_SZ, RX_SZ, BUF_SIZE>) {
72 self.try_tcp_write(conn).await.ok();
73 }
74
75 pub async fn send_bytes(&self, bytes: &[u8]) {
77 if !*self.can_send.lock().await { return; }
78
79 let mut bytes_iter = bytes.chunks_exact(N);
81 for byte in bytes_iter.by_ref() {
82 loop {
83 if let Ok(mut msg) = self.channel.try_lock() {
84 let mut sender = msg.split().0;
85 let socket_msg = sender.send().await;
86 socket_msg.bytes.copy_from_slice(byte);
87 socket_msg.len = N;
88 sender.send_done();
89 drop(msg);
90 break;
91 }
92 Timer::after_millis(100).await;
93 }
94 }
95
96 let byte = bytes_iter.remainder();
98 if byte.is_empty() { return; }
99 let mut ch = self.channel.lock().await;
100 let mut sender = ch.split().0;
101 let msg = sender.send().await;
102 msg.bytes[..byte.len()].copy_from_slice(byte);
103 msg.len = byte.len();
104 sender.send_done();
105 }
106
107 #[inline]
109 pub async fn send_str(&self, s: &str) {
110 self.send_bytes(s.as_bytes()).await;
111 }
112}