use futures::{
channel::{
mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
oneshot,
},
future::{select, select_all, BoxFuture, Either},
stream::StreamFuture,
task::{waker_ref, ArcWake},
StreamExt,
};
use std::{
sync::{
atomic::{AtomicBool, Ordering},
mpsc::{channel, Receiver, Sender},
Arc, Mutex, Weak,
},
task::Context as TaskContext,
time::Instant,
};
use crate::{
log_debug, log_fatal, log_warn, ExecutorChannel, ExecutorRuntime, ExecutorWorkerOptions,
GuardCondition, PayloadTask, RclrsError, SpinConditions, WaitSetRunConditions, WaitSetRunner,
Waitable, WeakActivityListener, WorkerChannel,
};
/// Log message prefix used whenever a [`WaitSetRunner`] cannot be pushed into
/// the new-worker channel.
// The `'static` lifetime is implied for the type of a `static` item, so it is
// not spelled out.
static FAILED_TO_SEND_WORKER: &str =
    "Failed to send the new runner. This should never happen. \
    Please report this to the rclrs maintainers with a minimal reproducible example.";
/// The runtime of the basic executor.
///
/// It owns the queue of async tasks that are ready to be polled, and the
/// `WaitSetRunner`s that are handed to the worker-management task each time
/// `spin` is called.
pub struct BasicExecutorRuntime {
/// Receiving end of the ready-task queue. A task arrives here when it is
/// first added through a `TaskSender` and again whenever its waker fires.
ready_queue: Receiver<Arc<Task>>,
/// Sending end of the ready-task queue. It is cloned into every channel
/// that this runtime hands out.
task_sender: TaskSender,
/// Runners held between spins. They are drained into `new_worker_sender`
/// at the start of `spin` and replaced by the returned runners when it ends.
wait_set_runners: Vec<WaitSetRunner>,
/// Weak handles to the guard conditions of the runners, shared with every
/// channel created by this runtime so they can all be triggered together.
all_guard_conditions: AllGuardConditions,
/// Stream of newly created runners. `spin` takes it out at the start and
/// puts it back at the end, so it is `None` only while `spin` is running.
new_worker_receiver: Option<StreamFuture<UnboundedReceiver<WaitSetRunner>>>,
/// Sender paired with `new_worker_receiver`. It is cloned into every
/// channel so new workers can be created from outside the runtime.
new_worker_sender: UnboundedSender<WaitSetRunner>,
}
/// A cloneable list of weak references to guard conditions.
///
/// The list lives behind an `Arc<Mutex<..>>`, so every clone refers to the
/// same underlying list.
#[derive(Clone, Default)]
struct AllGuardConditions {
/// The shared list. Entries are weak so that this list does not keep a
/// guard condition alive on its own.
inner: Arc<Mutex<Vec<Weak<GuardCondition>>>>,
}
impl AllGuardConditions {
    /// Triggers every guard condition that is still alive and removes the
    /// entries whose guard condition has already been dropped.
    ///
    /// A guard condition that fails to trigger is reported with a fatal log
    /// message but stays in the list.
    ///
    /// # Panics
    /// Panics if the mutex protecting the list is poisoned.
    fn trigger(&self) {
        self.inner.lock().unwrap().retain(|guard_condition| {
            if let Some(guard_condition) = guard_condition.upgrade() {
                if let Err(err) = guard_condition.trigger() {
                    log_fatal!(
                        "rclrs.executor.basic_executor",
                        "Failed to trigger a guard condition. This should never happen. \
                        Please report this to the rclrs maintainers with a minimal reproducible example. \
                        Error: {err}",
                    );
                }
                // Still alive: keep it for the next trigger.
                true
            } else {
                // The guard condition was dropped: forget it.
                false
            }
        });
    }

