use super::*;
use bevy_ecs::prelude::{In, Res, World};
use futures_lite::future::race;
use std::time::Duration;
use thiserror::Error as ThisError;
use tokio::sync::mpsc::unbounded_channel;
use zenoh_ext::{AdvancedSubscriberBuilderExt, HistoryConfig, RecoveryConfig};
/// Configuration consumed by the `zenoh_subscription` node builder.
///
/// One value of this type describes a single zenoh subscriber: which key to
/// subscribe to, how to decode incoming samples, and how history, recovery
/// and locality are configured on the subscriber builder.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq)]
#[serde(rename_all = "snake_case")]
pub struct ZenohSubscriptionConfig {
/// Key passed to `declare_subscriber` when the subscription is created.
pub key: Arc<str>,
/// Encoding settings; converted into the `Codec` used to decode every
/// received sample. The conversion is fallible and happens when the node
/// is built.
pub decoder: ZenohEncodingConfig,
/// History settings, converted into a `zenoh_ext::HistoryConfig`.
///
/// Falls back to `Default` when absent from the input and is left out of
/// the serialized form when `ZenohSubscriptionHistoryConfig::is_default`
/// returns `true`.
#[serde(
default,
skip_serializing_if = "ZenohSubscriptionHistoryConfig::is_default"
)]
pub history: ZenohSubscriptionHistoryConfig,
/// Selects which recovery configuration, if any, is applied to the
/// subscriber builder.
///
/// Falls back to `Default` when absent from the input.
// The `is_default` helper is defined elsewhere; presumably it compares the
// value against `Default::default()` - TODO confirm.
#[serde(default, skip_serializing_if = "is_default")]
pub recovery: ZenohSubscriptionRecoveryConfig,
/// Locality setting, converted and passed to the subscriber builder's
/// `allowed_origin`.
///
/// Falls back to `Default` when absent from the input.
#[serde(default, skip_serializing_if = "is_default")]
pub locality: ZenohLocalityConfig,
}
/// History-related options for a zenoh subscription.
///
/// Converted into a `zenoh_ext::HistoryConfig` before being handed to the
/// subscriber builder's `history` method.
#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize, JsonSchema, PartialEq)]
#[serde(rename_all = "snake_case")]
pub struct ZenohSubscriptionHistoryConfig {
/// When `true`, `HistoryConfig::detect_late_publishers` is applied during
/// conversion.
// NOTE(review): the derived `Default` yields `false` for this field, while
// the field-level serde default is `default_as_true` (helper defined
// elsewhere; presumably returns `true`). A config that omits the whole
// history section may therefore differ from one that supplies an empty
// history object - confirm this is intended.
#[serde(default = "default_as_true", skip_serializing_if = "is_true")]
pub detect_late_publishers: bool,
/// When set, forwarded to `HistoryConfig::max_samples`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub max_samples: Option<usize>,
/// When set, forwarded to `HistoryConfig::max_age` as fractional seconds.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub max_age: Option<Duration>,
}
impl ZenohSubscriptionHistoryConfig {
    /// Reports whether this configuration carries no explicit settings.
    ///
    /// Returns `true` only when late-publisher detection is off and neither
    /// a sample limit nor an age limit is set. Used as the
    /// `skip_serializing_if` predicate for the `history` field.
    pub fn is_default(&self) -> bool {
        matches!(
            self,
            Self {
                detect_late_publishers: false,
                max_samples: None,
                max_age: None,
            }
        )
    }
}
impl From<ZenohSubscriptionHistoryConfig> for HistoryConfig {
fn from(value: ZenohSubscriptionHistoryConfig) -> Self {
let mut config = HistoryConfig::default();
if value.detect_late_publishers {
config = config.detect_late_publishers();
}
if let Some(depth) = value.max_samples {
config = config.max_samples(depth);
}
if let Some(duration) = value.max_age {
config = config.max_age(duration.as_secs_f64());
}
config
}
}
/// Selects which recovery configuration is applied to the subscriber
/// builder when a subscription is declared.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ZenohSubscriptionRecoveryConfig {
/// Leave the subscriber builder as it is; no `recovery` call is made.
None,
/// Apply `RecoveryConfig::default().periodic_queries(period)` with the
/// contained duration as the period.
PeriodicQueries(Duration),
/// Apply `RecoveryConfig::default().heartbeat()`.
Heartbeat,
}
impl Default for ZenohSubscriptionRecoveryConfig {
fn default() -> Self {
ZenohSubscriptionRecoveryConfig::Heartbeat
}
}
/// Errors that end a `zenoh_subscription` node's run.
#[derive(ThisError, Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub enum ZenohSubscriptionError {
/// The node was triggered while its activity-marker buffer was not empty,
/// i.e. an earlier run has not yet cleared it.
#[error("the subscription was already active")]
AlreadyActive,
/// The activity-marker buffer could not be accessed when the node was
/// triggered.
#[error("the active buffer tracker was despawned")]
BufferDespawned,
/// Awaiting the shared session outcome failed before a session result
/// was delivered.
#[error("the zenoh session was removed from its resource")]
SessionRemoved,
/// An error carried over from the session result or from declaring the
/// subscriber; displayed using the wrapped error's own message.
#[error("{}", .0)]
ZenohError(#[from] ArcError),
}
impl DiagramElementRegistry {
    /// Registers the fallible `zenoh_subscription` node builder.
    ///
    /// A node built from a [`ZenohSubscriptionConfig`] behaves as follows when
    /// it is triggered:
    ///
    /// 1. A unit buffer created alongside the node serves as an activity
    ///    marker. If it is already non-empty the run fails with
    ///    [`ZenohSubscriptionError::AlreadyActive`]; otherwise a marker is
    ///    pushed.
    /// 2. The sending half of a fresh unbounded channel is emitted on the
    ///    `canceller` stream; the receiving half is raced against the
    ///    subscription via `receive_cancel`.
    /// 3. The subscriber is declared on the shared session using the
    ///    configured key, locality, history and recovery settings.
    /// 4. Every received sample is decoded; successes go to the `out` stream
    ///    and decode failures to `out_error`.
    /// 5. Once the race finishes, the marker buffer is drained so the node
    ///    can be triggered again, and the race's result is returned.
    ///
    /// Building the node fails if the configured decoder cannot be converted
    /// into a `Codec`.
    ///
    /// * `ensure_session` - command queued on the builder every time a node
    ///   is built (presumably it makes sure the `ZenohSession` resource
    ///   exists - confirm against its definition).
    pub(super) fn register_zenoh_subscription(&mut self, ensure_session: EnsureZenohSession) {
        self.register_node_builder_fallible(
            NodeBuilderOptions::new("zenoh_subscription")
                .with_default_display_text("Zenoh Subscription"),
            move |builder, config: ZenohSubscriptionConfig| {
                builder.commands().queue(ensure_session.clone());

                // Marker buffer: non-empty while a subscription run is active.
                let active_buffer = builder.create_buffer::<()>(BufferSettings::default());
                let access = builder.create_buffer_access::<(), _>(active_buffer);
                let decoder: Codec = (&config.decoder).try_into()?;

                let callback = move |In(input): AsyncCallbackInput<
                    ((), BufferKey<()>),
                    ZenohNodeStreams,
                >,
                                     mut active: BufferAccessMut<()>,
                                     session: Res<ZenohSession>| {
                    // Claim the marker synchronously, while we still have
                    // access to the world; the outcome is checked inside the
                    // async task below.
                    let already_active = active
                        .get_mut(&input.request.1)
                        .map_err(|_| ZenohSubscriptionError::BufferDespawned)
                        .and_then(|mut active_buffer| {
                            if active_buffer.is_empty() {
                                active_buffer.push(());
                                Ok(())
                            } else {
                                Err(ZenohSubscriptionError::AlreadyActive)
                            }
                        });

                    let session = session.outcome.clone();
                    let (sender, mut receiver) = unbounded_channel();
                    input.streams.canceller.send(sender);
                    let config = config.clone();
                    let decoder = decoder.clone();

                    async move {
                        // Bail out before any cleanup: on failure the marker
                        // either belongs to another run or no longer exists.
                        already_active?;
                        let cancel = receiver.recv();
                        let active_key = input.request.1;

                        let r = async move {
                            let subscribing = async move {
                                let session = session
                                    .await
                                    .map_err(|_| ZenohSubscriptionError::SessionRemoved)?
                                    .map_err(ZenohSubscriptionError::ZenohError)?;

                                let subscription_builder = session
                                    .declare_subscriber(config.key.as_ref())
                                    .allowed_origin(config.locality.into())
                                    .history(config.history.into());

                                let subscription = match config.recovery {
                                    ZenohSubscriptionRecoveryConfig::None => subscription_builder,
                                    ZenohSubscriptionRecoveryConfig::PeriodicQueries(period) => {
                                        subscription_builder.recovery(
                                            RecoveryConfig::default().periodic_queries(period),
                                        )
                                    }
                                    ZenohSubscriptionRecoveryConfig::Heartbeat => {
                                        subscription_builder
                                            .recovery(RecoveryConfig::default().heartbeat())
                                    }
                                }
                                .await
                                .map_err(ArcError::new)?;

                                loop {
                                    let next_sample = match subscription.recv_async().await {
                                        Ok(sample) => sample,
                                        Err(err) => {
                                            // A receive error means the
                                            // subscriber's channel is closed,
                                            // so every further attempt would
                                            // fail immediately. Retrying would
                                            // spin without yielding (starving
                                            // the cancel future) and flood
                                            // `out_error`; report once and
                                            // end the subscription instead.
                                            input.streams.out_error.send(err.to_string());
                                            return Err(ZenohSubscriptionError::ZenohError(
                                                ArcError::new(err),
                                            ));
                                        }
                                    };

                                    match decoder.decode(&next_sample) {
                                        Ok(msg) => {
                                            input.streams.out.send(msg);
                                        }
                                        Err(msg) => {
                                            input.streams.out_error.send(msg);
                                        }
                                    }
                                }
                            };

                            // Whichever finishes first decides the result:
                            // the subscription (only by failing) or the
                            // cancellation listener.
                            race(
                                subscribing,
                                receive_cancel::<ZenohSubscriptionError>(cancel),
                            )
                            .await
                        }
                        .await;

                        // Clear the marker so the node can be triggered
                        // again. Failures here are deliberately ignored:
                        // there is nothing left to clean up if the buffer or
                        // the channel is gone.
                        let _ = input
                            .channel
                            .commands(move |commands| {
                                commands.queue(move |world: &mut World| {
                                    let _ = world.buffer_mut(&active_key, |mut active_buffer| {
                                        active_buffer.drain(..);
                                    });
                                });
                            })
                            .await;

                        r
                    }
                };

                let node = builder.create_node(callback.as_callback());
                builder.connect(access.output, node.input);
                Ok(Node::<_, _, ZenohNodeStreams> {
                    input: access.input,
                    output: node.output,
                    streams: node.streams,
                })
            },
        )
        .with_result();
    }
}