use core::sync::atomic::{AtomicU8, Ordering};
use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
use embassy_sync::channel::Channel;
use heapless::Vec;
use crate::at::processor::{AtProcessor, SocketEvent};
use crate::bus::SpiTransport;
use crate::error::{Error, Result};
use crate::sync::TmMutex;
use crate::types::*;
/// One socket slot: lifecycle state, selected protocol and buffered
/// received bytes, each guarded by its own `TmMutex`.
#[derive(Debug)]
pub struct Socket {
/// Current lifecycle state; a new or released slot holds `SocketState::Free`.
pub state: TmMutex<SocketState>,
/// Protocol stored by `allocate`; `None` until then and again after `free`.
pub protocol: TmMutex<Option<SocketProtocol>>,
/// Received bytes that have not been read yet (fixed capacity of 2048 bytes).
pub rx_buffer: TmMutex<Vec<u8, 2048>>,
}
impl Socket {
pub const fn new() -> Self {
Self {
state: TmMutex::new(SocketState::Free),
protocol: TmMutex::new(None),
rx_buffer: TmMutex::new(Vec::new()),
}
}
pub async fn allocate(&self, protocol: SocketProtocol) -> Result<()> {
let mut state = self.state.lock().await;
if *state != SocketState::Free {
return Err(Error::SocketInUse);
}
*state = SocketState::Allocated;
let mut proto = self.protocol.lock().await;
*proto = Some(protocol);
Ok(())
}
/// Returns the slot to its unused condition, whatever state it was in.
///
/// Marks the slot `SocketState::Free`, forgets the protocol and discards
/// any buffered received bytes. Locks are taken in the order
/// state, protocol, rx_buffer and are all held until the function returns.
///
/// Always returns `Ok(())`.
pub async fn free(&self) -> Result<()> {
    let mut slot_state = self.state.lock().await;
    *slot_state = SocketState::Free;

    let mut slot_protocol = self.protocol.lock().await;
    *slot_protocol = None;

    let mut pending = self.rx_buffer.lock().await;
    pending.clear();

    Ok(())
}
/// Returns a copy of the current lifecycle state.
pub async fn get_state(&self) -> SocketState {
    *self.state.lock().await
}
/// Overwrites the lifecycle state with `new_state`.
///
/// No transition checks are performed; the caller decides which
/// transitions are legal.
pub async fn set_state(&self, new_state: SocketState) {
    *self.state.lock().await = new_state;
}
/// Appends as much of `data` as fits into the receive buffer.
///
/// Returns the number of bytes actually stored. When the buffer is full
/// (or fills up part-way) the excess bytes are not stored and the return
/// value is smaller than `data.len()`; this is not reported as an error.
pub async fn append_rx_data(&self, data: &[u8]) -> Result<usize> {
    let mut buffer = self.rx_buffer.lock().await;
    // Clamp to the free space once and copy in bulk instead of pushing
    // byte by byte.
    let free_space = buffer.capacity() - buffer.len();
    let accepted = core::cmp::min(data.len(), free_space);
    // `accepted` never exceeds the free space, so the copy is not expected
    // to fail; report zero stored bytes rather than panic if it ever does.
    if buffer.extend_from_slice(&data[..accepted]).is_err() {
        return Ok(0);
    }
    Ok(accepted)
}
/// Moves buffered bytes into `dest`, oldest first.
///
/// Copies `min(dest.len(), buffered)` bytes, removes them from the front
/// of the receive buffer and returns how many were copied. Returns
/// `Ok(0)` when either `dest` or the buffer is empty.
pub async fn read_rx_data(&self, dest: &mut [u8]) -> Result<usize> {
    let mut buffer = self.rx_buffer.lock().await;
    let to_read = core::cmp::min(dest.len(), buffer.len());
    if to_read == 0 {
        // Nothing to hand out, so the buffer contents must not be touched.
        return Ok(0);
    }
    dest[..to_read].copy_from_slice(&buffer[..to_read]);

    // Slide the unread tail to the front, then drop the stale bytes
    // left behind at the end.
    let remaining = buffer.len() - to_read;
    buffer.copy_within(to_read.., 0);
    buffer.truncate(remaining);
    Ok(to_read)
}
/// Returns how many received bytes are currently waiting in the buffer.
pub async fn available_rx_bytes(&self) -> usize {
    self.rx_buffer.lock().await.len()
}
/// Discards every byte currently waiting in the receive buffer.
pub async fn clear_rx_buffer(&self) {
    self.rx_buffer.lock().await.clear();
}
}
/// Socket table and link flag layered on top of an AT command processor.
pub struct NetworkDevice {
/// Fixed table of socket slots, indexed by the raw value of a `SocketId`.
sockets: [Socket; MAX_SOCKETS],
/// Processor used to send AT commands and to obtain the event/data channels.
processor: &'static AtProcessor,
/// Link flag: `0` means down, any other value means up.
link_state: AtomicU8,
}
impl NetworkDevice {
pub const fn new(processor: &'static AtProcessor) -> Self {
const SOCKET: Socket = Socket::new();
Self {
sockets: [SOCKET; MAX_SOCKETS],
processor,
link_state: AtomicU8::new(0),
}
}
/// Claims the first free socket slot for `protocol` and returns its id.
///
/// Slots are probed in index order. A slot whose index cannot be expressed
/// as a `SocketId` is skipped instead of being claimed, so the function
/// never panics and never strands a slot in the allocated state.
///
/// # Errors
/// Returns `Error::NoSocketAvailable` when no usable slot is free.
pub async fn allocate_socket(&self, protocol: SocketProtocol) -> Result<SocketId> {
    for (index, socket) in self.sockets.iter().enumerate() {
        // Indices past `u8::MAX` can never be addressed by a `SocketId`.
        if index > usize::from(u8::MAX) {
            break;
        }
        // Resolve the id *before* claiming the slot: failing afterwards
        // would leave the slot allocated with nobody holding its id.
        let id = match SocketId::new(index as u8) {
            Some(id) => id,
            None => continue,
        };
        if socket.allocate(protocol).await.is_ok() {
            return Ok(id);
        }
    }
    Err(Error::NoSocketAvailable)
}
/// Releases the slot identified by `id` without talking to the modem.
///
/// # Errors
/// Returns `Error::InvalidSocket` when `id` is outside the socket table.
pub async fn free_socket(&self, id: SocketId) -> Result<()> {
    let socket = self.get_socket(id)?;
    socket.free().await
}
pub fn get_socket(&self, id: SocketId) -> Result<&Socket> {
if id.raw() >= MAX_SOCKETS as u8 {
return Err(Error::InvalidSocket);
}
Ok(&self.sockets[id.raw() as usize])
}
/// Records whether the link is up (`true`) or down (`false`).
///
/// Stored as `1` / `0` with relaxed ordering.
pub fn set_link_state(&self, up: bool) {
    let flag = u8::from(up);
    self.link_state.store(flag, Ordering::Relaxed);
}
/// Reports the link flag last written by `set_link_state`.
///
/// Any non-zero stored value counts as "up".
pub fn is_link_up(&self) -> bool {
    let flag = self.link_state.load(Ordering::Relaxed);
    flag != 0
}
/// Returns the channel of `SocketEvent`s exposed by the underlying processor.
///
/// This is a plain pass-through to `AtProcessor::socket_event_receiver`.
pub fn socket_event_receiver(&self) -> &Channel<CriticalSectionRawMutex, SocketEvent, 16> {
self.processor.socket_event_receiver()
}
/// Connects an allocated socket to `host:port` and tracks the outcome in
/// the socket state.
///
/// The socket is marked `Connecting` while the command is in flight and
/// `Connected` after an `Ok` reply. If the command cannot be sent, fails,
/// or is answered with anything else, the socket is put back to
/// `Allocated` so it is never left stuck in `Connecting`.
///
/// # Errors
/// - `Error::InvalidSocket` if `id` is out of range or the slot has no
///   protocol assigned.
/// - Any error from building the connect command (the state is untouched).
/// - Any error reported by the processor while sending the command.
/// - `Error::ConnectionFailed` if the reply is not `AtResponse::Ok`.
pub async fn connect_socket<SPI, CS>(
    &self,
    spi: &'static TmMutex<SpiTransport<SPI, CS>>,
    id: SocketId,
    host: &str,
    port: u16,
    timeout: embassy_time::Duration,
) -> Result<()>
where
    SPI: embedded_hal_async::spi::SpiDevice,
    CS: embedded_hal::digital::OutputPin,
{
    let socket = self.get_socket(id)?;
    let protocol = {
        let proto = socket.protocol.lock().await;
        proto.ok_or(Error::InvalidSocket)?
    };

    // Build the command before touching the state: a formatting failure
    // must not change a socket for which nothing was sent.
    let cmd = crate::at::command::network::connect(id.raw(), protocol, host, port)?;

    socket.set_state(SocketState::Connecting).await;
    let outcome = self
        .processor
        .send_command(spi, cmd.as_bytes(), timeout)
        .await;

    // Roll back on every failure, including transport errors and timeouts.
    if !matches!(outcome, Ok(crate::at::AtResponse::Ok)) {
        socket.set_state(SocketState::Allocated).await;
    }
    if outcome? != crate::at::AtResponse::Ok {
        return Err(Error::ConnectionFailed);
    }

    socket.set_state(SocketState::Connected).await;
    Ok(())
}
pub async fn send_socket<SPI, CS>(
&self,
spi: &'static TmMutex<SpiTransport<SPI, CS>>,
id: SocketId,
data: &[u8],
timeout: embassy_time::Duration,
) -> Result<usize>
where
SPI: embedded_hal_async::spi::SpiDevice,
CS: embedded_hal::digital::OutputPin,
{
let socket = self.get_socket(id)?;
if socket.get_state().await != SocketState::Connected {
return Err(Error::NotConnected);
}
let cmd = crate::at::command::network::send(id.raw(), data.len())?;
let response = self
.processor
.send_command(spi, cmd.as_bytes(), timeout)
.await?;
if response != crate::at::AtResponse::ReadyPrompt {
return Err(Error::SocketError);
}
{
let mut spi_guard = spi.lock().await;
spi_guard.write(data).await?;
}
Ok(data.len())
}
/// Closes the socket on the modem and releases its slot.
///
/// The socket is marked `Closing` while the close command is in flight.
/// After an `Ok` reply the slot is freed (state, protocol and receive
/// buffer reset). If sending fails or the reply is not `Ok`, the slot
/// remains in `Closing`; the caller may retry or release it with
/// `free_socket`.
///
/// # Errors
/// - `Error::InvalidSocket` if `id` is out of range.
/// - Any error from building the close command (the state is untouched).
/// - Any error reported by the processor while sending the command.
/// - `Error::SocketError` if the reply is not `AtResponse::Ok`.
pub async fn close_socket<SPI, CS>(
    &self,
    spi: &'static TmMutex<SpiTransport<SPI, CS>>,
    id: SocketId,
    timeout: embassy_time::Duration,
) -> Result<()>
where
    SPI: embedded_hal_async::spi::SpiDevice,
    CS: embedded_hal::digital::OutputPin,
{
    let socket = self.get_socket(id)?;

    // Build the command first: if it cannot be formatted nothing was sent,
    // so the socket must not be flagged as closing.
    let cmd = crate::at::command::network::close(id.raw())?;

    socket.set_state(SocketState::Closing).await;
    let response = self
        .processor
        .send_command(spi, cmd.as_bytes(), timeout)
        .await?;
    if response != crate::at::AtResponse::Ok {
        return Err(Error::SocketError);
    }
    socket.free().await
}
/// Reads received bytes for a connected socket into `buffer`.
///
/// Bytes already waiting in the socket's receive buffer are returned
/// immediately. Otherwise a receive command is issued to the modem and
/// `Ok(0)` is returned.
///
/// NOTE(review): the reply to the receive command is awaited only for its
/// error status; its payload is not copied into `buffer`. Presumably the
/// data arrives through `handle_received_data` and is picked up by a later
/// call. Confirm against the processor implementation.
///
/// # Errors
/// - `Error::InvalidSocket` if `id` is out of range.
/// - `Error::NotConnected` if the socket is not in `SocketState::Connected`.
/// - Any error from building or sending the receive command.
pub async fn receive_socket<SPI, CS>(
    &self,
    spi: &'static TmMutex<SpiTransport<SPI, CS>>,
    id: SocketId,
    buffer: &mut [u8],
    timeout: embassy_time::Duration,
) -> Result<usize>
where
    SPI: embedded_hal_async::spi::SpiDevice,
    CS: embedded_hal::digital::OutputPin,
{
    // Upper bound for a single receive request.
    const MAX_REQUEST_LEN: usize = 2048;

    let socket = self.get_socket(id)?;
    if socket.get_state().await != SocketState::Connected {
        return Err(Error::NotConnected);
    }
    if buffer.is_empty() {
        // A zero-length destination can never receive anything, so skip
        // the modem round-trip.
        return Ok(0);
    }

    // Drain locally buffered data first. Reading under a single lock
    // avoids a gap between "bytes available" and "bytes read".
    let buffered = socket.read_rx_data(buffer).await?;
    if buffered > 0 {
        return Ok(buffered);
    }

    let length_to_request = core::cmp::min(buffer.len(), MAX_REQUEST_LEN);
    let cmd = crate::at::command::network::receive(id.raw(), length_to_request)?;
    let _response = self
        .processor
        .send_command(spi, cmd.as_bytes(), timeout)
        .await?;
    Ok(0)
}
pub async fn receive_socket_buffered(&self, id: SocketId, buffer: &mut [u8]) -> Result<usize> {
let socket = self.get_socket(id)?;
let state = socket.get_state().await;
if state != SocketState::Connected {
return Err(Error::NotConnected);
}
socket.read_rx_data(buffer).await
}
/// Returns how many received bytes are waiting for socket `id`.
///
/// The socket state is not checked.
///
/// # Errors
/// Returns `Error::InvalidSocket` when `id` is outside the socket table.
pub async fn available_bytes(&self, id: SocketId) -> Result<usize> {
    let pending = self.get_socket(id)?.available_rx_bytes().await;
    Ok(pending)
}
pub async fn handle_received_data(&self, link_id: u8, data: &[u8]) -> Result<()> {
if let Some(id) = SocketId::new(link_id) {
let socket = self.get_socket(id)?;
let bytes_written = socket.append_rx_data(data).await?;
if bytes_written < data.len() {
}
Ok(())
} else {
Err(Error::InvalidSocket)
}
}
/// Endless task that forwards incoming data frames from the processor's
/// data channel into the matching socket receive buffers.
///
/// Never returns. Failures from `handle_received_data` are ignored so that
/// one bad frame cannot stop the task.
pub async fn ipd_processor_task(&'static self) {
    let incoming = self.processor.ipd_data_receiver();
    loop {
        let frame = incoming.receive().await;
        let _ = self.handle_received_data(frame.link_id, &frame.data).await;
    }
}
pub async fn get_connection_status<SPI, CS>(
&self,
spi: &'static TmMutex<SpiTransport<SPI, CS>>,
timeout: embassy_time::Duration,
) -> Result<ConnectionStatus>
where
SPI: embedded_hal_async::spi::SpiDevice,
CS: embedded_hal::digital::OutputPin,
{
let cmd = crate::at::command::network::get_status()?;
let (slot, slot_idx) = self
.processor
.send_multi_response_command(spi, cmd.as_bytes())
.await?;
let status = ConnectionStatus::default();
let status_timeout = embassy_time::Instant::now() + timeout;
loop {
if embassy_time::Instant::now() > status_timeout {
self.processor.release_multi_response_slot(slot_idx).await;
return Err(crate::error::Error::Timeout);
}
if let Some(response) = slot.try_receive_data_response() {
if let crate::at::AtResponse::Data { prefix, content: _content } = response {
if prefix.as_str() == "STATUS" {
}
}
continue;
}
match embassy_time::with_timeout(
embassy_time::Duration::from_millis(100),
slot.wait(timeout),
)
.await
{
Ok(Ok(crate::at::AtResponse::Ok)) => break,
Ok(Ok(crate::at::AtResponse::Error)) | Ok(Err(_)) => {
self.processor.release_multi_response_slot(slot_idx).await;
return Err(crate::error::Error::AtCommandFailed);
}
Err(_) => continue,
_ => continue,
}
}
self.processor.release_multi_response_slot(slot_idx).await;
Ok(status)
}
/// Returns the locally tracked state of socket `id`; the modem is not queried.
///
/// # Errors
/// Returns `Error::InvalidSocket` when `id` is outside the socket table.
pub async fn get_socket_status(&self, id: SocketId) -> Result<SocketState> {
    let current = self.get_socket(id)?.get_state().await;
    Ok(current)
}
}
/// Result type of `NetworkDevice::get_connection_status`.
///
/// NOTE(review): no code visible in this file fills in these fields; the
/// query currently returns the `Default` value (`0` / `false`).
#[derive(Debug, Default, Clone)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct ConnectionStatus {
/// Presumably the number of connections the modem reports as open (TODO confirm).
pub active_connections: u8,
/// Presumably whether the modem is joined to a Wi-Fi network (TODO confirm).
pub wifi_connected: bool,
}