use super::adapter::{AdapterEvent, ConstrainedEngineAdapter, EngineOutput};
use super::engine::EngineConfig;
use super::types::{ConnectionId, ConstrainedError};
use crate::transport::{TransportAddr, TransportCapabilities};
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc;
#[derive(Debug, Clone)]
pub struct ConstrainedTransportConfig {
pub engine_config: EngineConfig,
pub outbound_buffer_size: usize,
pub event_buffer_size: usize,
}
impl Default for ConstrainedTransportConfig {
fn default() -> Self {
Self {
engine_config: EngineConfig::default(),
outbound_buffer_size: 64,
event_buffer_size: 32,
}
}
}
impl ConstrainedTransportConfig {
pub fn for_ble() -> Self {
Self {
engine_config: EngineConfig::for_ble(),
outbound_buffer_size: 32,
event_buffer_size: 16,
}
}
pub fn for_lora() -> Self {
Self {
engine_config: EngineConfig::for_lora(),
outbound_buffer_size: 8,
event_buffer_size: 8,
}
}
}
#[derive(Clone, Debug)]
pub struct ConstrainedHandle {
adapter: Arc<Mutex<ConstrainedEngineAdapter>>,
outbound_tx: mpsc::Sender<EngineOutput>,
}
impl ConstrainedHandle {
pub fn connect(&self, remote: &TransportAddr) -> Result<ConnectionId, ConstrainedError> {
let mut adapter = self
.adapter
.lock()
.map_err(|_| ConstrainedError::Transport("adapter lock poisoned".into()))?;
let (conn_id, outputs) = adapter.connect(remote)?;
for output in outputs {
let _ = self.outbound_tx.try_send(output);
}
Ok(conn_id)
}
pub fn send(&self, connection_id: ConnectionId, data: &[u8]) -> Result<(), ConstrainedError> {
let mut adapter = self
.adapter
.lock()
.map_err(|_| ConstrainedError::Transport("adapter lock poisoned".into()))?;
let outputs = adapter.send(connection_id, data)?;
for output in outputs {
let _ = self.outbound_tx.try_send(output);
}
Ok(())
}
pub fn recv(&self, connection_id: ConnectionId) -> Result<Option<Vec<u8>>, ConstrainedError> {
let mut adapter = self
.adapter
.lock()
.map_err(|_| ConstrainedError::Transport("adapter lock poisoned".into()))?;
Ok(adapter.recv(connection_id))
}
pub fn close(&self, connection_id: ConnectionId) -> Result<(), ConstrainedError> {
let mut adapter = self
.adapter
.lock()
.map_err(|_| ConstrainedError::Transport("adapter lock poisoned".into()))?;
let outputs = adapter.close(connection_id)?;
for output in outputs {
let _ = self.outbound_tx.try_send(output);
}
Ok(())
}
pub fn connection_count(&self) -> usize {
self.adapter
.lock()
.map(|a| a.connection_count())
.unwrap_or(0)
}
pub fn process_incoming(
&self,
source: &TransportAddr,
data: &[u8],
) -> Result<(), ConstrainedError> {
let mut adapter = self
.adapter
.lock()
.map_err(|_| ConstrainedError::Transport("adapter lock poisoned".into()))?;
let outputs = adapter.process_incoming(source, data)?;
for output in outputs {
let _ = self.outbound_tx.try_send(output);
}
Ok(())
}
pub fn poll(&self) -> Vec<EngineOutput> {
let mut adapter = match self.adapter.lock() {
Ok(a) => a,
Err(_) => return Vec::new(),
};
adapter.poll()
}
pub fn next_event(&self) -> Option<AdapterEvent> {
self.adapter.lock().ok().and_then(|mut a| a.next_event())
}
pub fn connection_state(
&self,
connection_id: ConnectionId,
) -> Option<crate::constrained::ConnectionState> {
self.adapter
.lock()
.ok()
.and_then(|a| a.connection_state(connection_id))
}
pub fn active_connections(&self) -> Vec<ConnectionId> {
self.adapter
.lock()
.ok()
.map(|a| a.active_connections())
.unwrap_or_default()
}
}
pub struct ConstrainedTransport {
adapter: Arc<Mutex<ConstrainedEngineAdapter>>,
outbound_tx: mpsc::Sender<EngineOutput>,
outbound_rx: mpsc::Receiver<EngineOutput>,
config: ConstrainedTransportConfig,
}
impl ConstrainedTransport {
pub fn new(config: ConstrainedTransportConfig) -> Self {
let (outbound_tx, outbound_rx) = mpsc::channel(config.outbound_buffer_size);
let adapter = ConstrainedEngineAdapter::new(config.engine_config.clone());
Self {
adapter: Arc::new(Mutex::new(adapter)),
outbound_tx,
outbound_rx,
config,
}
}
pub fn for_ble() -> Self {
Self::new(ConstrainedTransportConfig::for_ble())
}
pub fn for_lora() -> Self {
Self::new(ConstrainedTransportConfig::for_lora())
}
pub fn handle(&self) -> ConstrainedHandle {
ConstrainedHandle {
adapter: Arc::clone(&self.adapter),
outbound_tx: self.outbound_tx.clone(),
}
}
pub fn take_outbound_rx(&mut self) -> mpsc::Receiver<EngineOutput> {
let (new_tx, new_rx) = mpsc::channel(self.config.outbound_buffer_size);
let _ = std::mem::replace(&mut self.outbound_tx, new_tx);
std::mem::replace(&mut self.outbound_rx, new_rx)
}
pub fn process_incoming(
&self,
source: &TransportAddr,
data: &[u8],
) -> Result<(), ConstrainedError> {
let mut adapter = self
.adapter
.lock()
.map_err(|_| ConstrainedError::Transport("adapter lock poisoned".into()))?;
let outputs = adapter.process_incoming(source, data)?;
for output in outputs {
let _ = self.outbound_tx.try_send(output);
}
Ok(())
}
pub fn poll(&self) {
if let Ok(mut adapter) = self.adapter.lock() {
let outputs = adapter.poll();
for output in outputs {
let _ = self.outbound_tx.try_send(output);
}
}
}
pub fn should_use_constrained(capabilities: &TransportCapabilities) -> bool {
!capabilities.supports_full_quic()
}
}
impl std::fmt::Debug for ConstrainedTransport {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ConstrainedTransport")
.field("config", &self.config)
.field(
"connection_count",
&self
.adapter
.lock()
.map(|a| a.connection_count())
.unwrap_or(0),
)
.finish()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_constrained_transport_creation() {
let transport = ConstrainedTransport::for_ble();
let handle = transport.handle();
assert_eq!(handle.connection_count(), 0);
}
#[test]
fn test_constrained_handle_connect() {
let transport = ConstrainedTransport::for_ble();
let handle = transport.handle();
let addr = TransportAddr::Ble {
device_id: [0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0xFF],
service_uuid: None,
};
let result = handle.connect(&addr);
assert!(result.is_ok());
assert_eq!(handle.connection_count(), 1);
}
#[test]
fn test_constrained_config_presets() {
let ble_config = ConstrainedTransportConfig::for_ble();
assert_eq!(ble_config.outbound_buffer_size, 32);
let lora_config = ConstrainedTransportConfig::for_lora();
assert_eq!(lora_config.outbound_buffer_size, 8);
}
#[test]
fn test_should_use_constrained() {
use crate::transport::TransportCapabilities;
let ble_caps = TransportCapabilities::ble();
assert!(ConstrainedTransport::should_use_constrained(&ble_caps));
let lora_caps = TransportCapabilities::lora_long_range();
assert!(ConstrainedTransport::should_use_constrained(&lora_caps));
let broadband_caps = TransportCapabilities::broadband();
assert!(!ConstrainedTransport::should_use_constrained(
&broadband_caps
));
}
#[tokio::test]
async fn test_handle_clone() {
let transport = ConstrainedTransport::for_ble();
let handle1 = transport.handle();
let handle2 = transport.handle();
let addr = TransportAddr::Ble {
device_id: [0x11, 0x22, 0x33, 0x44, 0x55, 0x66],
service_uuid: None,
};
let _ = handle1.connect(&addr);
assert_eq!(handle1.connection_count(), 1);
assert_eq!(handle2.connection_count(), 1);
}
}