use crate::context::ContextImpl;
use crate::error::Error;
use chrono::Local;
use core::{fmt::Debug, time::Duration};
use dimas_com::builder::LivelinessSubscriberBuilder;
use dimas_com::builder::{
ObservableBuilder, ObserverBuilder, PublisherBuilder, QuerierBuilder, QueryableBuilder,
SubscriberBuilder,
};
use dimas_com::traits::LivelinessSubscriber;
use dimas_com::traits::{Observer, Publisher, Querier, Responder};
use dimas_commands::messages::{AboutEntity, PingEntity};
use dimas_config::Config;
use dimas_core::{
Result,
builder_states::{NoCallback, NoInterval, NoSelector, Storage},
enums::{OperationState, Signal, TaskSignal},
message_types::{Message, QueryMsg},
traits::{Capability, Context, ContextAbstraction},
};
use dimas_time::{Timer, TimerBuilder};
use std::sync::Arc;
use std::sync::RwLock;
use tokio::{select, signal, sync::mpsc};
use tracing::{error, info, warn};
use zenoh::liveliness::LivelinessToken;
async fn callback_dispatcher<P>(ctx: Context<P>, request: QueryMsg) -> Result<()>
where
P: Send + Sync + 'static,
{
if let Some(value) = request.payload() {
let content: Vec<u8> = value.to_bytes().into_owned();
let msg = Message::new(content);
let signal: Signal = Message::decode(msg)?;
match signal {
Signal::About => about_handler(ctx, request)?,
Signal::Ping { sent } => ping_handler(ctx, request, sent)?,
Signal::Shutdown => shutdown_handler(ctx, request)?,
Signal::State { state } => state_handler(ctx, request, state)?,
}
}
Ok(())
}
fn about_handler<P>(ctx: Context<P>, request: QueryMsg) -> Result<()>
where
P: Send + Sync + 'static,
{
let name = ctx
.fq_name()
.unwrap_or_else(|| String::from("--"));
let mode = ctx.mode().clone();
let zid = ctx.uuid();
let state = ctx.state();
let value = AboutEntity::new(name, mode, zid, state);
drop(ctx);
request.reply(value)?;
Ok(())
}
fn ping_handler<P>(ctx: Context<P>, request: QueryMsg, sent: i64) -> Result<()>
where
P: Send + Sync + 'static,
{
let now = Local::now()
.naive_utc()
.and_utc()
.timestamp_nanos_opt()
.unwrap_or(0);
let name = ctx
.fq_name()
.unwrap_or_else(|| String::from("--"));
let zid = ctx.uuid();
let value = PingEntity::new(name, zid, now - sent);
drop(ctx);
request.reply(value)?;
Ok(())
}
fn shutdown_handler<P>(ctx: Context<P>, request: QueryMsg) -> Result<()>
where
P: Send + Sync + 'static,
{
let name = ctx
.fq_name()
.unwrap_or_else(|| String::from("--"));
let mode = ctx.mode().clone();
let zid = ctx.uuid();
let state = ctx.state();
let value = AboutEntity::new(name, mode, zid, state);
request.reply(value)?;
tokio::task::spawn(async move {
tokio::time::sleep(Duration::from_millis(10)).await;
let _ = ctx.set_state(OperationState::Standby);
tokio::time::sleep(Duration::from_millis(100)).await;
let _ = ctx.set_state(OperationState::Created);
let _ = ctx.sender().send(TaskSignal::Shutdown).await;
});
Ok(())
}
fn state_handler<P>(ctx: Context<P>, request: QueryMsg, state: Option<OperationState>) -> Result<()>
where
P: Send + Sync + 'static,
{
if let Some(value) = state {
let _ = ctx.set_state(value);
}
let name = ctx
.fq_name()
.unwrap_or_else(|| String::from("--"));
let mode = ctx.mode().clone();
let zid = ctx.uuid();
let state = ctx.state();
let value = AboutEntity::new(name, mode, zid, state);
drop(ctx);
request.reply(value)?;
Ok(())
}
#[allow(clippy::module_name_repetitions)]
#[derive(Debug)]
pub struct UnconfiguredAgent<P>
where
P: Debug + Send + Sync + 'static,
{
name: Option<String>,
prefix: Option<String>,
props: P,
}
impl<P> UnconfiguredAgent<P>
where
P: Debug + Send + Sync + 'static,
{
const fn new(properties: P) -> Self {
Self {
name: None,
prefix: None,
props: properties,
}
}
#[must_use]
pub fn name(mut self, name: impl Into<String>) -> Self {
self.name = Some(name.into());
self
}
#[must_use]
pub fn prefix(mut self, prefix: impl Into<String>) -> Self {
self.prefix = Some(prefix.into());
self
}
pub fn config(self, config: &Config) -> Result<Agent<P>> {
let (tx, rx) = mpsc::channel(32);
let context: Arc<ContextImpl<P>> = Arc::new(ContextImpl::new(
config,
self.props,
self.name,
tx,
self.prefix,
)?);
let agent = Agent {
rx,
context,
liveliness: false,
liveliness_token: RwLock::new(None),
};
let selector = format!("{}/signal", agent.context.uuid());
agent
.queryable()
.selector(&selector)
.callback(callback_dispatcher)
.activation_state(OperationState::Created)
.add()?;
if let Some(fq_name) = agent.context.fq_name() {
let selector = format!("{fq_name}/*");
agent
.queryable()
.selector(&selector)
.callback(callback_dispatcher)
.activation_state(OperationState::Created)
.add()?;
}
agent.context.set_state(OperationState::Created)?;
Ok(agent)
}
}
pub struct Agent<P>
where
P: Debug + Send + Sync + 'static,
{
rx: mpsc::Receiver<TaskSignal>,
context: Arc<ContextImpl<P>>,
liveliness: bool,
liveliness_token: RwLock<Option<LivelinessToken>>,
}
impl<P> Debug for Agent<P>
where
P: Debug + Send + Sync + 'static,
{
#[allow(clippy::or_fun_call)]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Agent")
.field("id", &self.context.uuid())
.field(
"prefix",
self.context
.prefix()
.unwrap_or(&"None".to_string()),
)
.field("name", &self.context.name())
.finish_non_exhaustive()
}
}
impl<P> Agent<P>
where
P: Debug + Send + Sync + 'static,
{
#[allow(clippy::new_ret_no_self)]
pub const fn new(properties: P) -> UnconfiguredAgent<P> {
UnconfiguredAgent::new(properties)
}
pub const fn liveliness(&mut self, activate: bool) {
self.liveliness = activate;
}
#[must_use]
pub fn liveliness_subscriber(
&self,
) -> LivelinessSubscriberBuilder<P, NoCallback, Storage<Box<dyn LivelinessSubscriber>>> {
LivelinessSubscriberBuilder::new("default", self.context.clone())
.storage(self.context.liveliness_subscribers())
}
#[must_use]
pub fn liveliness_subscriber_for(
&self,
session_id: impl Into<String>,
) -> LivelinessSubscriberBuilder<P, NoCallback, Storage<Box<dyn LivelinessSubscriber>>> {
LivelinessSubscriberBuilder::new(session_id, self.context.clone())
.storage(self.context.liveliness_subscribers())
}
#[must_use]
pub fn observable(
&self,
) -> ObservableBuilder<
P,
NoSelector,
NoCallback,
NoCallback,
NoCallback,
Storage<Box<dyn Responder>>,
> {
ObservableBuilder::new("default", self.context.clone()).storage(self.context.responders())
}
#[must_use]
pub fn observable_for(
&self,
session_id: impl Into<String>,
) -> ObservableBuilder<
P,
NoSelector,
NoCallback,
NoCallback,
NoCallback,
Storage<Box<dyn Responder>>,
> {
ObservableBuilder::new(session_id, self.context.clone()).storage(self.context.responders())
}
#[must_use]
pub fn observer(
&self,
) -> ObserverBuilder<P, NoSelector, NoCallback, NoCallback, Storage<Box<dyn Observer>>> {
ObserverBuilder::new("default", self.context.clone()).storage(self.context.observers())
}
#[must_use]
pub fn observer_for(
&self,
session_id: impl Into<String>,
) -> ObserverBuilder<P, NoSelector, NoCallback, NoCallback, Storage<Box<dyn Observer>>> {
ObserverBuilder::new(session_id, self.context.clone()).storage(self.context.observers())
}
#[must_use]
pub fn publisher(&self) -> PublisherBuilder<P, NoSelector, Storage<Box<dyn Publisher>>> {
PublisherBuilder::new("default", self.context.clone()).storage(self.context.publishers())
}
#[must_use]
pub fn publisher_for(
&self,
session_id: impl Into<String>,
) -> PublisherBuilder<P, NoSelector, Storage<Box<dyn Publisher>>> {
PublisherBuilder::new(session_id, self.context.clone()).storage(self.context.publishers())
}
#[must_use]
pub fn querier(&self) -> QuerierBuilder<P, NoSelector, NoCallback, Storage<Box<dyn Querier>>> {
QuerierBuilder::new("default", self.context.clone()).storage(self.context.queriers())
}
#[must_use]
pub fn querier_for(
&self,
session_id: impl Into<String>,
) -> QuerierBuilder<P, NoSelector, NoCallback, Storage<Box<dyn Querier>>> {
QuerierBuilder::new(session_id, self.context.clone()).storage(self.context.queriers())
}
#[must_use]
pub fn queryable(
&self,
) -> QueryableBuilder<P, NoSelector, NoCallback, Storage<Box<dyn Responder>>> {
QueryableBuilder::new("default", self.context.clone()).storage(self.context.responders())
}
#[must_use]
pub fn queryable_for(
&self,
session_id: impl Into<String>,
) -> QueryableBuilder<P, NoSelector, NoCallback, Storage<Box<dyn Responder>>> {
QueryableBuilder::new(session_id, self.context.clone()).storage(self.context.responders())
}
#[must_use]
pub fn subscriber(
&self,
) -> SubscriberBuilder<P, NoSelector, NoCallback, Storage<Box<dyn Responder>>> {
SubscriberBuilder::new("default", self.context.clone()).storage(self.context.responders())
}
#[must_use]
pub fn subscriber_for(
&self,
session_id: impl Into<String>,
) -> SubscriberBuilder<P, NoSelector, NoCallback, Storage<Box<dyn Responder>>> {
SubscriberBuilder::new(session_id, self.context.clone()).storage(self.context.responders())
}
#[must_use]
pub fn timer(&self) -> TimerBuilder<P, NoSelector, NoInterval, NoCallback, Storage<Timer<P>>> {
TimerBuilder::new(self.context.clone()).storage(self.context.timers())
}
#[tracing::instrument(skip_all)]
pub async fn start(self) -> Result<Self> {
if self.liveliness {
let session = self.context.session("default");
let token_str = self.context.prefix().map_or_else(
|| self.context.uuid(),
|prefix| format!("{}/{}", prefix, self.context.uuid()),
);
let token = session
.expect("snh")
.liveliness()
.declare_token(&token_str)
.await
.map_err(|source| Error::ActivateLiveliness { source })?;
self.liveliness_token
.write()
.map_err(|_| Error::ModifyStruct("liveliness".into()))?
.replace(token);
}
self.context.set_state(OperationState::Active)?;
RunningAgent {
rx: self.rx,
context: self.context,
liveliness: self.liveliness,
liveliness_token: self.liveliness_token,
}
.run()
.await
}
}
#[allow(clippy::module_name_repetitions)]
pub struct RunningAgent<P>
where
P: Debug + Send + Sync + 'static,
{
rx: mpsc::Receiver<TaskSignal>,
context: Arc<ContextImpl<P>>,
liveliness: bool,
liveliness_token: RwLock<Option<LivelinessToken>>,
}
impl<P> RunningAgent<P>
where
P: Debug + Send + Sync + 'static,
{
async fn run(mut self) -> Result<Agent<P>> {
loop {
select! {
Some(signal) = self.rx.recv() => {
match signal {
TaskSignal::RestartLiveliness(selector) => {
self.context.liveliness_subscribers()
.write()
.map_err(|_| Error::WriteAccess)?
.get_mut(&selector)
.ok_or_else(|| Error::GetMut("liveliness".into()))?
.manage_operation_state(&self.context.state())?;
},
TaskSignal::RestartQueryable(selector) => {
self.context.responders()
.write()
.map_err(|_| Error::WriteAccess)?
.get_mut(&selector)
.ok_or_else(|| Error::GetMut("queryables".into()))?
.manage_operation_state(&self.context.state())?;
},
TaskSignal::RestartObservable(selector) => {
self.context.responders()
.write()
.map_err(|_| Error::WriteAccess)?
.get_mut(&selector)
.ok_or_else(|| Error::GetMut("observables".into()))?
.manage_operation_state(&self.context.state())?;
},
TaskSignal::RestartSubscriber(selector) => {
self.context.responders()
.write()
.map_err(|_| Error::WriteAccess)?
.get_mut(&selector)
.ok_or_else(|| Error::GetMut("subscribers".into()))?
.manage_operation_state(&self.context.state())?;
},
TaskSignal::RestartTimer(selector) => {
self.context.timers()
.write()
.map_err(|_| Error::WriteAccess)?
.get_mut(&selector)
.ok_or_else(|| Error::GetMut("timers".into()))?
.manage_operation_state(&self.context.state())?;
},
TaskSignal::Shutdown => {
return self.stop();
}
}
}
signal = signal::ctrl_c() => {
match signal {
Ok(()) => {
info!("shutdown due to 'ctrl-c'");
return self.stop();
}
Err(err) => {
error!("Unable to listen for 'Ctrl-C': {err}");
return self.stop();
}
}
}
}
}
}
#[tracing::instrument(skip_all)]
pub fn stop(self) -> Result<Agent<P>> {
self.context.set_state(OperationState::Created)?;
if self.liveliness {
self.liveliness_token
.write()
.map_err(|_| Error::ModifyStruct("liveliness".into()))?
.take();
}
let r = Agent {
rx: self.rx,
context: self.context,
liveliness: self.liveliness,
liveliness_token: self.liveliness_token,
};
Ok(r)
}
}
#[cfg(test)]
mod tests {
use super::*;
const fn is_normal<T: Sized + Send + Sync>() {}
#[derive(Debug)]
struct Props {}
#[test]
const fn normal_types() {
is_normal::<UnconfiguredAgent<Props>>();
is_normal::<Agent<Props>>();
is_normal::<RunningAgent<Props>>();
is_normal::<TaskSignal>();
}
}