    /// Adds `guard_condition` to the list unless an entry pointing to the
    /// same allocation is already present.
    ///
    /// # Panics
    /// Panics if the mutex protecting the list is poisoned.
    fn push(&self, guard_condition: Weak<GuardCondition>) {
        let mut inner = self.inner.lock().unwrap();
        // Identity comparison: two weak handles are duplicates only when they
        // point to the very same guard condition.
        let already_tracked = inner.iter().any(|other| guard_condition.ptr_eq(other));
        if !already_tracked {
            inner.push(guard_condition);
        }
    }
}
impl ExecutorRuntime for BasicExecutorRuntime {
/// Runs the executor on the calling thread.
///
/// A worker-management task is queued that drives every runner. This
/// thread then polls ready tasks until that task reports completion and
/// the ready queue is empty. Returns the errors collected from the runners.
///
/// # Panics
/// Panics if `new_worker_receiver` is missing at the start, if the
/// worker-management result cannot be received at the end, or if a task's
/// future mutex is poisoned.
fn spin(&mut self, conditions: SpinConditions) -> Vec<RclrsError> {
// May also queue a task that halts spinning once the
// `until_promise_resolved` promise settles.
let conditions = self.process_spin_conditions(conditions);
// Ownership of the stream moves into the worker-management task for the
// duration of the spin; it is put back just before returning.
let new_workers = self.new_worker_receiver.take().expect(
"Basic executor was missing its new_worker_receiver at the start of its spinning. \
This is a critical bug in rclrs. \
Please report this bug to the maintainers of rclrs by providing a minimum reproduction of the problem."
);
let all_guard_conditions = self.all_guard_conditions.clone();
let (worker_result_sender, worker_result_receiver) = channel();
let workers_finished = Arc::new(AtomicBool::new(false));
// Runners kept from a previous spin are fed through the same channel
// that newly created workers use, so `manage_workers` picks them up.
for runner in self.wait_set_runners.drain(..) {
if let Err(err) = self.new_worker_sender.unbounded_send(runner) {
log_fatal!(
"rclrs.executor.basic_executor",
"{FAILED_TO_SEND_WORKER} Error: {err}",
);
}
}
let workers_finished_clone = Arc::clone(&workers_finished);
// Queue the task that drives all runners. The result is sent before the
// flag is raised, so once the flag is observed the result is available.
self.task_sender.add_async_task(Box::pin(async move {
let workers = manage_workers(new_workers, all_guard_conditions, conditions).await;
if let Err(err) = worker_result_sender.send(workers) {
log_fatal!(
"rclrs.executor.basic_executor",
"Failed to send a runner result. This should never happen. \
Please report this to the rclrs maintainers with a minimal \
reproducible example. Error: {err}",
);
}
workers_finished_clone.store(true, Ordering::Release);
}));
// Poll ready tasks. `next_task` blocks while the workers are running and
// becomes non-blocking once they have finished, which ends the loop as
// soon as the queue is empty.
while let Ok(task) = self.next_task(&workers_finished) {
let mut future_slot = task.future.lock().unwrap();
// The slot is emptied while polling. A completed future is not put
// back, so waking the same task again later finds `None` and does nothing.
if let Some(mut future) = future_slot.take() {
let waker = waker_ref(&task);
let task_context = &mut TaskContext::from_waker(&waker);
if future.as_mut().poll(task_context).is_pending() {
*future_slot = Some(future);
}
}
}
let (runners, new_worker_receiver, errors) = worker_result_receiver.recv().expect(
"Basic executor failed to receive the WaitSetRunner at the end of its spinning. \
This is a critical bug in rclrs. \
Please report this bug to the maintainers of rclrs by providing a minimum reproduction of the problem."
);
// Restore the state that was lent to the worker-management task so the
// runtime can spin again.
self.wait_set_runners = runners;
self.new_worker_receiver = Some(new_worker_receiver);
errors
}
/// Runs `spin` on a newly spawned thread.
///
/// The returned future resolves to the runtime itself and the errors from
/// the spin once the thread has sent them back.
///
/// # Panics
/// The returned future panics if the thread's sender is dropped without
/// sending a result.
fn spin_async(
mut self: Box<Self>,
conditions: SpinConditions,
) -> BoxFuture<'static, (Box<dyn ExecutorRuntime>, Vec<RclrsError>)> {
let (sender, receiver) = oneshot::channel();
std::thread::spawn(move || {
let result = self.spin(conditions);
// A send error is deliberately ignored here.
// NOTE(review): presumably it only occurs when the returned future
// was dropped before the spin finished — confirm.
sender.send((self as Box<dyn ExecutorRuntime>, result)).ok();
});
Box::pin(async move {
receiver.await.expect(
"The basic executor async spin thread was dropped without finishing. \
This is a critical bug in rclrs. \
Please report this bug to the maintainers of rclrs by providing a minimum reproduction of the problem."
)
})
}
/// Creates a channel handle that shares this runtime's task sender,
/// new-worker sender and guard-condition list.
fn channel(&self) -> Arc<dyn ExecutorChannel> {
Arc::new(BasicExecutorChannel {
task_sender: self.task_sender.clone(),
new_worker_sender: self.new_worker_sender.clone(),
all_guard_conditions: self.all_guard_conditions.clone(),
})
}
}
impl BasicExecutorRuntime {
    /// Creates a runtime with an empty ready queue, no runners, and a fresh
    /// channel through which newly created workers are delivered.
    pub(crate) fn new() -> Self {
        let (task_sender, ready_queue) = channel();
        let (new_worker_sender, new_worker_receiver) = unbounded();
        Self {
            ready_queue,
            task_sender: TaskSender { task_sender },
            wait_set_runners: Vec::new(),
            all_guard_conditions: AllGuardConditions::default(),
            new_worker_receiver: Some(new_worker_receiver.into_future()),
            new_worker_sender,
        }
    }

