use crate::error::Result;
use std::sync::{mpsc, Arc, Mutex};
use std::time::Duration;
use rosrustext_msgs::lifecycle_msgs::msg::{TransitionDescription, TransitionEvent};
use rosrustext_msgs::lifecycle_msgs::srv::{ChangeState, GetAvailableStates, GetAvailableTransitions, GetState};
use rclrs::{
Client, ClientOptions, Executor, IntoNodeOptions, IntoNodeServiceCallback, IntoNodeSubscriptionCallback, Node,
Service, ServiceOptions, Subscription, SubscriptionOptions, TimerOptions,
};
use rosrustext_core::lifecycle::{ActivationGate, CallbackResult, CompleteInput, State, StateMachine, Transition};
#[cfg(feature = "bond")]
use super::BondAgent;
use super::{utils, ManagedPublisher, ManagedTimer};
pub trait LifecycleCallbacksWithNode {
fn on_configure(&mut self, node: &LifecycleNode, state: &State) -> CallbackResult;
fn on_activate(&mut self, node: &LifecycleNode, state: &State) -> CallbackResult;
fn on_deactivate(&mut self, node: &LifecycleNode, state: &State) -> CallbackResult;
fn on_cleanup(&mut self, node: &LifecycleNode, state: &State) -> CallbackResult;
fn on_shutdown(&mut self, node: &LifecycleNode, state: &State) -> CallbackResult;
fn on_error(&mut self, node: &LifecycleNode, state: &State) -> CallbackResult;
}
fn run_transition_callback(
callbacks: &mut dyn LifecycleCallbacksWithNode, transition: Transition, node: &LifecycleNode, state: State,
) -> CallbackResult {
match transition {
Transition::Configure => callbacks.on_configure(node, &state),
Transition::Activate => callbacks.on_activate(node, &state),
Transition::Deactivate => callbacks.on_deactivate(node, &state),
Transition::Cleanup => callbacks.on_cleanup(node, &state),
Transition::Shutdown => callbacks.on_shutdown(node, &state),
}
}
#[derive(Clone)]
pub struct LifecycleNode {
node: Arc<Node>,
gate: Arc<ActivationGate>,
machine: Arc<Mutex<StateMachine>>,
callbacks: Arc<Mutex<Box<dyn LifecycleCallbacksWithNode + Send>>>,
completion_tx: mpsc::Sender<AsyncOutcome>,
completion_rx: Arc<Mutex<mpsc::Receiver<AsyncOutcome>>>,
transition_event_pub: Arc<Mutex<Option<Arc<rclrs::Publisher<TransitionEvent>>>>>,
#[cfg(feature = "bond")]
bond: Arc<Mutex<Option<Arc<BondAgent>>>>,
internals: Arc<Mutex<Vec<Box<dyn std::any::Any + Send + Sync>>>>,
}
struct DefaultCallbacks;
impl LifecycleCallbacksWithNode for DefaultCallbacks {
fn on_configure(&mut self, _node: &LifecycleNode, _state: &State) -> CallbackResult {
CallbackResult::Success
}
fn on_activate(&mut self, _node: &LifecycleNode, _state: &State) -> CallbackResult {
CallbackResult::Success
}
fn on_deactivate(&mut self, _node: &LifecycleNode, _state: &State) -> CallbackResult {
CallbackResult::Success
}
fn on_cleanup(&mut self, _node: &LifecycleNode, _state: &State) -> CallbackResult {
CallbackResult::Success
}
fn on_shutdown(&mut self, _node: &LifecycleNode, _state: &State) -> CallbackResult {
CallbackResult::Success
}
fn on_error(&mut self, _node: &LifecycleNode, _state: &State) -> CallbackResult {
CallbackResult::Success
}
}
impl LifecycleNode {
pub fn create<'a>(executor: &'a Executor, options: impl IntoNodeOptions<'a>) -> Result<Self> {
let node = Arc::new(executor.create_node(options)?);
Self::try_new(node)
}
pub fn try_new(node: Arc<Node>) -> Result<Self> {
let (completion_tx, completion_rx) = mpsc::channel();
let ln = Self {
node,
gate: Arc::new(ActivationGate::new()),
machine: Arc::new(Mutex::new(StateMachine::new())),
callbacks: Arc::new(Mutex::new(Box::new(DefaultCallbacks))),
completion_tx,
completion_rx: Arc::new(Mutex::new(completion_rx)),
transition_event_pub: Arc::new(Mutex::new(None)),
#[cfg(feature = "bond")]
bond: Arc::new(Mutex::new(None)),
internals: Arc::new(Mutex::new(Vec::new())),
};
ln.enable_defaults()?;
Ok(ln)
}
pub fn new_with_callbacks(node: Arc<Node>, callbacks: Box<dyn LifecycleCallbacksWithNode + Send>) -> Result<Self> {
let (completion_tx, completion_rx) = mpsc::channel();
let ln = Self {
node,
gate: Arc::new(ActivationGate::new()),
machine: Arc::new(Mutex::new(StateMachine::new())),
callbacks: Arc::new(Mutex::new(callbacks)),
completion_tx,
completion_rx: Arc::new(Mutex::new(completion_rx)),
transition_event_pub: Arc::new(Mutex::new(None)),
#[cfg(feature = "bond")]
bond: Arc::new(Mutex::new(None)),
internals: Arc::new(Mutex::new(Vec::new())),
};
ln.enable_defaults()?;
Ok(ln)
}
pub fn from_node(node: Arc<Node>) -> Result<Self> {
Self::try_new(node)
}
pub fn try_with_gate(node: Arc<Node>, gate: Arc<ActivationGate>) -> Result<Self> {
let (completion_tx, completion_rx) = mpsc::channel();
let ln = Self {
node,
gate,
machine: Arc::new(Mutex::new(StateMachine::new())),
callbacks: Arc::new(Mutex::new(Box::new(DefaultCallbacks))),
completion_tx,
completion_rx: Arc::new(Mutex::new(completion_rx)),
transition_event_pub: Arc::new(Mutex::new(None)),
#[cfg(feature = "bond")]
bond: Arc::new(Mutex::new(None)),
internals: Arc::new(Mutex::new(Vec::new())),
};
ln.enable_defaults()?;
Ok(ln)
}
pub fn create_with_callbacks<'a>(
executor: &'a Executor, options: impl IntoNodeOptions<'a>,
callbacks: Box<dyn LifecycleCallbacksWithNode + Send>,
) -> Result<Self> {
let node = Arc::new(executor.create_node(options)?);
Self::new_with_callbacks(node, callbacks)
}
pub fn node(&self) -> &Arc<Node> {
&self.node
}
pub fn node_arc(&self) -> Arc<Node> {
Arc::clone(&self.node)
}
pub fn name(&self) -> String {
self.node.name()
}
pub fn namespace(&self) -> String {
self.node.namespace()
}
pub fn create_service<'a, T, Args>(
&self, options: impl Into<ServiceOptions<'a>>, callback: impl IntoNodeServiceCallback<T, Args>,
) -> Result<Service<T>>
where
T: rclrs::ServiceIDL,
{
Ok(self.node.create_service::<T, Args>(options, callback)?)
}
pub fn create_subscription<'a, T, Args>(
&self, options: impl Into<SubscriptionOptions<'a>>, callback: impl IntoNodeSubscriptionCallback<T, Args>,
) -> Result<Subscription<T>>
where
T: rclrs::MessageIDL,
{
Ok(self.node.create_subscription::<T, Args>(options, callback)?)
}
pub fn create_client<'a, T>(&self, options: impl Into<ClientOptions<'a>>) -> Result<Client<T>>
where
T: rclrs::ServiceIDL,
{
Ok(self.node.create_client::<T>(options)?)
}
pub fn activate(&self) {
self.gate.activate();
}
pub fn deactivate(&self) {
self.gate.deactivate();
}
pub fn is_active(&self) -> bool {
self.gate.is_active()
}
pub fn create_publisher<T>(&self, topic: &str) -> Result<ManagedPublisher<T>>
where
T: rclrs::MessageIDL,
{
let pub_ = self.node.create_publisher::<T>(topic)?;
Ok(ManagedPublisher::new(pub_, Arc::clone(&self.gate)))
}
pub fn create_timer_repeating_gated<F>(&self, period: Duration, mut callback: F) -> Result<ManagedTimer>
where
F: FnMut() + Send + 'static,
{
let gate = Arc::clone(&self.gate);
let timer = self.node.create_timer_repeating(TimerOptions::new(period), move || {
if gate.is_active() {
callback();
}
})?;
Ok(ManagedTimer::new(timer))
}
fn handle_completion_outcome(&self, outcome: AsyncOutcome) {
let mut machine_guard = self.machine.lock().expect("machine mutex poisoned");
let result = match machine_guard.complete(outcome.input) {
Ok(res) => res,
Err(_e) => {
return;
}
};
drop(machine_guard);
if result.gate_active {
self.gate.activate();
} else {
self.gate.deactivate();
}
#[cfg(feature = "bond")]
if let Some(agent) = self.bond.lock().expect("bond mutex poisoned").as_ref() {
agent.set_active(result.gate_active);
}
if let Some(pub_) = self.transition_event_pub.lock().expect("transition_event_pub mutex poisoned").as_ref() {
let evt = utils::make_transition_event(
result.start_state,
result.final_state,
outcome.transition_id,
outcome.label,
);
let _ = pub_.publish(evt);
}
}
}
impl LifecycleNode {
pub(crate) fn enable_defaults(&self) -> Result<()> {
self.enable_transition_event_publisher()?;
self.enable_get_state_service()?;
self.enable_change_state_service()?;
self.enable_completion_pump()?;
self.enable_get_available_states_service()?;
self.enable_get_available_transitions_service()?;
self.enable_bond()?;
Ok(())
}
pub(crate) fn enable_transition_event_publisher(&self) -> Result<()> {
let topic = format!("/{}/transition_event", self.node.name());
let pub_ = Arc::new(self.node.create_publisher::<TransitionEvent>(&topic)?);
*self.transition_event_pub.lock().expect("transition_event_pub mutex poisoned") = Some(Arc::clone(&pub_));
self.keep_internal(pub_);
Ok(())
}
pub(crate) fn enable_get_state_service(&self) -> Result<()> {
let service_name = format!("/{}/get_state", self.node.name());
let machine = Arc::clone(&self.machine);
let svc = self.node.create_service::<GetState, _>(
&service_name,
move |_req: rosrustext_msgs::lifecycle_msgs::srv::GetState_Request| {
let current = machine.lock().expect("machine mutex poisoned").stable_state();
rosrustext_msgs::lifecycle_msgs::srv::GetState_Response { current_state: utils::ros_state_msg(current) }
},
)?;
self.keep_internal(svc);
Ok(())
}
pub(crate) fn enable_change_state_service(&self) -> Result<()> {
let service_name = format!("/{}/change_state", self.node.name());
let machine = Arc::clone(&self.machine);
let callbacks = Arc::clone(&self.callbacks);
let completion_tx = self.completion_tx.clone();
let active_self = self.clone();
let svc = self.node.create_service::<ChangeState, _>(
&service_name,
move |req: rosrustext_msgs::lifecycle_msgs::srv::ChangeState_Request| {
let transition_id = req.transition.id;
let delay_ms = utils::change_state_delay_ms();
let mut machine_guard = machine.lock().expect("machine mutex poisoned");
let start_state = machine_guard.stable_state();
let spec = match utils::transition_spec_for_ros_id(start_state, transition_id) {
Some(spec) => spec,
None => return rosrustext_msgs::lifecycle_msgs::srv::ChangeState_Response { success: false },
};
let flight = match machine_guard.begin(spec.transition) {
Ok(flight) => flight,
Err(_) => return rosrustext_msgs::lifecycle_msgs::srv::ChangeState_Response { success: false },
};
let label = spec.label;
let transition_id = spec.transition_id;
drop(machine_guard);
let callbacks_clone = callbacks.clone();
let completion_tx_clone = completion_tx.clone();
let active_self_for_cb = active_self.clone();
let start_state = flight.start;
let do_callback = move || -> CompleteInput {
if delay_ms > 0 {
std::thread::sleep(Duration::from_millis(delay_ms));
}
let mut cb_guard = callbacks_clone.lock().expect("callbacks mutex poisoned");
let mut result =
run_transition_callback(cb_guard.as_mut(), flight.transition, &active_self_for_cb, start_state);
if let Some(forced) = utils::transition_result_override_for(flight.transition) {
result = forced;
}
let on_error_result = if result == CallbackResult::Error {
let mut on_error = cb_guard.on_error(&active_self_for_cb, &start_state);
if let Some(forced) = utils::on_error_result_override_for(flight.transition) {
on_error = forced;
}
Some(on_error)
} else {
None
};
drop(cb_guard);
CompleteInput { result, on_error_result }
};
if delay_ms == 0 {
let input = do_callback();
let active_self_clone = active_self.clone();
active_self_clone.handle_completion_outcome(AsyncOutcome { input, label, transition_id });
} else {
std::thread::spawn(move || {
let input = do_callback();
let _ = completion_tx_clone.send(AsyncOutcome { input, label, transition_id });
});
}
rosrustext_msgs::lifecycle_msgs::srv::ChangeState_Response { success: true }
},
)?;
self.keep_internal(svc);
Ok(())
}
pub(crate) fn enable_get_available_transitions_service(&self) -> Result<()> {
let service_name = format!("/{}/get_available_transitions", self.node.name());
let machine = Arc::clone(&self.machine);
let svc = self.node.create_service::<GetAvailableTransitions, _>(
&service_name,
move |_req: rosrustext_msgs::lifecycle_msgs::srv::GetAvailableTransitions_Request| {
let guard = machine.lock().expect("machine mutex poisoned");
let current = guard.current_state();
let transitions: Vec<TransitionDescription> = utils::transition_entries_for_start(current)
.into_iter()
.map(|entry| {
utils::transition_description(
entry.spec.start,
entry.goal,
entry.spec.transition_id,
entry.spec.label,
)
})
.collect();
rosrustext_msgs::lifecycle_msgs::srv::GetAvailableTransitions_Response {
available_transitions: transitions,
}
},
)?;
self.keep_internal(svc);
Ok(())
}
pub(crate) fn enable_completion_pump(&self) -> Result<()> {
let completion_rx = Arc::clone(&self.completion_rx);
let ln = self.clone();
let timer = self.node.create_timer_repeating(TimerOptions::new(Duration::from_millis(10)), move || {
let outcomes: Vec<AsyncOutcome> = {
let rx = completion_rx.lock().expect("completion_rx mutex poisoned");
let mut pending = Vec::new();
loop {
match rx.try_recv() {
Ok(outcome) => pending.push(outcome),
Err(mpsc::TryRecvError::Empty) => break,
Err(mpsc::TryRecvError::Disconnected) => break,
}
}
pending
};
for outcome in outcomes {
ln.handle_completion_outcome(outcome);
}
})?;
self.keep_internal(timer);
Ok(())
}
pub(crate) fn enable_get_available_states_service(&self) -> Result<()> {
let service_name = format!("/{}/get_available_states", self.node.name());
let svc = self.node.create_service::<GetAvailableStates, _>(
&service_name,
move |_req: rosrustext_msgs::lifecycle_msgs::srv::GetAvailableStates_Request| {
let states = vec![
utils::ros_state_msg(State::Unconfigured),
utils::ros_state_msg(State::Inactive),
utils::ros_state_msg(State::Active),
utils::ros_state_msg(State::Finalized),
];
rosrustext_msgs::lifecycle_msgs::srv::GetAvailableStates_Response { available_states: states }
},
)?;
self.keep_internal(svc);
Ok(())
}
#[cfg(feature = "bond")]
pub(crate) fn enable_bond(&self) -> Result<()> {
let heartbeat_period = Duration::from_secs(1);
let heartbeat_timeout = Duration::from_secs(4);
let agent = Arc::new(BondAgent::new(Arc::clone(&self.node), heartbeat_period, heartbeat_timeout)?);
agent.set_active(false);
*self.bond.lock().expect("bond mutex poisoned") = Some(Arc::clone(&agent));
self.keep_internal(agent);
Ok(())
}
#[cfg(not(feature = "bond"))]
pub(crate) fn enable_bond(&self) -> Result<()> {
Ok(())
}
pub(crate) fn keep_internal<T>(&self, handle: T)
where
T: Send + Sync + 'static,
{
self.internals.lock().expect("LifecycleNode internals poisoned").push(Box::new(handle));
}
}
#[derive(Debug, Clone, Copy)]
struct AsyncOutcome {
input: CompleteInput,
label: &'static str,
transition_id: u8,
}