safe_drive 0.4.3

safe_drive: Formally Specified Rust Bindings for ROS2
Documentation
use super::{guard_condition::GuardCondition, CallbackResult};
use crate::{
    action,
    context::Context,
    error::DynError,
    service::{client::ClientData, server::ServerData},
    signal_handler,
    topic::subscriber::RCLSubscription,
};
use crossbeam_channel::{Receiver, Sender};
use once_cell::sync::Lazy;
use parking_lot::Mutex;
use std::{
    sync::Arc,
    thread::{self, yield_now, JoinHandle},
};

pub(crate) static SELECTOR: Lazy<Mutex<AsyncSelector>> =
    Lazy::new(|| Mutex::new(AsyncSelector::new()));

pub(crate) enum Command {
    Subscription(
        Arc<RCLSubscription>,
        Box<dyn FnMut() -> CallbackResult + Send + Sync + 'static>,
    ),
    RemoveSubscription(Arc<RCLSubscription>),
    Server(
        Arc<ServerData>,
        Box<dyn FnMut() -> CallbackResult + Send + Sync + 'static>,
    ),
    RemoveServer(Arc<ServerData>),
    Client(
        Arc<ClientData>,
        Box<dyn FnMut() -> CallbackResult + Send + Sync + 'static>,
    ),
    RemoveClient(Arc<ClientData>),
    ActionClient {
        data: Arc<action::client::ClientData>,
        feedback: Box<dyn FnMut() -> CallbackResult + Send + Sync + 'static>,
        status: Box<dyn FnMut() -> CallbackResult + Send + Sync + 'static>,
        goal: Box<dyn FnMut() -> CallbackResult + Send + Sync + 'static>,
        cancel: Box<dyn FnMut() -> CallbackResult + Send + Sync + 'static>,
        result: Box<dyn FnMut() -> CallbackResult + Send + Sync + 'static>,
    },
    RemoveActionClient(Arc<action::client::ClientData>),
    ActionServer {
        data: Arc<action::server::ServerData>,
        goal: Box<dyn FnMut() -> CallbackResult + Send + Sync + 'static>,
        cancel: Box<dyn FnMut() -> CallbackResult + Send + Sync + 'static>,
        result: Box<dyn FnMut() -> CallbackResult + Send + Sync + 'static>,
    },
    RemoveActionServer(Arc<action::server::ServerData>),
    ConditionVar(
        GuardCondition,
        Box<dyn FnMut() -> CallbackResult + Send + Sync + 'static>,
    ),
    RemoveConditionVar(GuardCondition),
    Halt,
}

struct SelectorData {
    tx: Sender<Command>,
    th: JoinHandle<Result<(), DynError>>,
    cond: GuardCondition,
    _context: Arc<Context>,
}

pub(crate) struct AsyncSelector {
    data: Option<SelectorData>,
}

unsafe impl Sync for AsyncSelector {}
unsafe impl Send for AsyncSelector {}

impl AsyncSelector {
    fn new() -> Self {
        AsyncSelector { data: None }
    }

    pub(crate) fn halt(&mut self) -> Result<(), DynError> {
        if let Some(SelectorData { tx, cond, th, .. }) = self.data.take() {
            tx.send(Command::Halt)?;
            cond.trigger()?;

            yield_now();
            let _ = th.join();
        }

        Ok(())
    }

    pub(crate) fn send_command(
        &mut self,
        context: &Arc<Context>,
        cmd: Command,
    ) -> Result<(), DynError> {
        loop {
            if let Some(SelectorData { tx, cond, .. }) = &self.data {
                tx.send(cmd)?;
                cond.trigger()?;
                return Ok(());
            } else {
                if let Command::Halt = cmd {
                    return Ok(());
                }

                let (tx, rx) = crossbeam_channel::bounded(256);
                let guard = super::guard_condition::GuardCondition::new(context.clone())?;
                let ctx = context.clone();
                let guard2 = guard.clone();
                let th = thread::spawn(move || select(ctx, guard2, rx));
                self.data = Some(SelectorData {
                    tx,
                    th,
                    _context: context.clone(),
                    cond: guard,
                });
            }
        }
    }
}

fn select(
    context: Arc<Context>,
    guard: GuardCondition,
    rx: Receiver<Command>,
) -> Result<(), DynError> {
    let mut selector = super::Selector::new(context)?;

    selector.add_guard_condition(&guard, None, false);

    loop {
        for cmd in rx.try_iter() {
            match cmd {
                Command::Subscription(s, h) => selector.add_rcl_subscription(s, Some(h), true),
                Command::RemoveSubscription(s) => selector.remove_rcl_subscription(&s),
                Command::Server(s, h) => selector.add_server_data(s, Some(h), true),
                Command::RemoveServer(s) => selector.remove_server_data(&s),
                Command::Client(c, h) => selector.add_client_data(c, Some(h), true),
                Command::RemoveClient(c) => selector.remove_client_data(&c),
                Command::ActionClient {
                    data,
                    feedback,
                    status,
                    goal,
                    cancel,
                    result,
                } => selector.add_action_client_data(
                    data,
                    Some(feedback),
                    Some(status),
                    Some(goal),
                    Some(cancel),
                    Some(result),
                ),
                Command::RemoveActionClient(c) => selector.remove_action_client_data(&c),
                Command::ActionServer {
                    data,
                    goal,
                    cancel,
                    result,
                } => selector.add_action_server_data(data, Some(goal), Some(cancel), Some(result)),
                Command::RemoveActionServer(s) => selector.remove_action_server_data(&s),
                Command::ConditionVar(c, h) => selector.add_guard_condition(&c, Some(h), true),
                Command::RemoveConditionVar(c) => selector.remove_guard_condition(&c),
                Command::Halt => return Ok(()),
            }
        }

        if let Err(_e) = selector.wait() {
            if signal_handler::is_halt() {
                for (_, h) in selector.subscriptions.iter_mut() {
                    if let Some(handler) = &mut h.handler {
                        (*handler)();
                    }
                }

                for (_, h) in selector.services.iter_mut() {
                    if let Some(handler) = &mut h.handler {
                        (*handler)();
                    }
                }

                for (_, h) in selector.clients.iter_mut() {
                    if let Some(handler) = &mut h.handler {
                        (*handler)();
                    }
                }

                for (_, h) in selector.cond.iter_mut() {
                    if let Some(handler) = &mut h.handler {
                        (*handler)();
                    }
                }

                return Ok(());
            }
        }
    }
}