use crate::{
vendor::builtin_interfaces::msg::Time, CancellationClient, FeedbackClient, GoalStatus,
GoalStatusCode, ResultClient, StatusWatcher,
};
use rosidl_runtime_rs::Action;
use std::{
pin::Pin,
task::{Context, Poll},
};
use tokio_stream::{Stream, StreamMap};
pub struct GoalClient<A: Action> {
pub feedback: FeedbackClient<A>,
pub status: StatusWatcher<A>,
pub result: ResultClient<A>,
pub cancellation: CancellationClient<A>,
pub stamp: Time,
}
impl<A: Action> Clone for GoalClient<A> {
fn clone(&self) -> Self {
Self {
feedback: self.feedback.clone(),
status: self.status.clone(),
result: self.result.clone(),
cancellation: self.cancellation.clone(),
stamp: self.stamp.clone(),
}
}
}
impl<A: Action> GoalClient<A> {
pub fn stream(self) -> GoalClientStream<A> {
let Self {
mut feedback,
mut status,
result,
..
} = self;
let rx_feedback = Box::pin(async_stream::stream! {
while let Some(msg) = feedback.recv().await {
yield GoalEvent::Feedback(msg);
}
}) as Pin<Box<dyn Stream<Item = GoalEvent<A>> + Send>>;
let rx_status = Box::pin(async_stream::stream! {
let initial_value = (*status.borrow_and_update()).clone();
yield GoalEvent::Status(initial_value);
while let Ok(_) = status.changed().await {
let value = (*status.borrow_and_update()).clone();
yield GoalEvent::Status(value);
}
}) as Pin<Box<dyn Stream<Item = GoalEvent<A>> + Send>>;
let rx_result = Box::pin(async_stream::stream! {
yield GoalEvent::Result(result.await);
}) as Pin<Box<dyn Stream<Item = GoalEvent<A>> + Send>>;
let mut stream_map: StreamMap<i32, Pin<Box<dyn Stream<Item = GoalEvent<A>> + Send>>> =
StreamMap::new();
stream_map.insert(0, rx_feedback);
stream_map.insert(1, rx_status);
stream_map.insert(2, rx_result);
GoalClientStream { stream_map }
}
}
pub struct GoalClientStream<A: Action> {
stream_map: StreamMap<i32, Pin<Box<dyn Stream<Item = GoalEvent<A>> + Send>>>,
}
impl<A: Action> Stream for GoalClientStream<A> {
type Item = GoalEvent<A>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
Stream::poll_next(Pin::new(&mut self.get_mut().stream_map), cx)
.map(|r| r.map(|(_, event)| event))
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.stream_map.size_hint()
}
}
pub enum GoalEvent<A: Action> {
Feedback(A::Feedback),
Status(GoalStatus),
Result((GoalStatusCode, A::Result)),
}