embassy_socket/channel/
mod.rs

1use embassy_net::tcp;
2use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
3use embassy_sync::mutex::Mutex;
4use embassy_sync::zerocopy_channel::Channel;
5use embassy_time::Timer;
6use crate::channel::socket_msg::SocketMsg;
7use crate::connection::TcpConnection;
8
9pub mod socket_msg;
10
11/// socket write channel<br />
12/// N is channel len<br />
13pub struct WriteChannel<'d, const N: usize> {
14    /// channel
15    channel: Mutex<CriticalSectionRawMutex, Channel<'d, CriticalSectionRawMutex, SocketMsg<N>>>,
16    /// can send data
17    can_send: Mutex<CriticalSectionRawMutex, bool>,
18}
19
20/// custom method
21impl<'d, const N: usize> WriteChannel<'d, N> {
22    /// create write channel
23    #[inline]
24    pub fn new(buf: &'d mut [SocketMsg<N>]) -> Self {
25        Self { channel: Mutex::new(Channel::new(buf)), can_send: Mutex::new(false) }
26    }
27
28    /// channel is empty
29    #[inline]
30    pub async fn is_empty(&self) -> bool {
31        self.channel.lock().await.is_empty()
32    }
33
34    /// enable channel, allow channels to send data
35    #[inline]
36    pub async fn enable(&self) {
37        *self.can_send.lock().await = true;
38    }
39
40    /// disable channel, disable channel from sending data
41    pub async fn disable(&self) {
42        *self.can_send.lock().await = false;
43        self.channel.lock().await.clear();
44    }
45
46    /// try tcp write data
47    pub async fn try_tcp_write<const CN: usize, const TX_SZ: usize, const RX_SZ: usize, const BUF_SIZE: usize>(
48        &self,
49        conn: &mut TcpConnection<'_, CN, TX_SZ, RX_SZ, BUF_SIZE>) -> Result<(), tcp::Error> {
50        let mut ch = self.channel.lock().await;
51        let mut recv = ch.split().1;
52        let msg = recv.receive().await;
53
54        if let Err(e) = Self::try_conn_write(conn, msg.as_bytes()).await {
55            recv.receive_done();
56            return Err(e);
57        }
58        recv.receive_done();
59        Ok(())
60    }
61
62    /// try conn write data
63    async fn try_conn_write<const CN: usize, const TX_SZ: usize, const RX_SZ: usize, const BUF_SIZE: usize>(
64        conn: &mut TcpConnection<'_, CN, TX_SZ, RX_SZ, BUF_SIZE>, buf: &[u8]) -> Result<(), tcp::Error> {
65        conn.socket.write(buf).await?;
66        conn.socket.flush().await
67    }
68
69    #[inline]
70    pub async fn tcp_write<const CN: usize, const TX_SZ: usize, const RX_SZ: usize, const BUF_SIZE: usize>(
71        &self, conn: &mut TcpConnection<'_, CN, TX_SZ, RX_SZ, BUF_SIZE>) {
72        self.try_tcp_write(conn).await.ok();
73    }
74
75    /// send bytes data
76    pub async fn send_bytes(&self, bytes: &[u8]) {
77        if !*self.can_send.lock().await { return; }
78
79        // split send
80        let mut bytes_iter = bytes.chunks_exact(N);
81        for byte in bytes_iter.by_ref() {
82            loop {
83                if let Ok(mut msg) = self.channel.try_lock() {
84                    let mut sender = msg.split().0;
85                    let socket_msg = sender.send().await;
86                    socket_msg.bytes.copy_from_slice(byte);
87                    socket_msg.len = N;
88                    sender.send_done();
89                    drop(msg);
90                    break;
91                }
92                Timer::after_millis(100).await;
93            }
94        }
95
96        // send last bytes
97        let byte = bytes_iter.remainder();
98        if byte.is_empty() { return; }
99        let mut ch = self.channel.lock().await;
100        let mut sender = ch.split().0;
101        let msg = sender.send().await;
102        msg.bytes[..byte.len()].copy_from_slice(byte);
103        msg.len = byte.len();
104        sender.send_done();
105    }
106
107    /// send str data
108    #[inline]
109    pub async fn send_str(&self, s: &str) {
110        self.send_bytes(s.as_bytes()).await;
111    }
112}