#[doc(hidden)]
extern crate alloc;
#[cfg(feature = "std")]
extern crate std;
use crate::error::Error;
use crate::{
traits::Responder,
zenoh::observable::{
ArcControlCallback, ArcExecutionCallback, ArcFeedbackCallback, ControlCallback,
ExecutionCallback, FeedbackCallback, Observable,
},
};
use alloc::{
boxed::Box,
string::{String, ToString},
sync::Arc,
};
use core::time::Duration;
use dimas_core::builder_states::{Callback, NoCallback, NoSelector, NoStorage, Selector, Storage};
use dimas_core::{
Result,
enums::OperationState,
message_types::{ControlResponse, Message},
traits::Context,
utils::selector_from,
};
use futures::future::{BoxFuture, Future};
#[cfg(feature = "std")]
use std::{collections::HashMap, sync::RwLock};
#[cfg(feature = "std")]
use tokio::sync::Mutex;
/// Type-state builder for an [`Observable`].
///
/// The type parameters `K`, `CC`, `FC`, `EF` and `S` track, in that order,
/// the state of the selector, the control callback, the feedback callback,
/// the execution callback and the storage. `build` is only implemented for
/// the state in which the selector and all three callbacks have been set.
pub struct ObservableBuilder<P, K, CC, FC, EF, S>
where
P: Send + Sync + 'static,
{
/// Identifier `build` uses to look up the session in `context`.
session_id: String,
/// Context handed to `Observable::new`; also the source of the session
/// and of the prefix used by `topic`.
context: Context<P>,
/// Operation state passed on to `Observable::new`.
activation_state: OperationState,
/// Feedback interval passed on to `Observable::new`.
feedback_interval: Duration,
/// Selector slot: `NoSelector` until `selector`/`topic` has been called.
selector: K,
/// Control callback slot: `NoCallback` until `control_callback` has been called.
control_callback: CC,
/// Feedback callback slot: `NoCallback` until `feedback_callback` has been called.
feedback_callback: FC,
/// Execution callback slot: `NoCallback` until `execution_callback` has been called.
execution_callback: EF,
/// Storage slot: `NoStorage` until `storage` has been called.
storage: S,
}
impl<P> ObservableBuilder<P, NoSelector, NoCallback, NoCallback, NoCallback, NoStorage>
where
    P: Send + Sync + 'static,
{
    /// Creates a builder in its initial state: no selector, no callbacks and
    /// no storage.
    ///
    /// The activation state starts as [`OperationState::Active`] and the
    /// feedback interval as 100 milliseconds; both can be overridden with the
    /// corresponding setters.
    #[must_use]
    pub fn new(session_id: impl Into<String>, context: Context<P>) -> Self {
        let session_id = session_id.into();
        Self {
            // values supplied by the caller
            session_id,
            context,
            // defaults
            activation_state: OperationState::Active,
            feedback_interval: Duration::from_millis(100),
            // nothing configured yet
            selector: NoSelector,
            control_callback: NoCallback,
            feedback_callback: NoCallback,
            execution_callback: NoCallback,
            storage: NoStorage,
        }
    }
}
impl<P, K, CC, FC, EF, S> ObservableBuilder<P, K, CC, FC, EF, S>
where
    P: Send + Sync + 'static,
{
    /// Replaces the activation state that will be passed to the observable.
    #[must_use]
    pub const fn activation_state(mut self, state: OperationState) -> Self {
        self.activation_state = state;
        self
    }

    /// Replaces the feedback interval that will be passed to the observable.
    #[must_use]
    pub const fn feedback_interval(mut self, interval: Duration) -> Self {
        self.feedback_interval = interval;
        self
    }

    /// Replaces the id of the session the observable will be built on.
    #[must_use]
    pub fn session_id(self, session_id: &str) -> Self {
        Self {
            session_id: session_id.into(),
            ..self
        }
    }
}
impl<P, CC, FC, EF, S> ObservableBuilder<P, NoSelector, CC, FC, EF, S>
where
    P: Send + Sync + 'static,
{
    /// Sets the selector from the given string, used as is.
    ///
    /// Only available while no selector has been set; the returned builder
    /// carries a [`Selector`] in its selector slot.
    #[must_use]
    pub fn selector(self, selector: &str) -> ObservableBuilder<P, Selector, CC, FC, EF, S> {
        // Move every field over unchanged, except for the selector slot.
        ObservableBuilder {
            session_id: self.session_id,
            context: self.context,
            activation_state: self.activation_state,
            feedback_interval: self.feedback_interval,
            selector: Selector {
                selector: selector.into(),
            },
            control_callback: self.control_callback,
            feedback_callback: self.feedback_callback,
            execution_callback: self.execution_callback,
            storage: self.storage,
        }
    }

    /// Sets the selector derived from `topic` and the context's prefix.
    ///
    /// The selector string is produced by `selector_from` and then handed to
    /// [`Self::selector`].
    #[must_use]
    pub fn topic(self, topic: &str) -> ObservableBuilder<P, Selector, CC, FC, EF, S> {
        // Must be computed before `self` is consumed by `selector`.
        let derived = selector_from(topic, self.context.prefix());
        self.selector(&derived)
    }
}
impl<P, K, FC, EF, S> ObservableBuilder<P, K, NoCallback, FC, EF, S>
where
    P: Send + Sync + 'static,
{
    /// Sets the control callback.
    ///
    /// `callback` receives a `Context<P>` and a `Message` and returns a
    /// future resolving to `Result<ControlResponse>`. It is wrapped so that
    /// the returned future is boxed and pinned, then stored behind an
    /// `Arc<Mutex<_>>`.
    ///
    /// Only available while no control callback has been set.
    #[must_use]
    pub fn control_callback<C, F>(
        self,
        mut callback: C,
    ) -> ObservableBuilder<P, K, Callback<ArcControlCallback<P>>, FC, EF, S>
    where
        C: FnMut(Context<P>, Message) -> F + Send + Sync + 'static,
        F: Future<Output = Result<ControlResponse>> + Send + Sync + 'static,
    {
        // Erase the concrete closure and future types.
        let boxed: ControlCallback<P> = Box::new(move |ctx, msg| Box::pin(callback(ctx, msg)));
        let shared: ArcControlCallback<P> = Arc::new(Mutex::new(boxed));

        ObservableBuilder {
            session_id: self.session_id,
            context: self.context,
            activation_state: self.activation_state,
            feedback_interval: self.feedback_interval,
            selector: self.selector,
            control_callback: Callback { callback: shared },
            feedback_callback: self.feedback_callback,
            execution_callback: self.execution_callback,
            storage: self.storage,
        }
    }
}
impl<P, K, CC, EF, S> ObservableBuilder<P, K, CC, NoCallback, EF, S>
where
    P: Send + Sync + 'static,
{
    /// Sets the feedback callback.
    ///
    /// `callback` receives a `Context<P>` and returns a future resolving to
    /// `Result<Message>`. It is wrapped so that the returned future is boxed
    /// and pinned, then stored behind an `Arc<Mutex<_>>`.
    ///
    /// Only available while no feedback callback has been set.
    #[must_use]
    pub fn feedback_callback<C, F>(
        self,
        mut callback: C,
    ) -> ObservableBuilder<P, K, CC, Callback<ArcFeedbackCallback<P>>, EF, S>
    where
        C: FnMut(Context<P>) -> F + Send + Sync + 'static,
        F: Future<Output = Result<Message>> + Send + Sync + 'static,
    {
        // Erase the concrete closure and future types.
        let boxed: FeedbackCallback<P> = Box::new(move |ctx| Box::pin(callback(ctx)));
        let shared: ArcFeedbackCallback<P> = Arc::new(Mutex::new(boxed));

        ObservableBuilder {
            session_id: self.session_id,
            context: self.context,
            activation_state: self.activation_state,
            feedback_interval: self.feedback_interval,
            selector: self.selector,
            control_callback: self.control_callback,
            feedback_callback: Callback { callback: shared },
            execution_callback: self.execution_callback,
            storage: self.storage,
        }
    }
}
impl<P, K, CC, FC, S> ObservableBuilder<P, K, CC, FC, NoCallback, S>
where
    P: Send + Sync + 'static,
{
    /// Sets the execution callback.
    ///
    /// `callback` receives a `Context<P>` and returns a future resolving to
    /// `Result<Message>`. It is wrapped so that the returned future is boxed
    /// and pinned, then stored behind an `Arc<Mutex<_>>`.
    ///
    /// Only available while no execution callback has been set.
    #[must_use]
    pub fn execution_callback<C, F>(
        self,
        mut callback: C,
    ) -> ObservableBuilder<P, K, CC, FC, Callback<ArcExecutionCallback<P>>, S>
    where
        C: FnMut(Context<P>) -> F + Send + Sync + 'static,
        F: Future<Output = Result<Message>> + Send + Sync + 'static,
    {
        // Erase the concrete closure and future types.
        let boxed: ExecutionCallback<P> = Box::new(move |ctx| Box::pin(callback(ctx)));
        let shared: ArcExecutionCallback<P> = Arc::new(Mutex::new(boxed));

        ObservableBuilder {
            session_id: self.session_id,
            context: self.context,
            activation_state: self.activation_state,
            feedback_interval: self.feedback_interval,
            selector: self.selector,
            control_callback: self.control_callback,
            feedback_callback: self.feedback_callback,
            execution_callback: Callback { callback: shared },
            storage: self.storage,
        }
    }
}
impl<P, K, CC, FC, EF> ObservableBuilder<P, K, CC, FC, EF, NoStorage>
where
    P: Send + Sync + 'static,
{
    /// Attaches the shared responder map that `add` inserts the finished
    /// observable into.
    ///
    /// Only available while no storage has been set.
    #[must_use]
    pub fn storage(
        self,
        storage: Arc<RwLock<HashMap<String, Box<dyn Responder>>>>,
    ) -> ObservableBuilder<P, K, CC, FC, EF, Storage<Box<dyn Responder>>> {
        // Move every field over unchanged, except for the storage slot.
        ObservableBuilder {
            session_id: self.session_id,
            context: self.context,
            activation_state: self.activation_state,
            feedback_interval: self.feedback_interval,
            selector: self.selector,
            control_callback: self.control_callback,
            feedback_callback: self.feedback_callback,
            execution_callback: self.execution_callback,
            storage: Storage { storage },
        }
    }
}
impl<P, S>
ObservableBuilder<
P,
Selector,
Callback<ArcControlCallback<P>>,
Callback<ArcFeedbackCallback<P>>,
Callback<
Arc<
Mutex<
Box<dyn FnMut(Context<P>) -> BoxFuture<'static, Result<Message>> + Send + Sync>,
>,
>,
>,
S,
>
where
P: Send + Sync + 'static,
{
pub fn build(self) -> Result<Observable<P>> {
let Self {
session_id,
context,
activation_state,
feedback_interval,
selector,
control_callback,
feedback_callback,
execution_callback,
..
} = self;
let session = context
.session(&session_id)
.ok_or_else(|| Error::NoZenohSession)?;
Ok(Observable::new(
session,
selector.selector,
context,
activation_state,
feedback_interval,
control_callback.callback,
feedback_callback.callback,
execution_callback.callback,
))
}
}
impl<P>
ObservableBuilder<
P,
Selector,
Callback<ArcControlCallback<P>>,
Callback<ArcFeedbackCallback<P>>,
Callback<ArcExecutionCallback<P>>,
Storage<Box<dyn Responder>>,
>
where
P: Send + Sync + 'static,
{
pub fn add(self) -> Result<Option<Box<dyn Responder>>> {
let collection = self.storage.storage.clone();
let q = self.build()?;
let r = collection
.write()
.map_err(|_| Error::MutexPoison(String::from("ObservableBuilder")))?
.insert(q.selector().to_string(), Box::new(q));
Ok(r)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Empty properties type used only to instantiate the builder.
    #[derive(Debug)]
    struct Props {}

    /// Compiles only for types that are `Sized`, `Send` and `Sync`.
    const fn is_normal<T>()
    where
        T: Sized + Send + Sync,
    {
    }

    /// The builder in its initial state must satisfy `is_normal`.
    #[test]
    const fn normal_types() {
        type Initial =
            ObservableBuilder<Props, NoSelector, NoCallback, NoCallback, NoCallback, NoStorage>;
        is_normal::<Initial>();
    }
}