    /// Converts the caller's [`SpinConditions`] into the conditions handed to
    /// the wait set runners.
    ///
    /// If `until_promise_resolved` is set, a task is queued that waits for the
    /// promise, then sets `halt_spinning` and triggers all guard conditions.
    /// A relative `timeout` is turned into an absolute `stop_time`.
    fn process_spin_conditions(&self, mut conditions: SpinConditions) -> WaitSetRunConditions {
        if let Some(promise) = conditions.options.until_promise_resolved.take() {
            let halt_spinning = Arc::clone(&conditions.halt_spinning);
            let all_guard_conditions = self.all_guard_conditions.clone();
            self.task_sender.add_async_task(Box::pin(async move {
                if let Err(err) = promise.await {
                    log_warn!(
                        "rclrs.executor.basic_executor",
                        "Sender for SpinOptions::until_promise_resolved was \
                        dropped, so the Promise will never be fulfilled. \
                        Spinning will stop now. Error message: {err}"
                    );
                }
                // Spinning stops whether the promise was fulfilled or abandoned.
                // Set the flag first, then trigger the guard conditions.
                halt_spinning.store(true, Ordering::Release);
                all_guard_conditions.trigger();
            }));
        }

        WaitSetRunConditions {
            only_next_available_work: conditions.options.only_next_available_work,
            // `Instant + Duration` panics when the sum cannot be represented
            // (for example a timeout of `Duration::MAX`). Such a deadline could
            // never be reached, so it is treated like having no timeout at all.
            stop_time: conditions
                .options
                .timeout
                .and_then(|timeout| Instant::now().checked_add(timeout)),
            context: conditions.context,
            halt_spinning: conditions.halt_spinning,
        }
    }

    /// Fetches the next task that is ready to be polled.
    ///
    /// While `wait_set_finished` is `false` this blocks until a task arrives.
    /// Once it is `true` it only returns tasks that are already queued.
    /// `Err(())` means there is nothing (more) to run.
    fn next_task(&mut self, wait_set_finished: &AtomicBool) -> Result<Arc<Task>, ()> {
        if wait_set_finished.load(Ordering::Acquire) {
            self.ready_queue.try_recv().map_err(|_| ())
        } else {
            self.ready_queue.recv().map_err(|_| ())
        }
    }
}
/// Handle given out by `BasicExecutorRuntime::channel` for creating workers
/// and waking wait sets from outside the runtime.
struct BasicExecutorChannel {
/// Clone of the runtime's sender for the ready-task queue.
task_sender: TaskSender,
/// Guard-condition list shared with the runtime.
all_guard_conditions: AllGuardConditions,
/// Clone of the runtime's sender for newly created runners.
new_worker_sender: UnboundedSender<WaitSetRunner>,
}
impl ExecutorChannel for BasicExecutorChannel {
    /// Builds a new [`WaitSetRunner`] from `options`, hands it to the executor
    /// through the new-worker channel, and returns a channel for talking to it.
    ///
    /// If the runner cannot be sent, a fatal message is logged and the worker
    /// channel is still returned.
    fn create_worker(&self, options: ExecutorWorkerOptions) -> Arc<dyn WorkerChannel> {
        let runner = WaitSetRunner::new(options);

        // Collect every handle from the runner first, because sending it
        // below moves the runner away.
        let worker_channel = BasicWorkerChannel {
            waitable_sender: runner.waitable_sender(),
            payload_task_sender: runner.payload_task_sender(),
            activity_listeners: runner.activity_listeners(),
            task_sender: self.task_sender.clone(),
        };

        match self.new_worker_sender.unbounded_send(runner) {
            Ok(()) => {}
            Err(err) => {
                log_fatal!(
                    "rclrs.executor.basic_executor",
                    "{FAILED_TO_SEND_WORKER} Error: {err}",
                );
            }
        }

        Arc::new(worker_channel)
    }

