#![allow(unused_imports)]
#[cfg(doc)]
use crate::agent::Agent;
use crate::error::Error;
use core::fmt::Debug;
use dimas_com::traits::LivelinessSubscriber;
use dimas_com::traits::{
Communicator, CommunicatorMethods, Observer, Publisher, Querier, Responder,
};
use dimas_config::Config;
#[cfg(doc)]
use dimas_core::traits::Context;
use dimas_core::{
Result,
enums::{OperationState, TaskSignal},
message_types::{Message, QueryableMsg},
traits::{Capability, ContextAbstraction},
};
use dimas_time::Timer;
use std::{
collections::HashMap,
sync::{Arc, RwLock},
};
use tokio::sync::mpsc::Sender;
use tracing::{Level, info, instrument};
use zenoh::Session;
const INITIAL_SIZE: usize = 9;
#[derive(Debug, Clone)]
#[allow(clippy::module_name_repetitions)]
pub struct ContextImpl<P>
where
P: Send + Sync + 'static,
{
uuid: String,
name: Option<String>,
prefix: Option<String>,
state: Arc<RwLock<OperationState>>,
sender: Sender<TaskSignal>,
props: Arc<RwLock<P>>,
communicator: Arc<dyn Communicator>,
timers: Arc<RwLock<HashMap<String, Timer<P>>>>,
}
impl<P> ContextAbstraction for ContextImpl<P>
where
P: Debug + Send + Sync + 'static,
{
type Props = P;
fn name(&self) -> Option<&String> {
self.name.as_ref()
}
fn fq_name(&self) -> Option<String> {
if self.name().is_some() && self.prefix().is_some() {
Some(format!(
"{}/{}",
self.prefix().expect("snh"),
self.name().expect("snh")
))
} else if self.name().is_some() {
Some(self.name().expect("snh").to_owned())
} else {
None
}
}
fn state(&self) -> OperationState {
self.state.read().expect("snh").clone()
}
fn uuid(&self) -> String {
self.uuid.clone()
}
fn prefix(&self) -> Option<&String> {
self.prefix.as_ref()
}
fn sender(&self) -> &Sender<TaskSignal> {
&self.sender
}
fn read(&self) -> Result<std::sync::RwLockReadGuard<'_, P>> {
self.props
.read()
.map_err(|_| Error::ReadAccess.into())
}
fn write(&self) -> Result<std::sync::RwLockWriteGuard<'_, P>> {
self.props
.write()
.map_err(|_| Error::WriteAccess.into())
}
fn set_state(&self, state: OperationState) -> Result<()> {
info!("changing state to {}", &state);
let final_state = state;
let mut next_state;
while self.state() < final_state {
match self.state() {
OperationState::Error => {
return Err(Error::ManageState.into());
}
OperationState::Created => {
next_state = OperationState::Configured;
}
OperationState::Configured => {
next_state = OperationState::Inactive;
}
OperationState::Inactive => {
next_state = OperationState::Standby;
}
OperationState::Standby => {
next_state = OperationState::Active;
}
OperationState::Active => {
return self.modify_state_property(OperationState::Error);
}
}
self.upgrade_registered_tasks(next_state)?;
}
while self.state() > final_state {
match self.state() {
OperationState::Active => {
next_state = OperationState::Standby;
}
OperationState::Standby => {
next_state = OperationState::Inactive;
}
OperationState::Inactive => {
next_state = OperationState::Configured;
}
OperationState::Configured => {
next_state = OperationState::Created;
}
OperationState::Created => {
return self.modify_state_property(OperationState::Error);
}
OperationState::Error => {
return Err(Error::ManageState.into());
}
}
self.downgrade_registered_tasks(next_state)?;
}
Ok(())
}
#[instrument(level = Level::ERROR, skip_all)]
fn put_with(&self, selector: &str, message: Message) -> Result<()> {
if self
.publishers()
.read()
.map_err(|_| Error::ReadContext("publishers".into()))?
.get(selector)
.is_some()
{
self.publishers()
.read()
.map_err(|_| Error::ReadContext("publishers".into()))?
.get(selector)
.ok_or_else(|| Error::Get("publishers".into()))?
.put(message)?;
} else {
self.communicator.put(selector, message)?;
}
Ok(())
}
#[instrument(level = Level::ERROR, skip_all)]
fn delete_with(&self, selector: &str) -> Result<()> {
if self
.publishers()
.read()
.map_err(|_| Error::ReadContext("publishers".into()))?
.get(selector)
.is_some()
{
self.publishers()
.read()
.map_err(|_| Error::ReadContext("publishers".into()))?
.get(selector)
.ok_or_else(|| Error::Get("publishers".into()))?
.delete()?;
} else {
todo!(); }
Ok(())
}
#[instrument(level = Level::ERROR, skip_all)]
fn get_with(
&self,
selector: &str,
message: Option<Message>,
callback: Option<&mut dyn FnMut(QueryableMsg) -> Result<()>>,
) -> Result<()> {
if self
.queriers()
.read()
.map_err(|_| Error::ReadContext("queries".into()))?
.get(selector)
.is_some()
{
self.queriers()
.read()
.map_err(|_| Error::ReadContext("queries".into()))?
.get(selector)
.ok_or_else(|| Error::Get("queries".into()))?
.get(message, callback)?;
} else {
self.communicator
.get(selector, message, callback)?;
}
Ok(())
}
#[instrument(level = Level::ERROR, skip_all)]
fn observe_with(&self, selector: &str, message: Option<Message>) -> Result<()> {
self.observers()
.read()
.map_err(|_| Error::ReadContext("observers".into()))?
.get(selector)
.ok_or_else(|| Error::Get("observers".into()))?
.request(message)?;
Ok(())
}
#[instrument(level = Level::ERROR, skip_all)]
fn cancel_observe_with(&self, selector: &str) -> Result<()> {
self.observers()
.read()
.map_err(|_| Error::ReadContext("observers".into()))?
.get(selector)
.ok_or_else(|| Error::Get("observers".into()))?
.cancel()?;
Ok(())
}
fn mode(&self) -> &String {
self.communicator.mode()
}
fn default_session(&self) -> Arc<Session> {
self.communicator.default_session()
}
fn session(&self, session_id: &str) -> Option<Arc<Session>> {
if session_id == "default" {
Some(self.communicator.default_session())
} else {
self.communicator.session(session_id)
}
}
}
impl<P> ContextImpl<P>
where
P: Send + Sync + 'static,
{
pub fn new(
config: &Config,
props: P,
name: Option<String>,
sender: Sender<TaskSignal>,
prefix: Option<String>,
) -> Result<Self> {
let communicator = dimas_com::communicator::from(config)?;
let uuid = communicator.uuid();
Ok(Self {
uuid,
name,
prefix,
state: Arc::new(RwLock::new(OperationState::Created)),
sender,
communicator,
props: Arc::new(RwLock::new(props)),
timers: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
})
}
fn modify_state_property(&self, state: OperationState) -> Result<()> {
*(self
.state
.write()
.map_err(|_| Error::ModifyStruct("state".into()))?) = state;
Ok(())
}
#[must_use]
pub fn liveliness_subscribers(
&self,
) -> Arc<RwLock<HashMap<String, Box<dyn LivelinessSubscriber>>>> {
self.communicator.liveliness_subscribers()
}
#[must_use]
pub fn observers(&self) -> Arc<RwLock<HashMap<String, Box<dyn Observer>>>> {
self.communicator.observers()
}
#[must_use]
pub fn publishers(&self) -> Arc<RwLock<HashMap<String, Box<dyn Publisher>>>> {
self.communicator.publishers()
}
#[must_use]
pub fn queriers(&self) -> Arc<RwLock<HashMap<String, Box<dyn Querier>>>> {
self.communicator.queriers()
}
#[must_use]
pub fn responders(&self) -> Arc<RwLock<HashMap<String, Box<dyn Responder>>>> {
self.communicator.responders()
}
#[must_use]
pub fn timers(&self) -> Arc<RwLock<HashMap<String, Timer<P>>>> {
self.timers.clone()
}
fn upgrade_registered_tasks(&self, new_state: OperationState) -> Result<()> {
self.communicator
.manage_operation_state(&new_state)?;
self.timers
.write()
.map_err(|_| Error::ModifyStruct("timers".into()))?
.iter_mut()
.for_each(|timer| {
let _ = timer.1.manage_operation_state(&new_state);
});
self.modify_state_property(new_state)?;
Ok(())
}
fn downgrade_registered_tasks(&self, new_state: OperationState) -> Result<()> {
self.timers
.write()
.map_err(|_| Error::ModifyStruct("timers".into()))?
.iter_mut()
.for_each(|timer| {
let _ = timer.1.manage_operation_state(&new_state);
});
self.communicator
.manage_operation_state(&new_state)?;
self.modify_state_property(new_state)?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
const fn is_normal<T: Sized + Send + Sync>() {}
#[derive(Debug)]
struct Props {}
#[test]
const fn normal_types() {
is_normal::<ContextImpl<Props>>();
}
}