use futures_util::future::join3;
use log::trace;
use prost::Message;
use std::{fmt::Display, marker::PhantomData};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
sync::mpsc::UnboundedSender,
task::JoinHandle,
};
use tokio_util::sync::CancellationToken;
use crate::{errors_internal::Error, protobufs, types::EncodedToRadioPacketWithHeader, utils};
use crate::{
packet::PacketReceiver,
utils_internal::{current_epoch_secs_u32, generate_rand_id},
};
use super::{
handlers,
wrappers::{
encoded_data::{EncodedMeshPacketData, EncodedToRadioPacket, IncomingStreamData},
mesh_channel::MeshChannel,
NodeId,
},
PacketDestination, PacketRouter,
};
pub mod state {
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Default)]
pub struct Connected;
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Default)]
pub struct Configured;
}
#[derive(Debug)]
pub struct StreamApi;
#[derive(Debug)]
pub struct ConnectedStreamApi<State = state::Configured> {
write_input_tx: UnboundedSender<EncodedToRadioPacketWithHeader>,
read_handle: JoinHandle<Result<(), Error>>,
write_handle: JoinHandle<Result<(), Error>>,
processing_handle: JoinHandle<Result<(), Error>>,
heartbeat_handle: JoinHandle<Result<(), Error>>,
cancellation_token: CancellationToken,
typestate: PhantomData<State>,
}
pub struct StreamHandle<T: AsyncReadExt + AsyncWriteExt + Send> {
pub stream: T,
pub join_handle: Option<JoinHandle<Result<(), Error>>>,
}
impl<T: AsyncReadExt + AsyncWriteExt + Send> StreamHandle<T> {
pub fn from_stream(stream: T) -> Self {
Self {
stream,
join_handle: None,
}
}
}
impl<State> ConnectedStreamApi<State> {
#[allow(clippy::too_many_arguments)]
pub async fn send_mesh_packet<
M,
E: Display + std::error::Error + Send + Sync + 'static,
R: PacketRouter<M, E>,
>(
&mut self,
packet_router: &mut R,
packet_data: EncodedMeshPacketData,
port_num: protobufs::PortNum,
destination: PacketDestination,
channel: MeshChannel,
want_ack: bool,
want_response: bool,
echo_response: bool,
reply_id: Option<u32>,
emoji: Option<u32>,
) -> Result<(), Error> {
let own_node_id = packet_router.source_node_id();
let packet_destination: NodeId = match destination {
PacketDestination::Local => own_node_id,
PacketDestination::Broadcast => u32::MAX.into(),
PacketDestination::Node(id) => id,
};
let mut mesh_packet = protobufs::MeshPacket {
payload_variant: Some(protobufs::mesh_packet::PayloadVariant::Decoded(
protobufs::Data {
portnum: port_num as i32,
payload: packet_data.data_vec(),
want_response,
reply_id: reply_id.unwrap_or(0),
emoji: emoji.unwrap_or(0),
..Default::default()
},
)),
from: own_node_id.id(),
to: packet_destination.id(),
id: generate_rand_id(),
want_ack,
channel: channel.channel(),
..Default::default()
};
mesh_packet.rx_time = current_epoch_secs_u32();
let payload_variant = Some(protobufs::to_radio::PayloadVariant::Packet(
mesh_packet.clone(),
));
self.send_to_radio_packet(payload_variant).await?;
if echo_response {
packet_router.handle_mesh_packet(mesh_packet).map_err(|e| {
Error::PacketHandlerFailure {
source: Box::new(e),
}
})?;
}
Ok(())
}
pub async fn send_to_radio_packet(
&mut self,
payload_variant: Option<protobufs::to_radio::PayloadVariant>,
) -> Result<(), Error> {
let packet = protobufs::ToRadio { payload_variant };
let mut packet_buf = vec![];
packet.encode(&mut packet_buf)?;
self.send_raw(packet_buf.into()).await
}
pub async fn send_raw(&mut self, data: EncodedToRadioPacket) -> Result<(), Error> {
let channel = self.write_input_tx.clone();
let data_with_header = utils::format_data_packet(data)?;
channel
.send(data_with_header)
.map_err(|e| Error::InternalChannelError(e.into()))?;
Ok(())
}
pub fn write_input_sender(&self) -> UnboundedSender<EncodedToRadioPacketWithHeader> {
self.write_input_tx.clone()
}
}
impl StreamApi {
#[allow(clippy::new_without_default)]
pub fn new() -> StreamApi {
StreamApi
}
pub async fn connect<S>(
self,
stream_handle: StreamHandle<S>,
) -> (PacketReceiver, ConnectedStreamApi<state::Connected>)
where
S: AsyncReadExt + AsyncWriteExt + Send + 'static,
{
let (write_input_tx, write_input_rx) =
tokio::sync::mpsc::unbounded_channel::<EncodedToRadioPacketWithHeader>();
let (read_output_tx, read_output_rx) =
tokio::sync::mpsc::unbounded_channel::<IncomingStreamData>();
let (decoded_packet_tx, decoded_packet_rx) =
tokio::sync::mpsc::unbounded_channel::<protobufs::FromRadio>();
let (read_stream, write_stream) = tokio::io::split(stream_handle.stream);
let cancellation_token = CancellationToken::new();
let read_handle =
handlers::spawn_read_handler(cancellation_token.clone(), read_stream, read_output_tx);
let write_handle =
handlers::spawn_write_handler(cancellation_token.clone(), write_stream, write_input_rx);
let processing_handle = handlers::spawn_processing_handler(
cancellation_token.clone(),
read_output_rx,
decoded_packet_tx,
);
let heartbeat_handle =
handlers::spawn_heartbeat_handler(cancellation_token.clone(), write_input_tx.clone());
let write_input_tx = write_input_tx;
let cancellation_token = cancellation_token;
(
decoded_packet_rx,
ConnectedStreamApi::<state::Connected> {
write_input_tx,
read_handle,
write_handle,
processing_handle,
heartbeat_handle,
cancellation_token,
typestate: PhantomData,
},
)
}
}
impl ConnectedStreamApi<state::Connected> {
pub async fn configure(
mut self,
config_id: u32,
) -> Result<ConnectedStreamApi<state::Configured>, Error> {
let to_radio = protobufs::ToRadio {
payload_variant: Some(protobufs::to_radio::PayloadVariant::WantConfigId(config_id)),
};
let packet_buf: EncodedToRadioPacket = to_radio.encode_to_vec().into();
self.send_raw(packet_buf).await?;
Ok(ConnectedStreamApi::<state::Configured> {
write_input_tx: self.write_input_tx,
read_handle: self.read_handle,
write_handle: self.write_handle,
processing_handle: self.processing_handle,
heartbeat_handle: self.heartbeat_handle,
cancellation_token: self.cancellation_token,
typestate: PhantomData,
})
}
}
impl ConnectedStreamApi<state::Configured> {
pub async fn disconnect(self) -> Result<StreamApi, Error> {
self.cancellation_token.cancel();
drop(self.write_input_tx);
let (read_result, write_result, processing_result) =
join3(self.read_handle, self.write_handle, self.processing_handle).await;
read_result??;
write_result??;
processing_result??;
trace!("Handlers fully disconnected");
Ok(StreamApi)
}
}
impl ConnectedStreamApi<state::Configured> {
pub async fn send_text<
M,
E: Display + std::error::Error + Send + Sync + 'static,
R: PacketRouter<M, E>,
>(
&mut self,
packet_router: &mut R,
text: String,
destination: PacketDestination,
want_ack: bool,
channel: MeshChannel,
) -> Result<(), Error> {
let byte_data: EncodedMeshPacketData = text.into_bytes().into();
self.send_mesh_packet(
packet_router,
byte_data,
protobufs::PortNum::TextMessageApp,
destination,
channel,
want_ack,
false,
true,
None,
None,
)
.await?;
Ok(())
}
pub async fn send_waypoint<
M,
E: Display + std::error::Error + Send + Sync + 'static,
R: PacketRouter<M, E>,
>(
&mut self,
packet_router: &mut R,
waypoint: crate::protobufs::Waypoint,
destination: PacketDestination,
want_ack: bool,
channel: MeshChannel,
) -> Result<(), Error> {
let mut waypoint = waypoint;
if waypoint.id == 0 {
waypoint.id = generate_rand_id();
}
let byte_data: EncodedMeshPacketData = waypoint.encode_to_vec().into();
self.send_mesh_packet(
packet_router,
byte_data,
protobufs::PortNum::WaypointApp,
destination,
channel,
want_ack,
false,
true,
None,
None,
)
.await?;
Ok(())
}
pub async fn send_position<
M,
E: Display + std::error::Error + Send + Sync + 'static,
R: PacketRouter<M, E>,
>(
&mut self,
packet_router: &mut R,
position: crate::protobufs::Position,
destination: PacketDestination,
want_ack: bool,
channel: MeshChannel,
) -> Result<(), Error> {
let byte_data: EncodedMeshPacketData = position.encode_to_vec().into();
self.send_mesh_packet(
packet_router,
byte_data,
protobufs::PortNum::PositionApp,
destination,
channel,
want_ack,
false,
true,
None,
None,
)
.await?;
Ok(())
}
pub async fn update_config<
M,
E: Display + std::error::Error + Send + Sync + 'static,
R: PacketRouter<M, E>,
>(
&mut self,
packet_router: &mut R,
config: protobufs::Config,
) -> Result<(), Error> {
let config_packet = protobufs::AdminMessage {
payload_variant: Some(protobufs::admin_message::PayloadVariant::SetConfig(config)),
session_passkey: Vec::new(),
};
let byte_data: EncodedMeshPacketData = config_packet.encode_to_vec().into();
self.send_mesh_packet(
packet_router,
byte_data,
protobufs::PortNum::AdminApp,
PacketDestination::Local,
MeshChannel::new(0)?,
true,
true,
false,
None,
None,
)
.await?;
Ok(())
}
pub async fn update_module_config<
M,
E: Display + std::error::Error + Send + Sync + 'static,
R: PacketRouter<M, E>,
>(
&mut self,
packet_router: &mut R,
module_config: protobufs::ModuleConfig,
) -> Result<(), Error> {
let module_config_packet = protobufs::AdminMessage {
payload_variant: Some(protobufs::admin_message::PayloadVariant::SetModuleConfig(
module_config,
)),
session_passkey: Vec::new(),
};
let byte_data: EncodedMeshPacketData = module_config_packet.encode_to_vec().into();
self.send_mesh_packet(
packet_router,
byte_data,
protobufs::PortNum::AdminApp,
PacketDestination::Local,
MeshChannel::new(0)?,
true,
true,
false,
None,
None,
)
.await?;
Ok(())
}
pub async fn update_channel_config<
M,
E: Display + std::error::Error + Send + Sync + 'static,
R: PacketRouter<M, E>,
>(
&mut self,
packet_router: &mut R,
channel_config: protobufs::Channel,
) -> Result<(), Error> {
let channel_packet = protobufs::AdminMessage {
payload_variant: Some(protobufs::admin_message::PayloadVariant::SetChannel(
channel_config,
)),
session_passkey: Vec::new(),
};
let byte_data: EncodedMeshPacketData = channel_packet.encode_to_vec().into();
self.send_mesh_packet(
packet_router,
byte_data,
protobufs::PortNum::AdminApp,
PacketDestination::Local,
MeshChannel::new(0)?,
true,
true,
false,
None,
None,
)
.await?;
Ok(())
}
pub async fn update_user<
M,
E: Display + std::error::Error + Send + Sync + 'static,
R: PacketRouter<M, E>,
>(
&mut self,
packet_router: &mut R,
user: protobufs::User,
) -> Result<(), Error> {
let user_packet = protobufs::AdminMessage {
payload_variant: Some(protobufs::admin_message::PayloadVariant::SetOwner(user)),
session_passkey: Vec::new(),
};
let byte_data: EncodedMeshPacketData = user_packet.encode_to_vec().into();
self.send_mesh_packet(
packet_router,
byte_data,
protobufs::PortNum::AdminApp,
PacketDestination::Local,
MeshChannel::new(0)?,
true,
true,
false,
None,
None,
)
.await?;
Ok(())
}
pub async fn start_config_transaction(&mut self) -> Result<(), Error> {
let to_radio = protobufs::AdminMessage {
payload_variant: Some(protobufs::admin_message::PayloadVariant::BeginEditSettings(
true,
)),
session_passkey: Vec::new(),
};
let mut packet_buf = vec![];
to_radio.encode(&mut packet_buf)?;
self.send_raw(packet_buf.into()).await
}
pub async fn commit_config_transaction(&mut self) -> Result<(), Error> {
let to_radio = protobufs::AdminMessage {
payload_variant: Some(
protobufs::admin_message::PayloadVariant::CommitEditSettings(true),
),
session_passkey: Vec::new(),
};
let mut packet_buf = vec![];
to_radio.encode(&mut packet_buf)?;
self.send_raw(packet_buf.into()).await
}
pub async fn set_local_config<
M,
E: Display + std::error::Error + Send + Sync + 'static,
R: PacketRouter<M, E>,
>(
&mut self,
packet_router: &mut R,
local_config: protobufs::LocalConfig,
) -> Result<(), Error> {
if let Some(c) = local_config.bluetooth {
self.update_config(
packet_router,
protobufs::Config {
payload_variant: Some(protobufs::config::PayloadVariant::Bluetooth(c)),
},
)
.await?;
}
if let Some(c) = local_config.device {
self.update_config(
packet_router,
protobufs::Config {
payload_variant: Some(protobufs::config::PayloadVariant::Device(c)),
},
)
.await?;
}
if let Some(c) = local_config.display {
self.update_config(
packet_router,
protobufs::Config {
payload_variant: Some(protobufs::config::PayloadVariant::Display(c)),
},
)
.await?;
}
if let Some(c) = local_config.lora {
self.update_config(
packet_router,
protobufs::Config {
payload_variant: Some(protobufs::config::PayloadVariant::Lora(c)),
},
)
.await?;
}
if let Some(c) = local_config.network {
self.update_config(
packet_router,
protobufs::Config {
payload_variant: Some(protobufs::config::PayloadVariant::Network(c)),
},
)
.await?;
}
if let Some(c) = local_config.position {
self.update_config(
packet_router,
protobufs::Config {
payload_variant: Some(protobufs::config::PayloadVariant::Position(c)),
},
)
.await?;
}
if let Some(c) = local_config.power {
self.update_config(
packet_router,
protobufs::Config {
payload_variant: Some(protobufs::config::PayloadVariant::Power(c)),
},
)
.await?;
}
Ok(())
}
pub async fn set_local_module_config<
M,
E: Display + std::error::Error + Send + Sync + 'static,
R: PacketRouter<M, E>,
>(
&mut self,
packet_router: &mut R,
local_module_config: protobufs::LocalModuleConfig,
) -> Result<(), Error> {
if let Some(c) = local_module_config.audio {
self.update_module_config(
packet_router,
protobufs::ModuleConfig {
payload_variant: Some(protobufs::module_config::PayloadVariant::Audio(c)),
},
)
.await?;
}
if let Some(c) = local_module_config.canned_message {
self.update_module_config(
packet_router,
protobufs::ModuleConfig {
payload_variant: Some(protobufs::module_config::PayloadVariant::CannedMessage(
c,
)),
},
)
.await?;
}
if let Some(c) = local_module_config.external_notification {
self.update_module_config(
packet_router,
protobufs::ModuleConfig {
payload_variant: Some(
protobufs::module_config::PayloadVariant::ExternalNotification(c),
),
},
)
.await?;
}
if let Some(c) = local_module_config.mqtt {
self.update_module_config(
packet_router,
protobufs::ModuleConfig {
payload_variant: Some(protobufs::module_config::PayloadVariant::Mqtt(c)),
},
)
.await?;
}
if let Some(c) = local_module_config.range_test {
self.update_module_config(
packet_router,
protobufs::ModuleConfig {
payload_variant: Some(protobufs::module_config::PayloadVariant::RangeTest(c)),
},
)
.await?;
}
if let Some(c) = local_module_config.remote_hardware {
self.update_module_config(
packet_router,
protobufs::ModuleConfig {
payload_variant: Some(
protobufs::module_config::PayloadVariant::RemoteHardware(c),
),
},
)
.await?;
}
if let Some(c) = local_module_config.serial {
self.update_module_config(
packet_router,
protobufs::ModuleConfig {
payload_variant: Some(protobufs::module_config::PayloadVariant::Serial(c)),
},
)
.await?;
}
if let Some(c) = local_module_config.store_forward {
self.update_module_config(
packet_router,
protobufs::ModuleConfig {
payload_variant: Some(protobufs::module_config::PayloadVariant::StoreForward(
c,
)),
},
)
.await?;
}
if let Some(c) = local_module_config.telemetry {
self.update_module_config(
packet_router,
protobufs::ModuleConfig {
payload_variant: Some(protobufs::module_config::PayloadVariant::Telemetry(c)),
},
)
.await?;
}
Ok(())
}
pub async fn set_message_channel_config<
M,
E: Display + std::error::Error + Send + Sync + 'static,
R: PacketRouter<M, E>,
>(
&mut self,
packet_router: &mut R,
channel_config: Vec<protobufs::Channel>,
) -> Result<(), Error> {
for channel in channel_config {
self.update_channel_config(packet_router, channel).await?;
}
Ok(())
}
}