rclrust 0.0.2

ROS2 client written in Rust
use std::sync::{Arc, Weak};
use std::time::Duration;

use anyhow::Result;

use crate::context::Context;
use crate::error::RclRustError;
use crate::node::Node;
use crate::wait_set::RclWaitSet;

pub fn spin(node: &Arc<Node<'_>>) -> Result<()> {
    let mut exec = SingleThreadExecutor::new(node.context)?;
    exec.add_node(node);
    exec.spin()?;

    Ok(())
}

pub fn spin_some(node: &Arc<Node<'_>>) -> Result<()> {
    let mut exec = SingleThreadExecutor::new(node.context)?;
    exec.add_node(node);
    exec.spin_some(Duration::ZERO)?;

    Ok(())
}

pub struct SingleThreadExecutor<'ctx> {
    context: &'ctx Context,
    nodes: Vec<Weak<Node<'ctx>>>,
}

impl<'ctx> SingleThreadExecutor<'ctx> {
    pub const fn new(context: &'ctx Context) -> Result<Self> {
        Ok(Self {
            context,
            nodes: Vec::new(),
        })
    }

    pub fn spin(&self) -> Result<()> {
        while self.context.is_valid() {
            if let Err(e) = self.spin_some(Duration::from_nanos(500)) {
                match e.downcast_ref::<RclRustError>() {
                    Some(RclRustError::RclTimeout(_)) => continue,
                    _ => return Err(e),
                }
            }
        }

        Ok(())
    }

    pub fn spin_some(&self, max_duration: Duration) -> Result<()> {
        let (n_subscriptions, _, n_timers, n_clients, n_services, _) =
            self.nodes.iter().filter_map(|n| n.upgrade()).fold(
                (0, 0, 0, 0, 0, 0),
                |(subs, guards, timers, clients, services, events), node| {
                    (
                        subs + node.subscriptions.lock().unwrap().len(),
                        guards,
                        timers + node.timers.lock().unwrap().len(),
                        clients + node.clients.lock().unwrap().len(),
                        services + node.services.lock().unwrap().len(),
                        events,
                    )
                },
            );

        let mut wait_set = RclWaitSet::new(
            &mut self.context.handle.lock().unwrap(),
            n_subscriptions,
            0,
            n_timers,
            n_clients,
            n_services,
            0,
        )?;

        wait_set.clear()?;

        for node in self.nodes.iter().filter_map(|n| n.upgrade()) {
            node.add_to_wait_set(&mut wait_set)?;
        }

        wait_set.wait(max_duration.as_nanos() as i64)?;

        for node in self.nodes.iter().filter_map(|n| n.upgrade()) {
            node.call_callbacks()?;
        }

        Ok(())
    }

    fn add_node(&mut self, node: &Arc<Node<'ctx>>) {
        self.nodes.push(Arc::downgrade(node));
    }
}