    /// Triggers every guard condition currently tracked by the executor.
    fn wake_all_wait_sets(&self) {
        self.all_guard_conditions.trigger();
    }
}
/// Handle for communicating with a single worker created by
/// `BasicExecutorChannel::create_worker`.
struct BasicWorkerChannel {
/// Clone of the executor's sender for the ready-task queue.
task_sender: TaskSender,
/// Sender obtained from the worker's runner for delivering new waitables.
waitable_sender: UnboundedSender<Waitable>,
/// Sender obtained from the worker's runner for delivering payload tasks.
payload_task_sender: UnboundedSender<PayloadTask>,
/// Activity-listener list obtained from the worker's runner.
activity_listeners: Arc<Mutex<Vec<WeakActivityListener>>>,
}
impl WorkerChannel for BasicWorkerChannel {
    /// Sends `new_entity` to the worker's runner. A failed send is only
    /// logged at debug level.
    fn add_to_wait_set(&self, new_entity: Waitable) {
        match self.waitable_sender.unbounded_send(new_entity) {
            Ok(()) => {}
            Err(err) => {
                log_debug!(
                    "rclrs.basic_executor.add_to_waitset",
                    "Failed to add an item to the waitset: {err}",
                );
            }
        }
    }

    /// Queues `f` on the executor's ready-task queue.
    fn add_async_task(&self, f: BoxFuture<'static, ()>) {
        self.task_sender.add_async_task(f);
    }

    /// Sends the payload task `f` to the worker's runner. A failed send is
    /// only logged at debug level.
    fn send_payload_task(&self, f: PayloadTask) {
        match self.payload_task_sender.unbounded_send(f) {
            Ok(()) => {}
            Err(err) => {
                log_debug!(
                    "rclrs.BasicWorkerChannel",
                    "Failed to send a payload task: {err}",
                );
            }
        }
    }

