use core::{
cell::OnceCell,
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
ops::ControlFlow,
task::{Context, Poll},
};
use alloc::{boxed::Box, format, sync::Arc};
use embassy_executor::Spawner;
use embassy_net::{Stack, dns::DnsQueryType, udp::PacketMetadata};
use kdeconnect_proto::{
device::Device,
error::Result,
io::{
IoImpl, KnownFunctionName, TcpListenerImpl, TcpStreamImpl, TlsStreamImpl,
UdpSocketImpl,
rustls::{
ClientConfig, CommonState, ServerConfig,
client::UnbufferedClientConnection,
pki_types::ServerName,
server::UnbufferedServerConnection,
unbuffered::{AppDataRecord, ConnectionState, UnbufferedStatus, WriteTraffic},
},
},
};
use crate::slot_list::SlotList;
pub const MAX_SIMULTANEOUS_DEVICES: usize = const {
if let Some(str_value) = option_env!("KDECONNECT_MAX_SIMULTANEOUS_DEVICES") {
if let Ok(value) = usize::from_str_radix(str_value, 10) {
value
} else {
panic!("Failed to parse environment variable `KDECONNECT_MAX_SIMULTANEOUS_DEVICES`");
}
} else {
4
}
};
const UDP_META_BUFFER_SIZE: usize = 8;
const UDP_BUFFER_SIZE: usize = 512;
const TCP_BUFFER_SIZE: usize = 2500;
const TLS_BUFFER_SIZE: usize = 3000;
const NUM_UDP_CONNS: usize = 2 + MAX_SIMULTANEOUS_DEVICES;
static mut UDP_BUFFERS: SlotList<([u8; UDP_BUFFER_SIZE], [u8; UDP_BUFFER_SIZE]), NUM_UDP_CONNS> =
SlotList::new();
static mut UDP_META_BUFFERS: SlotList<
(
[PacketMetadata; UDP_META_BUFFER_SIZE],
[PacketMetadata; UDP_META_BUFFER_SIZE],
),
NUM_UDP_CONNS,
> = SlotList::new();
static mut TCP_BUFFERS: SlotList<
([u8; TCP_BUFFER_SIZE], [u8; TCP_BUFFER_SIZE]),
MAX_SIMULTANEOUS_DEVICES,
> = SlotList::new();
static mut TLS_BUFFERS: SlotList<
([u8; TLS_BUFFER_SIZE], [u8; TLS_BUFFER_SIZE]),
MAX_SIMULTANEOUS_DEVICES,
> = SlotList::new();
#[embassy_executor::task]
async fn exec_setup_udp(
device: Arc<Device<EmbassyIoImpl, UdpSocket, TcpStream, TcpListener, TlsStream>>,
) {
kdeconnect_proto::transport::udp::setup_udp(device).await
}
#[embassy_executor::task]
async fn exec_setup_mdns(
device: Arc<Device<EmbassyIoImpl, UdpSocket, TcpStream, TcpListener, TlsStream>>,
) {
kdeconnect_proto::transport::mdns::setup_mdns(device).await
}
#[embassy_executor::task]
async fn exec_setup_tcp(
device: Arc<Device<EmbassyIoImpl, UdpSocket, TcpStream, TcpListener, TlsStream>>,
) {
kdeconnect_proto::transport::tcp::setup_tcp(device).await
}
#[embassy_executor::task(pool_size = MAX_SIMULTANEOUS_DEVICES)]
async fn exec_per_tcp_stream(
stream: TcpStream,
device: Arc<Device<EmbassyIoImpl, UdpSocket, TcpStream, TcpListener, TlsStream>>,
) {
kdeconnect_proto::transport::tcp::per_tcp_stream(stream, device).await
}
fn encrypt<Data>(
buf: &[u8],
may_encrypt: &mut WriteTraffic<'_, Data>,
tls_tx_buffer: &mut [u8],
tls_tx_used: &mut usize,
) -> bool {
let written = may_encrypt
.encrypt(buf, &mut tls_tx_buffer[*tls_tx_used..])
.expect("encrypted packet does not fit in `outgoing_tls`");
*tls_tx_used += written;
written != 0
}
pub struct EmbassyIoImpl {
spawner: Spawner,
stack: Stack<'static>,
time_at_startup: OnceCell<u64>,
}
impl EmbassyIoImpl {
pub fn new(spawner: Spawner, stack: Stack<'static>) -> Self {
Self {
spawner,
stack,
time_at_startup: OnceCell::new(),
}
}
}
impl IoImpl<UdpSocket, TcpStream, TcpListener, TlsStream> for EmbassyIoImpl {
async fn bind_udp(&self, addr: SocketAddr) -> Result<UdpSocket> {
let mut socket = UdpSocket::new(self.stack).ok_or("no udp connection slot remaining")?;
let mut endpoint = embassy_net::IpListenEndpoint::from(addr);
if addr.ip() == Ipv4Addr::UNSPECIFIED || addr.ip() == Ipv6Addr::UNSPECIFIED {
endpoint.addr = None;
}
socket.inner.bind(endpoint).unwrap();
Ok(socket)
}
async fn bind_udp_reuse_v4(&self, addr: SocketAddr) -> Result<UdpSocket> {
let mut socket = UdpSocket::new(self.stack).ok_or("no udp connection slot remaining")?;
let mut endpoint = embassy_net::IpListenEndpoint::from(addr);
if addr.ip() == Ipv4Addr::UNSPECIFIED || addr.ip() == Ipv6Addr::UNSPECIFIED {
endpoint.addr = None;
}
socket.inner.bind(endpoint).unwrap();
Ok(socket)
}
async fn bind_udp_reuse_multicast_v4(
&self,
addr: SocketAddr,
multicast_addr: (Ipv4Addr, Ipv4Addr),
) -> Result<UdpSocket> {
self.stack.join_multicast_group(multicast_addr.0).unwrap();
let mut socket = UdpSocket::new(self.stack).ok_or("no udp connection slot remaining")?;
let mut endpoint = embassy_net::IpListenEndpoint::from(addr);
if addr.ip() == Ipv4Addr::UNSPECIFIED || addr.ip() == Ipv6Addr::UNSPECIFIED {
endpoint.addr = None;
}
socket.inner.bind(endpoint).unwrap();
Ok(socket)
}
async fn listen_tcp(&self, addr: SocketAddr) -> Result<TcpListener> {
Ok(TcpListener::new(addr, self.stack))
}
async fn connect_tcp(&self, addr: SocketAddr) -> Result<TcpStream> {
let mut stream = TcpStream::new(self.stack).ok_or("no tcp connection slot remaining")?;
stream
.inner
.connect(embassy_net::IpEndpoint::from(addr))
.await
.unwrap();
Ok(stream)
}
async fn accept_server_tls(
&self,
config: ServerConfig,
stream: TcpStream,
) -> Result<TlsStream> {
Ok(TlsStream::new(
UnbufferedConnection::Server(
UnbufferedServerConnection::new(Arc::new(config)).unwrap(),
),
stream,
)
.ok_or("no tls connection slot remaining")?)
}
async fn connect_client_tls(
&self,
config: ClientConfig,
server_name: ServerName<'static>,
stream: TcpStream,
) -> Result<TlsStream> {
Ok(TlsStream::new(
UnbufferedConnection::Client(
UnbufferedClientConnection::new(Arc::new(config), server_name).unwrap(),
),
stream,
)
.ok_or("no tls connection slot remaining")?)
}
async fn get_host_addresses(&self) -> (Option<Ipv4Addr>, Option<Ipv6Addr>) {
(
self.stack.config_v4().map(|c| c.address.address()),
self.stack.config_v6().map(|c| c.address.address()),
)
}
async fn sleep(&self, duration: core::time::Duration) {
embassy_time::Timer::after_millis(duration.as_millis() as u64).await;
}
fn spawn(
&self,
name: KnownFunctionName<TcpStream>,
device: Arc<Device<Self, UdpSocket, TcpStream, TcpListener, TlsStream>>,
) {
match name {
KnownFunctionName::SetupUdp => {
self.spawner
.spawn(exec_setup_udp(device))
.expect("failed to spawn setup udp task");
}
KnownFunctionName::SetupMdns => {
self.spawner
.spawn(exec_setup_mdns(device))
.expect("failed to spawn setup mdns task");
}
KnownFunctionName::PerTcpStream(stream) => {
self.spawner
.spawn(exec_per_tcp_stream(stream, device))
.expect("failed to spawn per tcp stream task");
}
};
}
fn start(
&self,
device: alloc::sync::Arc<Device<Self, UdpSocket, TcpStream, TcpListener, TlsStream>>,
) {
self.spawner
.spawn(exec_setup_tcp(device))
.expect("failed to spawn setup tcp task");
}
async fn get_current_timestamp(&self) -> u64 {
const NTP_SERVER_HOST: &str = "pool.ntp.org";
const NTP_SERVER_PORT: u16 = 123;
const NTP_TIMESTAMP_DELTA: u32 = 2_208_988_800;
#[derive(PartialEq, Eq, Debug, Default, Clone)]
#[repr(C)]
struct NtpPacket {
meta: u8,
stratum: u8,
poll: i8,
precision: i8,
root_delay: u32,
root_dispersion: u32,
reference_id: u32,
reference_timestamp: u64,
origin_timestamp: u64,
receive_timestamp: u64,
transmit_timestamp: u64,
}
impl NtpPacket {
pub fn with_leap(mut self, leap: u8) -> Self {
assert!(leap <= 0b11, "leap indicator should be two bytes at most");
self.meta |= leap << 6;
self
}
pub fn with_version(mut self, version: u8) -> Self {
assert!(version <= 0b111, "version should be three bytes at most");
self.meta |= version << 3;
self
}
pub fn with_mode(mut self, mode: u8) -> Self {
assert!(mode <= 0b111, "mode should be three bytes at most");
self.meta |= mode;
self
}
pub fn get_timestamp_seconds(&self) -> u32 {
(self.transmit_timestamp >> 32) as u32 - NTP_TIMESTAMP_DELTA
}
pub fn deserialize(buf: [u8; 48]) -> Self {
Self {
meta: buf[0],
stratum: buf[1],
poll: buf[2] as i8,
precision: buf[3] as i8,
root_delay: u32::from_be_bytes(buf[4..8].try_into().unwrap()),
root_dispersion: u32::from_be_bytes(buf[8..12].try_into().unwrap()),
reference_id: u32::from_be_bytes(buf[12..16].try_into().unwrap()),
reference_timestamp: u64::from_be_bytes(buf[16..24].try_into().unwrap()),
origin_timestamp: u64::from_be_bytes(buf[24..32].try_into().unwrap()),
receive_timestamp: u64::from_be_bytes(buf[32..40].try_into().unwrap()),
transmit_timestamp: u64::from_be_bytes(buf[40..48].try_into().unwrap()),
}
}
pub fn serialize(self) -> [u8; 48] {
let mut tmp_buf = [0u8; size_of::<NtpPacket>()];
tmp_buf[0] = self.meta;
tmp_buf[1] = self.stratum;
tmp_buf[2] = self.poll as u8;
tmp_buf[3] = self.precision as u8;
tmp_buf[4..8].copy_from_slice(&self.root_delay.to_be_bytes());
tmp_buf[8..12].copy_from_slice(&self.root_dispersion.to_be_bytes());
tmp_buf[12..16].copy_from_slice(&self.reference_id.to_be_bytes());
tmp_buf[16..24].copy_from_slice(&self.reference_timestamp.to_be_bytes());
tmp_buf[24..32].copy_from_slice(&self.origin_timestamp.to_be_bytes());
tmp_buf[32..40].copy_from_slice(&self.receive_timestamp.to_be_bytes());
tmp_buf[40..48].copy_from_slice(&self.transmit_timestamp.to_be_bytes());
tmp_buf
}
}
let time_at_startup = if let Some(time_at_startup) = self.time_at_startup.get() {
*time_at_startup
} else {
let mut rx_meta = [PacketMetadata::EMPTY; 16];
let mut rx_buffer = [0; 4096];
let mut tx_meta = [PacketMetadata::EMPTY; 16];
let mut tx_buffer = [0; 4096];
let mut socket = embassy_net::udp::UdpSocket::new(
self.stack,
&mut rx_meta,
&mut rx_buffer,
&mut tx_meta,
&mut tx_buffer,
);
socket.bind(0).unwrap();
let ntp_addrs = self
.stack
.dns_query(NTP_SERVER_HOST, DnsQueryType::A)
.await
.expect("failed to resolve DNS to connect to the NTP server");
if ntp_addrs.is_empty() {
log::error!("failed to resolve DNS to connect to the NTP server");
return 0;
}
let addr = SocketAddr::from((IpAddr::from(ntp_addrs[0]), NTP_SERVER_PORT));
let buf = NtpPacket::default()
.with_leap(0)
.with_version(4)
.with_mode(3)
.serialize();
socket.send_to(&buf, addr).await.unwrap();
let mut buf = [0; 48];
socket.recv_from(&mut buf).await.unwrap();
let packet = NtpPacket::deserialize(buf);
let timestamp = packet.get_timestamp_seconds();
log::info!("NTP server successfully queried, current timestamp: {timestamp}");
let time_at_startup = timestamp as u64 - embassy_time::Instant::now().as_secs();
let _ = self.time_at_startup.set(time_at_startup);
time_at_startup
};
time_at_startup + embassy_time::Instant::now().as_secs()
}
}
pub struct UdpSocket {
inner: embassy_net::udp::UdpSocket<'static>,
udp_buffer_index: usize,
udp_meta_buffer_index: usize,
}
impl UdpSocket {
fn new(stack: Stack<'static>) -> Option<Self> {
let Some((udp_buffer_index, (udp_rx_buffer, udp_tx_buffer))) = (unsafe {
#[allow(static_mut_refs)]
UDP_BUFFERS.insert_in_free_slot(([0; UDP_BUFFER_SIZE], [0; UDP_BUFFER_SIZE]))
}) else {
log::warn!(
"no udp connection slot remaining: the maximum number of simultaneously connected devices is reached"
);
return None;
};
let Some((udp_meta_buffer_index, (udp_rx_meta_buffer, udp_tx_meta_buffer))) = (unsafe {
#[allow(static_mut_refs)]
UDP_META_BUFFERS.insert_in_free_slot((
[PacketMetadata::EMPTY; UDP_META_BUFFER_SIZE],
[PacketMetadata::EMPTY; UDP_META_BUFFER_SIZE],
))
}) else {
log::warn!(
"no udp metadata connection slot remaining: the maximum number of simultaneously connected devices is reached"
);
return None;
};
let inner = embassy_net::udp::UdpSocket::new(
stack,
udp_rx_meta_buffer,
udp_rx_buffer,
udp_tx_meta_buffer,
udp_tx_buffer,
);
Some(Self {
inner,
udp_buffer_index,
udp_meta_buffer_index,
})
}
}
impl Drop for UdpSocket {
fn drop(&mut self) {
#[allow(static_mut_refs)]
unsafe {
UDP_BUFFERS.free_slot(self.udp_buffer_index);
UDP_META_BUFFERS.free_slot(self.udp_meta_buffer_index);
}
}
}
impl UdpSocketImpl for UdpSocket {
fn set_broadcast(&self, _on: bool) -> Result<()> {
Ok(())
}
fn poll_recv(&self, cx: &mut Context, buf: &mut [u8]) -> Poll<Result<()>> {
match self.inner.poll_recv_from(buf, cx) {
Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
Poll::Ready(Err(e)) => Poll::Ready(Err(format!("{e:?}").into())),
Poll::Pending => Poll::Pending,
}
}
async fn recv_from(&self, buf: &mut [u8]) -> Result<(usize, SocketAddr)> {
match self.inner.recv_from(buf).await {
Ok(r) => {
let endpoint = r.1.endpoint;
let ip = match endpoint.addr {
embassy_net::IpAddress::Ipv4(ipv4_addr) => IpAddr::V4(ipv4_addr),
embassy_net::IpAddress::Ipv6(ipv6_addr) => IpAddr::V6(ipv6_addr),
};
Ok((r.0, SocketAddr::new(ip, endpoint.port)))
}
Err(e) => Err(format!("{e:?}").into()),
}
}
async fn send_to(&mut self, buf: &[u8], addr: SocketAddr) -> Result<usize> {
match self.inner.send_to(buf, addr).await {
Ok(_) => {
self.inner.flush().await;
Ok(buf.len())
}
Err(e) => Err(format!("{e:?}").into()),
}
}
}
pub struct TcpStream {
inner: embassy_net::tcp::TcpSocket<'static>,
tcp_buffer_index: usize,
}
impl TcpStream {
pub fn new(stack: Stack<'static>) -> Option<Self> {
let Some((tcp_buffer_index, (tcp_rx_buffer, tcp_tx_buffer))) = (unsafe {
#[allow(static_mut_refs)]
TCP_BUFFERS.insert_in_free_slot(([0; TCP_BUFFER_SIZE], [0; TCP_BUFFER_SIZE]))
}) else {
log::warn!(
"no tcp connection slot remaining: the maximum number of simultaneously connected devices is reached"
);
return None;
};
let inner = embassy_net::tcp::TcpSocket::new(stack, tcp_rx_buffer, tcp_tx_buffer);
Some(Self {
inner,
tcp_buffer_index,
})
}
}
impl Drop for TcpStream {
fn drop(&mut self) {
#[allow(static_mut_refs)]
unsafe {
TCP_BUFFERS.free_slot(self.tcp_buffer_index);
}
}
}
impl TcpStreamImpl for TcpStream {
async fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
self.inner
.read(buf)
.await
.map_err(|e| format!("{e:?}").into())
}
async fn writable(&self) -> Result<()> {
self.inner.wait_write_ready().await;
Ok(())
}
async fn write_all(&mut self, src: &[u8]) -> Result<()> {
match self.inner.write(src).await {
Ok(_) => Ok(()),
Err(e) => Err(format!("{e:?}").into()),
}
}
}
enum UnbufferedConnection {
Client(UnbufferedClientConnection),
Server(UnbufferedServerConnection),
}
#[allow(clippy::too_many_arguments)]
async fn handle_tls_state<T>(
discard: &mut usize,
state: ConnectionState<'_, '_, T>,
tls_tx_buffer: &mut [u8],
tls_tx_used: &mut usize,
tls_rx_buffer: &mut [u8],
tls_rx_used: &mut usize,
socket: &mut embassy_net::tcp::TcpSocket<'static>,
mut buf_read: Option<&mut [u8]>,
buf_write: Option<&[u8]>,
) -> ControlFlow<Result<usize>> {
match state {
ConnectionState::EncodeTlsData(mut state) => {
let len = state.encode(&mut tls_tx_buffer[*tls_tx_used..]).unwrap();
*tls_tx_used += len;
}
ConnectionState::TransmitTlsData(state) => {
match socket.write(&tls_tx_buffer[..*tls_tx_used]).await {
Ok(_) => *tls_tx_used = 0,
Err(e) => return ControlFlow::Break(Err(Box::new(e))),
}
state.done();
}
ConnectionState::BlockedHandshake { .. } => {
match socket.read(tls_rx_buffer).await {
Ok(read) => *tls_rx_used += read,
Err(e) => return ControlFlow::Break(Err(Box::new(e))),
}
}
ConnectionState::ReadTraffic(mut state) => {
let mut total_len = 0;
while let Some(res) = state.next_record() {
let AppDataRecord {
discard: new_discard,
payload,
..
} = res.unwrap();
*discard += new_discard;
if let Some(buf) = buf_read.as_mut() {
let len = payload.len();
buf[total_len..total_len + len].copy_from_slice(payload);
total_len += len;
}
}
if buf_read.is_some() {
return ControlFlow::Break(Ok(total_len));
} else {
log::warn!("Ready to read traffic but no read buffer given");
}
}
ConnectionState::WriteTraffic(mut may_encrypt) => {
if let Some(buf) = buf_write
&& encrypt(buf, &mut may_encrypt, tls_tx_buffer, tls_tx_used)
{
match socket.write(&tls_tx_buffer[..*tls_tx_used]).await {
Ok(n) => {
*tls_tx_used = 0;
return ControlFlow::Break(Ok(n));
}
Err(e) => return ControlFlow::Break(Err(Box::new(e))),
}
}
if buf_read.is_some() {
match socket.read(tls_rx_buffer).await {
Ok(read) => {
if read == 0 {
return ControlFlow::Break(Ok(0));
}
*tls_rx_used += read;
}
Err(e) => return ControlFlow::Break(Err(Box::new(e))),
}
}
}
ConnectionState::Closed | ConnectionState::PeerClosed => {
return ControlFlow::Break(Ok(0));
}
_ => log::warn!("Unhandled TLS state: {state:?}"),
}
ControlFlow::Continue(())
}
pub struct TlsStream {
inner: UnbufferedConnection,
stream: TcpStream,
tls_buffer_index: usize,
tls_rx_buffer: &'static mut [u8; TLS_BUFFER_SIZE],
tls_tx_buffer: &'static mut [u8; TLS_BUFFER_SIZE],
tls_rx_used: usize,
tls_tx_used: usize,
}
impl TlsStream {
fn new(inner: UnbufferedConnection, stream: TcpStream) -> Option<Self> {
let Some((tls_buffer_index, (tls_rx_buffer, tls_tx_buffer))) = (unsafe {
#[allow(static_mut_refs)]
TLS_BUFFERS.insert_in_free_slot(([0; TLS_BUFFER_SIZE], [0; TLS_BUFFER_SIZE]))
}) else {
log::warn!(
"no tls connection slot remaining: the maximum number of simultaneously connected devices is reached"
);
return None;
};
Some(Self {
inner,
stream,
tls_buffer_index,
tls_rx_buffer,
tls_tx_buffer,
tls_rx_used: 0,
tls_tx_used: 0,
})
}
}
impl Drop for TlsStream {
fn drop(&mut self) {
#[allow(static_mut_refs)]
unsafe {
TLS_BUFFERS.free_slot(self.tls_buffer_index);
}
}
}
impl TlsStreamImpl for TlsStream {
fn get_common_state(&self) -> &CommonState {
match &self.inner {
UnbufferedConnection::Client(conn) => conn as &CommonState,
UnbufferedConnection::Server(conn) => conn as &CommonState,
}
}
async fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
loop {
let (tls_rx_buffer_used, tls_rx_buffer_free) =
self.tls_rx_buffer.split_at_mut(self.tls_rx_used);
let (res, discard) = match &mut self.inner {
UnbufferedConnection::Client(conn) => {
let UnbufferedStatus {
mut discard, state, ..
} = conn.process_tls_records(tls_rx_buffer_used);
let res = handle_tls_state(
&mut discard,
state.unwrap(),
self.tls_tx_buffer,
&mut self.tls_tx_used,
tls_rx_buffer_free,
&mut self.tls_rx_used,
&mut self.stream.inner,
Some(buf),
None,
)
.await;
(res, discard)
}
UnbufferedConnection::Server(conn) => {
let UnbufferedStatus {
mut discard, state, ..
} = conn.process_tls_records(tls_rx_buffer_used);
let res = handle_tls_state(
&mut discard,
state.unwrap(),
self.tls_tx_buffer,
&mut self.tls_tx_used,
tls_rx_buffer_free,
&mut self.tls_rx_used,
&mut self.stream.inner,
Some(buf),
None,
)
.await;
(res, discard)
}
};
if discard != 0 {
assert!(discard <= self.tls_rx_used);
self.tls_rx_buffer.copy_within(discard..self.tls_rx_used, 0);
self.tls_rx_used -= discard;
}
if let ControlFlow::Break(v) = res {
break v;
}
embassy_time::Timer::after_millis(100).await;
}
}
async fn write_all(&mut self, src: &[u8]) -> Result<()> {
loop {
let (tls_rx_buffer_used, tls_rx_buffer_free) =
self.tls_rx_buffer.split_at_mut(self.tls_rx_used);
let (res, discard) = match &mut self.inner {
UnbufferedConnection::Client(conn) => {
let UnbufferedStatus {
mut discard, state, ..
} = conn.process_tls_records(tls_rx_buffer_used);
let res = handle_tls_state(
&mut discard,
state.unwrap(),
self.tls_tx_buffer,
&mut self.tls_tx_used,
tls_rx_buffer_free,
&mut self.tls_rx_used,
&mut self.stream.inner,
None,
Some(src),
)
.await;
(res, discard)
}
UnbufferedConnection::Server(conn) => {
let UnbufferedStatus {
mut discard, state, ..
} = conn.process_tls_records(tls_rx_buffer_used);
let res = handle_tls_state(
&mut discard,
state.unwrap(),
self.tls_tx_buffer,
&mut self.tls_tx_used,
tls_rx_buffer_free,
&mut self.tls_rx_used,
&mut self.stream.inner,
None,
Some(src),
)
.await;
(res, discard)
}
};
if discard != 0 {
assert!(discard <= self.tls_rx_used);
self.tls_rx_buffer.copy_within(discard..self.tls_rx_used, 0);
self.tls_rx_used -= discard;
}
if let ControlFlow::Break(_) = res {
break Ok(());
}
embassy_time::Timer::after_millis(100).await;
}
}
}
pub struct TcpListener {
addr: SocketAddr,
stack: Stack<'static>,
}
impl TcpListener {
pub fn new(addr: SocketAddr, stack: Stack<'static>) -> Self {
Self { addr, stack }
}
}
impl TcpListenerImpl<TcpStream> for TcpListener {
async fn accept(&self) -> Result<TcpStream> {
let mut stream = TcpStream::new(self.stack).ok_or("no tcp connection slot remaining")?;
let mut endpoint = embassy_net::IpListenEndpoint::from(self.addr);
if self.addr.ip() == Ipv4Addr::UNSPECIFIED || self.addr.ip() == Ipv6Addr::UNSPECIFIED {
endpoint.addr = None;
}
stream.inner.accept(endpoint).await.unwrap();
Ok(stream)
}
}