use std::net::SocketAddr;
use std::time::{Duration, SystemTime};
use crate::app::*;
use crate::decode::DecodeLevel;
use crate::link::EndpointAddress;
use crate::master::association::AssociationConfig;
use crate::master::error::{AssociationError, CommandError, PollError, TaskError, TimeSyncError};
use crate::master::messages::{AssociationMsg, AssociationMsgType, MasterMsg, Message};
use crate::master::poll::{PollHandle, PollMsg};
use crate::master::promise::Promise;
use crate::master::request::{CommandHeaders, CommandMode, ReadRequest, TimeSyncProcedure};
use crate::master::tasks::command::CommandTask;
use crate::master::tasks::deadbands::WriteDeadBandsTask;
use crate::master::tasks::empty_response::EmptyResponseTask;
use crate::master::tasks::file::authenticate::AuthFileTask;
use crate::master::tasks::file::close::CloseFileTask;
use crate::master::tasks::file::directory::DirectoryReader;
use crate::master::tasks::file::get_info::GetFileInfoTask;
use crate::master::tasks::file::open::{OpenFileRequest, OpenFileTask};
use crate::master::tasks::file::read::{FileReadTask, FileReaderType};
use crate::master::tasks::file::write_block::{WriteBlockRequest, WriteBlockTask};
use crate::master::tasks::read::SingleReadTask;
use crate::master::tasks::restart::{RestartTask, RestartType};
use crate::master::tasks::time::TimeSyncTask;
use crate::master::tasks::Task;
use crate::master::{
AuthKey, BlockNumber, DeadBandHeader, DirReadConfig, FileCredentials, FileError, FileHandle,
FileInfo, FileMode, FileReadConfig, FileReader, Headers, OpenFile, ReadHandler, WriteError,
};
use crate::transport::FragmentAddr;
use crate::util::channel::Sender;
use crate::util::phys::PhysAddr;
use crate::util::session::Enabled;
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum MasterChannelType {
Udp,
Stream,
}
#[derive(Debug, Clone)]
pub struct MasterChannel {
channel_type: MasterChannelType,
sender: Sender<Message>,
}
#[derive(Clone, Debug)]
pub struct AssociationHandle {
address: EndpointAddress,
master: MasterChannel,
}
#[derive(Copy, Clone, Debug)]
#[cfg_attr(
feature = "serialization",
derive(serde::Serialize, serde::Deserialize)
)]
pub struct MasterChannelConfig {
pub master_address: EndpointAddress,
#[cfg_attr(feature = "serialization", serde(default))]
pub decode_level: DecodeLevel,
#[cfg_attr(feature = "serialization", serde(default))]
pub tx_buffer_size: BufferSize<249, 2048>,
#[cfg_attr(feature = "serialization", serde(default))]
pub rx_buffer_size: BufferSize<2048, 2048>,
}
impl MasterChannelConfig {
pub fn new(master_address: EndpointAddress) -> Self {
Self {
master_address,
decode_level: DecodeLevel::nothing(),
tx_buffer_size: BufferSize::default(),
rx_buffer_size: BufferSize::default(),
}
}
}
impl MasterChannel {
pub(crate) fn new(sender: Sender<Message>, channel_type: MasterChannelType) -> Self {
Self {
sender,
channel_type,
}
}
pub async fn enable(&mut self) -> Result<(), Shutdown> {
self.send_master_message(MasterMsg::EnableCommunication(Enabled::Yes))
.await?;
Ok(())
}
pub async fn disable(&mut self) -> Result<(), Shutdown> {
self.send_master_message(MasterMsg::EnableCommunication(Enabled::No))
.await?;
Ok(())
}
pub async fn set_decode_level(&mut self, decode_level: DecodeLevel) -> Result<(), Shutdown> {
self.send_master_message(MasterMsg::SetDecodeLevel(decode_level))
.await?;
Ok(())
}
pub async fn get_decode_level(&mut self) -> Result<DecodeLevel, Shutdown> {
let (promise, rx) = Promise::one_shot();
self.send_master_message(MasterMsg::GetDecodeLevel(promise))
.await?;
rx.await?
}
fn assert_channel_type(&self, required: MasterChannelType) -> Result<(), AssociationError> {
if self.channel_type == required {
Ok(())
} else {
Err(AssociationError::WrongChannelType {
actual: self.channel_type,
required,
})
}
}
pub async fn add_association(
&mut self,
address: EndpointAddress,
config: AssociationConfig,
read_handler: Box<dyn ReadHandler>,
assoc_handler: Box<dyn AssociationHandler>,
assoc_information: Box<dyn AssociationInformation>,
) -> Result<AssociationHandle, AssociationError> {
self.assert_channel_type(MasterChannelType::Stream)?;
let (promise, rx) = Promise::one_shot();
let addr = FragmentAddr {
link: address,
phys: PhysAddr::None,
};
self.send_master_message(MasterMsg::AddAssociation(
addr,
config,
read_handler,
assoc_handler,
assoc_information,
promise,
))
.await?;
rx.await?
.map(|_| (AssociationHandle::new(address, self.clone())))
}
pub async fn add_udp_association(
&mut self,
address: EndpointAddress,
destination: SocketAddr,
config: AssociationConfig,
read_handler: Box<dyn ReadHandler>,
assoc_handler: Box<dyn AssociationHandler>,
assoc_information: Box<dyn AssociationInformation>,
) -> Result<AssociationHandle, AssociationError> {
self.assert_channel_type(MasterChannelType::Udp)?;
let (promise, rx) = Promise::one_shot();
let addr = FragmentAddr {
link: address,
phys: PhysAddr::Udp(destination),
};
self.send_master_message(MasterMsg::AddAssociation(
addr,
config,
read_handler,
assoc_handler,
assoc_information,
promise,
))
.await?;
rx.await?
.map(|_| (AssociationHandle::new(address, self.clone())))
}
pub async fn remove_association(&mut self, address: EndpointAddress) -> Result<(), Shutdown> {
self.send_master_message(MasterMsg::RemoveAssociation(address))
.await?;
Ok(())
}
async fn send_master_message(&mut self, msg: MasterMsg) -> Result<(), Shutdown> {
self.sender.send(Message::Master(msg)).await?;
Ok(())
}
async fn send_association_message(
&mut self,
address: EndpointAddress,
msg: AssociationMsgType,
) -> Result<(), Shutdown> {
self.sender
.send(Message::Association(AssociationMsg {
address,
details: msg,
}))
.await
}
}
impl AssociationHandle {
#[doc(hidden)]
#[cfg(feature = "ffi")]
pub fn create(address: EndpointAddress, master: MasterChannel) -> Self {
Self::new(address, master)
}
pub(crate) fn new(address: EndpointAddress, master: MasterChannel) -> Self {
Self { address, master }
}
pub fn address(&self) -> EndpointAddress {
self.address
}
pub async fn add_poll(
&mut self,
request: ReadRequest,
period: Duration,
) -> Result<PollHandle, PollError> {
let (promise, rx) = Promise::one_shot();
self.send_poll_message(PollMsg::AddPoll(self.clone(), request, period, promise))
.await?;
rx.await?
}
pub async fn remove(mut self) -> Result<(), Shutdown> {
self.master
.send_master_message(MasterMsg::RemoveAssociation(self.address))
.await?;
Ok(())
}
pub async fn read(&mut self, request: ReadRequest) -> Result<(), TaskError> {
let (promise, rx) = Promise::one_shot();
let task = SingleReadTask::new(request, promise);
self.send_task(task).await?;
rx.await?
}
pub async fn send_and_expect_empty_response(
&mut self,
function: FunctionCode,
headers: Headers,
) -> Result<(), WriteError> {
let (promise, rx) = Promise::one_shot();
let task = EmptyResponseTask::new(function, headers, promise);
self.send_task(task).await?;
rx.await?
}
pub async fn read_with_handler(
&mut self,
request: ReadRequest,
handler: Box<dyn ReadHandler>,
) -> Result<(), TaskError> {
let (promise, rx) = Promise::one_shot();
let task = SingleReadTask::new_with_custom_handler(request, handler, promise);
self.send_task(task).await?;
rx.await?
}
pub async fn operate(
&mut self,
mode: CommandMode,
headers: CommandHeaders,
) -> Result<(), CommandError> {
let (promise, rx) = Promise::one_shot();
let task = CommandTask::from_mode(mode, headers, promise);
self.send_task(task).await?;
rx.await?
}
pub async fn warm_restart(&mut self) -> Result<Duration, TaskError> {
self.restart(RestartType::WarmRestart).await
}
pub async fn cold_restart(&mut self) -> Result<Duration, TaskError> {
self.restart(RestartType::ColdRestart).await
}
async fn restart(&mut self, restart_type: RestartType) -> Result<Duration, TaskError> {
let (promise, rx) = Promise::one_shot();
let task = RestartTask::new(restart_type, promise);
self.send_task(task).await?;
rx.await?
}
pub async fn synchronize_time(
&mut self,
procedure: TimeSyncProcedure,
) -> Result<(), TimeSyncError> {
let (promise, rx) = Promise::one_shot();
let task = TimeSyncTask::get_procedure(procedure, Some(promise));
self.send_task(task).await?;
rx.await?
}
pub async fn write_dead_bands(
&mut self,
headers: Vec<DeadBandHeader>,
) -> Result<(), WriteError> {
let (promise, rx) = Promise::one_shot();
let task = WriteDeadBandsTask::new(headers, promise);
self.send_task(task).await?;
rx.await?
}
pub async fn check_link_status(&mut self) -> Result<(), TaskError> {
let (promise, rx) = Promise::one_shot();
self.send_task(Task::LinkStatus(promise)).await?;
rx.await?
}
pub async fn get_file_auth_key(
&mut self,
credentials: FileCredentials,
) -> Result<AuthKey, FileError> {
let (promise, rx) = Promise::one_shot();
let task = AuthFileTask {
credentials,
promise,
};
self.send_task(task).await?;
rx.await?
}
pub async fn open_file<T: ToString>(
&mut self,
file_name: T,
auth_key: AuthKey,
permissions: Permissions,
file_size: u32,
file_mode: FileMode,
max_block_size: u16,
) -> Result<OpenFile, FileError> {
let (promise, rx) = Promise::one_shot();
let task = OpenFileTask {
request: OpenFileRequest {
file_name: file_name.to_string(),
auth_key,
file_size,
file_mode,
permissions,
max_block_size,
},
promise,
};
self.send_task(task).await?;
rx.await?
}
pub async fn write_file_block(
&mut self,
handle: FileHandle,
block_number: BlockNumber,
block_data: Vec<u8>,
) -> Result<(), FileError> {
let (promise, rx) = Promise::one_shot();
let task = WriteBlockTask {
request: WriteBlockRequest {
handle,
block_number,
block_data,
},
promise,
};
self.send_task(task).await?;
rx.await?
}
pub async fn close_file(&mut self, handle: FileHandle) -> Result<(), FileError> {
let (promise, rx) = Promise::one_shot();
let task = CloseFileTask { handle, promise };
self.send_task(task).await?;
rx.await?
}
pub async fn read_file<T: ToString>(
&mut self,
remote_file_path: T,
config: FileReadConfig,
reader: Box<dyn FileReader>,
credentials: Option<FileCredentials>,
) -> Result<(), Shutdown> {
let task = FileReadTask::start(
remote_file_path.to_string(),
config,
FileReaderType::from_reader(reader),
credentials,
);
self.send_task(task).await
}
pub async fn read_directory<T: ToString>(
&mut self,
dir_path: T,
config: DirReadConfig,
credentials: Option<FileCredentials>,
) -> Result<Vec<FileInfo>, FileError> {
let (promise, rx) = Promise::one_shot();
let reader = Box::new(DirectoryReader::new(promise));
self.read_file(dir_path, config.into(), reader, credentials)
.await?;
rx.await?
}
pub async fn get_file_info<T: ToString>(
&mut self,
file_path: T,
) -> Result<FileInfo, FileError> {
let (promise, reply) = Promise::one_shot();
let task = GetFileInfoTask::new(file_path.to_string(), promise);
self.send_task(task).await?;
reply.await?
}
async fn send_task<T: Into<Task>>(&mut self, task: T) -> Result<(), Shutdown> {
self.master
.send_association_message(self.address, AssociationMsgType::QueueTask(task.into()))
.await
}
pub(crate) async fn send_poll_message(&mut self, msg: PollMsg) -> Result<(), Shutdown> {
self.master
.send_association_message(self.address, AssociationMsgType::Poll(msg))
.await
}
}
#[cfg_attr(not(feature = "ffi"), non_exhaustive)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum TaskType {
UserRead,
PeriodicPoll,
StartupIntegrity,
AutoEventScan,
Command,
ClearRestartBit,
EnableUnsolicited,
DisableUnsolicited,
TimeSync,
Restart,
WriteDeadBands,
GenericEmptyResponse(FunctionCode),
FileRead,
FileAuth,
FileOpen,
FileWriteBlock,
FileClose,
GetFileInfo,
}
pub trait AssociationHandler: Send + Sync {
fn get_current_time(&self) -> Option<Timestamp> {
Timestamp::try_from_system_time(SystemTime::now())
}
}
pub trait AssociationInformation: Send + Sync {
fn task_start(&mut self, _task_type: TaskType, _fc: FunctionCode, _seq: Sequence) {}
fn task_success(&mut self, _task_type: TaskType, _fc: FunctionCode, _seq: Sequence) {}
fn task_fail(&mut self, _task_type: TaskType, _error: TaskError) {}
fn unsolicited_response(&mut self, _is_duplicate: bool, _seq: Sequence) {}
}