    /// Appends `listener` to the worker's list of activity listeners.
    ///
    /// # Panics
    /// Panics if the mutex protecting the list is poisoned.
    fn add_activity_listener(&self, listener: WeakActivityListener) {
        let mut listeners = self.activity_listeners.lock().unwrap();
        listeners.push(listener);
    }
}
/// Cloneable handle for putting async tasks on the executor's ready queue.
#[derive(Clone)]
struct TaskSender {
/// Sending end of the ready-task queue.
task_sender: Sender<Arc<Task>>,
}
impl TaskSender {
    /// Wraps `f` in a [`Task`] and puts it on the ready queue so that it gets
    /// polled.
    ///
    /// The task keeps its own clone of the sender so that its waker can put
    /// it back on the queue later. If the queue's receiver no longer exists,
    /// the task is dropped and a debug message is logged.
    fn add_async_task(&self, f: BoxFuture<'static, ()>) {
        let task = Arc::new(Task {
            future: Mutex::new(Some(f)),
            task_sender: self.task_sender.clone(),
        });
        // The error payload (the rejected task) is not needed, so only the
        // outcome is checked.
        if self.task_sender.send(task).is_err() {
            log_debug!(
                "rclrs.TaskSender.add_async_task",
                "Failed to send a task. This indicates the Worker has shut down.",
            );
        }
    }
}
/// An async task that can put itself back on the ready queue when woken.
struct Task {
/// The future to poll. The slot is emptied while the future is being
/// polled and stays `None` once the future has completed.
future: Mutex<Option<BoxFuture<'static, ()>>>,
/// Sender for the ready queue, used by the waker to re-queue this task.
task_sender: Sender<Arc<Task>>,
}
impl ArcWake for Task {
    /// Wakes the task by pushing another reference to it onto the ready
    /// queue, where the executor will pick it up and poll it again.
    fn wake_by_ref(arc_self: &Arc<Self>) {
        // A send error is deliberately discarded: with the queue's receiver
        // gone there is nothing left that could poll the task.
        let _ = arc_self.task_sender.send(Arc::clone(arc_self));
    }
}
/// Drives all wait set runners for the duration of one spin.
///
/// Waits for the first worker to arrive on `new_workers`, then repeatedly
/// waits for either an active runner to finish or another worker to arrive,
/// until no runner is active any more.
///
/// Returns the runners that finished, the stream future for new workers (so
/// it can be reused by the next spin), and the errors reported by runners.
async fn manage_workers(
mut new_workers: StreamFuture<UnboundedReceiver<WaitSetRunner>>,
all_guard_conditions: AllGuardConditions,
conditions: WaitSetRunConditions,
) -> (
Vec<WaitSetRunner>,
StreamFuture<UnboundedReceiver<WaitSetRunner>>,
Vec<RclrsError>,
) {
// One receiver per running runner; each yields the runner back together
// with its result when the runner stops.
let mut active_runners: Vec<oneshot::Receiver<(WaitSetRunner, Result<(), RclrsError>)>> =
Vec::new();
let mut finished_runners: Vec<WaitSetRunner> = Vec::new();
let mut errors: Vec<RclrsError> = Vec::new();
// Registers the runner's guard condition, then either starts the runner or,
// when spinning has already been halted, files it directly as finished.
// `None` (nothing received from the stream) is ignored.
let add_runner = |new_runner: Option<WaitSetRunner>,
active_runners: &mut Vec<_>,
finished_runners: &mut Vec<_>| {
if let Some(runner) = new_runner {
all_guard_conditions.push(Arc::downgrade(runner.guard_condition()));
if conditions.halt_spinning.load(Ordering::Acquire) {
finished_runners.push(runner);
} else {
active_runners.push(runner.run(conditions.clone()));
}
}
};
// The loop below needs at least one active runner, so the first worker is
// awaited up front. This await does not complete until the stream yields.
let (initial_worker, new_worker_receiver) = new_workers.await;
new_workers = new_worker_receiver.into_future();
add_runner(initial_worker, &mut active_runners, &mut finished_runners);
while !active_runners.is_empty() {
// `select` consumes both futures; whichever one did not complete is
// handed back in the result and stored again below.
let next_event = select(select_all(active_runners), new_workers);
match next_event.await {
// A runner stopped. `remaining_workers` holds the receivers that are
// still pending and `new_worker_stream` is the untouched stream future.
Either::Left(((finished_worker, _, remaining_workers), new_worker_stream)) => {
match finished_worker {
Ok((runner, result)) => {
finished_runners.push(runner);
if let Err(err) = result {
errors.push(err);
}
}
// The sending side went away without returning the runner, so
// there is no runner to file as finished.
Err(_) => {
log_fatal!(
"rclrs.basic_executor",
"WaitSetRunner unexpectedly dropped. This should never happen. \
Please report this to the rclrs maintainers with a minimal \
reproducible example.",
);
}
}
active_runners = remaining_workers;
new_workers = new_worker_stream;
}
// A new worker arrived. `into_inner` recovers the still-pending
// receivers from the unfinished `select_all` future.
Either::Right(((new_worker, new_worker_receiver), remaining_workers)) => {
active_runners = remaining_workers.into_inner();
add_runner(new_worker, &mut active_runners, &mut finished_runners);
new_workers = new_worker_receiver.into_future();
}
}
}
(finished_runners, new_workers, errors)
}