#[doc(hidden)]
extern crate alloc;
#[cfg(feature = "std")]
extern crate std;
use alloc::sync::Arc;
use alloc::{
boxed::Box,
collections::BTreeSet,
string::{String, ToString},
};
use dimas_core::{
Result,
enums::{OperationState, TaskSignal},
traits::{Capability, Context},
};
use futures::future::BoxFuture;
#[cfg(feature = "std")]
use tokio::{sync::Mutex, task::JoinHandle};
use tracing::info;
use tracing::{Level, error, instrument, warn};
use zenoh::Session;
use zenoh::sample::SampleKind;
#[allow(clippy::module_name_repetitions)]
pub type LivelinessCallback<P> =
Box<dyn FnMut(Context<P>, String) -> BoxFuture<'static, Result<()>> + Send + Sync>;
pub type ArcLivelinessCallback<P> = Arc<Mutex<LivelinessCallback<P>>>;
#[allow(clippy::module_name_repetitions)]
pub struct LivelinessSubscriber<P>
where
P: Send + Sync + 'static,
{
session: Arc<Session>,
token: String,
context: Context<P>,
activation_state: OperationState,
put_callback: ArcLivelinessCallback<P>,
delete_callback: Option<ArcLivelinessCallback<P>>,
handle: std::sync::Mutex<Option<JoinHandle<()>>>,
known_agents: Arc<Mutex<BTreeSet<String>>>,
}
impl<P> core::fmt::Debug for LivelinessSubscriber<P>
where
P: Send + Sync + 'static,
{
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct("LivelinessSubscriber")
.finish_non_exhaustive()
}
}
impl<P> crate::traits::LivelinessSubscriber for LivelinessSubscriber<P>
where
P: Send + Sync + 'static,
{
fn token(&self) -> &String {
&self.token
}
}
impl<P> Capability for LivelinessSubscriber<P>
where
P: Send + Sync + 'static,
{
fn manage_operation_state(&self, state: &OperationState) -> Result<()> {
if state >= &self.activation_state {
self.start()
} else if state < &self.activation_state {
self.stop()
} else {
Ok(())
}
}
}
impl<P> LivelinessSubscriber<P>
where
P: Send + Sync + 'static,
{
pub fn new(
session: Arc<Session>,
token: String,
context: Context<P>,
activation_state: OperationState,
put_callback: ArcLivelinessCallback<P>,
delete_callback: Option<ArcLivelinessCallback<P>>,
) -> Self {
Self {
session,
token,
context,
activation_state,
put_callback,
delete_callback,
handle: std::sync::Mutex::new(None),
#[cfg(feature = "std")]
known_agents: Arc::new(Mutex::new(BTreeSet::new())),
}
}
#[instrument(level = Level::TRACE, skip_all)]
fn start(&self) -> Result<()> {
self.stop()?;
let key = self.token.clone();
let known_agents = self.known_agents.clone();
let session2 = self.session.clone();
let token2 = self.token.clone();
let p_cb2 = self.put_callback.clone();
let d_cb = self.delete_callback.clone();
let ctx = self.context.clone();
let ctx2 = self.context.clone();
self.handle.lock().map_or_else(
|_| todo!(),
|mut handle| {
handle.replace(tokio::task::spawn(async move {
std::panic::set_hook(Box::new(move |reason| {
error!("liveliness subscriber panic: {}", reason);
if let Err(reason) = ctx
.sender()
.blocking_send(TaskSignal::RestartLiveliness(key.clone()))
{
error!("could not restart liveliness subscriber: {}", reason);
} else {
info!("restarting liveliness subscriber!");
}
}));
if let Err(error) =
run_liveliness(session2, token2, p_cb2, d_cb, ctx2, known_agents).await
{
error!("running liveliness subscriber failed with {error}");
}
}));
Ok(())
},
)
}
#[instrument(level = Level::TRACE)]
fn stop(&self) -> Result<()> {
self.handle.lock().map_or_else(
|_| todo!(),
|mut handle| {
handle.take();
Ok(())
},
)
}
}
#[instrument(name="liveliness", level = Level::ERROR, skip_all)]
async fn run_liveliness<P>(
session: Arc<Session>,
token: String,
p_cb: ArcLivelinessCallback<P>,
d_cb: Option<ArcLivelinessCallback<P>>,
ctx: Context<P>,
known_agents: Arc<Mutex<BTreeSet<String>>>,
) -> Result<()> {
let subscriber = session
.liveliness()
.declare_subscriber(&token)
.history(true)
.await?;
while let Ok(sample) = subscriber.recv_async().await {
let id = sample.key_expr().split('/').next_back().unwrap_or("");
if id == ctx.uuid() {
continue;
}
let mut guard = known_agents.lock().await;
match sample.kind() {
SampleKind::Put => {
if guard.get(id).is_none() {
guard.insert(id.into());
drop(guard);
let ctx = ctx.clone();
let mut lock = p_cb.lock().await;
if let Err(error) = lock(ctx, id.to_string()).await {
error!("liveliness put callback failed with {error}");
}
}
}
SampleKind::Delete => {
if guard.get(id).is_some() {
guard.remove(id);
drop(guard);
if let Some(cb) = d_cb.clone() {
let ctx = ctx.clone();
let mut lock = cb.lock().await;
if let Err(err) = lock(ctx, id.to_string()).await {
error!("liveliness delete callback failed with {err}");
}
}
}
}
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[derive(Debug)]
struct Props {}
const fn is_normal<T: Sized + Send + Sync>() {}
#[test]
const fn normal_types() {
is_normal::<LivelinessSubscriber<Props>>();
}
}