use crate::error::Result;
use std::sync::{mpsc, Arc, Mutex};
use std::time::Duration;
use rosrustext_msgs::lifecycle_msgs::msg::{TransitionDescription, TransitionEvent};
use rosrustext_msgs::lifecycle_msgs::srv::{ChangeState, GetAvailableStates, GetAvailableTransitions, GetState};
use rclrs::{
Client, ClientOptions, Executor, IntoNodeOptions, IntoNodeServiceCallback, IntoNodeSubscriptionCallback, Node,
Service, ServiceOptions, Subscription, SubscriptionOptions, TimerOptions,
};
use rosrustext_core::lifecycle::{
begin, finish_with_error_handling, ActivationGate, CallbackResult, State, Transition,
};
#[cfg(feature = "transition_graph")]
use rosrustext_msgs::rosrustext_interfaces::srv::GetTransitionGraph;
#[cfg(feature = "bond")]
use super::BondAgent;
use super::{utils, ManagedPublisher, ManagedTimer};
/// A node wrapper that tracks a lifecycle state and registers the default
/// lifecycle services, transition-event publisher, and completion pump.
///
/// Every field is either an `Arc` or a channel sender, so cloning yields
/// another handle onto the same shared state rather than an independent node.
#[derive(Clone)]
pub struct LifecycleNode {
/// The wrapped node on which all publishers, services, and timers are created.
node: Arc<Node>,
/// Activation gate handed to managed publishers and consulted by gated timers.
gate: Arc<ActivationGate>,
/// Recorded lifecycle state plus the transition currently in flight, if any.
state: Arc<Mutex<LifecycleState>>,
/// Sending half of the completion channel; cloned into the change-state
/// service so delayed transitions can report their outcome.
completion_tx: mpsc::Sender<TransitionOutcome>,
/// Receiving half of the completion channel, drained by the completion pump timer.
completion_rx: Arc<Mutex<mpsc::Receiver<TransitionOutcome>>>,
/// Transition-event publisher; `None` until `enable_transition_event_publisher` has run.
transition_event_pub: Arc<Mutex<Option<Arc<rclrs::Publisher<TransitionEvent>>>>>,
/// Bond agent; `None` until `enable_bond` has run (only with the `bond` feature).
#[cfg(feature = "bond")]
bond: Arc<Mutex<Option<Arc<BondAgent>>>>,
/// Type-erased storage for handles pushed through `keep_internal`, so they
/// stay owned for as long as some clone of this node exists.
internals: Arc<Mutex<Vec<Box<dyn std::any::Any + Send + Sync>>>>,
}
/// Public API: construction, accessors, and creation of node entities.
impl LifecycleNode {
    /// Creates a new node on `executor` from `options` and wraps it.
    ///
    /// # Errors
    /// Propagates a node-creation failure or any error from [`Self::try_new`].
    pub fn create<'a>(executor: &'a Executor, options: impl IntoNodeOptions<'a>) -> Result<Self> {
        let raw_node = executor.create_node(options)?;
        Self::try_new(Arc::new(raw_node))
    }

    /// Wraps `node` using a freshly constructed activation gate and registers
    /// the default lifecycle interfaces.
    ///
    /// # Errors
    /// Returns the first error raised while registering the defaults.
    pub fn try_new(node: Arc<Node>) -> Result<Self> {
        // Same construction path as `try_with_gate`; only the gate's origin differs.
        Self::try_with_gate(node, Arc::new(ActivationGate::new()))
    }

    /// Alias for [`Self::try_new`].
    ///
    /// # Errors
    /// Same as [`Self::try_new`].
    pub fn from_node(node: Arc<Node>) -> Result<Self> {
        Self::try_new(node)
    }

    /// Wraps `node` around a caller-supplied activation `gate` and registers
    /// the default lifecycle interfaces.
    ///
    /// The recorded lifecycle state starts from `LifecycleState::new()`.
    ///
    /// # Errors
    /// Returns the first error raised while registering the defaults.
    pub fn try_with_gate(node: Arc<Node>, gate: Arc<ActivationGate>) -> Result<Self> {
        let (completion_tx, completion_rx) = mpsc::channel();
        let lifecycle = Self {
            node,
            gate,
            state: Arc::new(Mutex::new(LifecycleState::new())),
            completion_tx,
            completion_rx: Arc::new(Mutex::new(completion_rx)),
            // Default is an empty slot (`None`); filled in by `enable_defaults`.
            transition_event_pub: Arc::default(),
            #[cfg(feature = "bond")]
            bond: Arc::default(),
            // Default is an empty handle list.
            internals: Arc::default(),
        };
        lifecycle.enable_defaults()?;
        Ok(lifecycle)
    }

    /// Borrows the wrapped node.
    pub fn node(&self) -> &Arc<Node> {
        &self.node
    }

    /// Returns a new shared handle to the wrapped node.
    pub fn node_arc(&self) -> Arc<Node> {
        Arc::clone(&self.node)
    }

    /// Returns the wrapped node's name.
    pub fn name(&self) -> String {
        self.node.name()
    }

    /// Returns the wrapped node's namespace.
    pub fn namespace(&self) -> String {
        self.node.namespace()
    }

    /// Creates a service on the wrapped node; the callback is not gated.
    ///
    /// # Errors
    /// Propagates the underlying service-creation error.
    pub fn create_service<'a, T, Args>(
        &self,
        options: impl Into<ServiceOptions<'a>>,
        callback: impl IntoNodeServiceCallback<T, Args>,
    ) -> Result<Service<T>>
    where
        T: rclrs::ServiceIDL,
    {
        let service = self.node.create_service::<T, Args>(options, callback)?;
        Ok(service)
    }

    /// Creates a subscription on the wrapped node; the callback is not gated.
    ///
    /// # Errors
    /// Propagates the underlying subscription-creation error.
    pub fn create_subscription<'a, T, Args>(
        &self,
        options: impl Into<SubscriptionOptions<'a>>,
        callback: impl IntoNodeSubscriptionCallback<T, Args>,
    ) -> Result<Subscription<T>>
    where
        T: rclrs::MessageIDL,
    {
        let subscription = self.node.create_subscription::<T, Args>(options, callback)?;
        Ok(subscription)
    }

    /// Creates a service client on the wrapped node.
    ///
    /// # Errors
    /// Propagates the underlying client-creation error.
    pub fn create_client<'a, T>(&self, options: impl Into<ClientOptions<'a>>) -> Result<Client<T>>
    where
        T: rclrs::ServiceIDL,
    {
        let client = self.node.create_client::<T>(options)?;
        Ok(client)
    }

    /// Calls `activate` on the shared gate. The recorded lifecycle state is
    /// not modified by this method.
    pub fn activate(&self) {
        self.gate.activate();
    }

    /// Calls `deactivate` on the shared gate. The recorded lifecycle state is
    /// not modified by this method.
    pub fn deactivate(&self) {
        self.gate.deactivate();
    }

    /// Reports whether the shared gate is currently active.
    pub fn is_active(&self) -> bool {
        self.gate.is_active()
    }

    /// Creates a publisher on `topic` and wraps it in a [`ManagedPublisher`]
    /// that shares this node's activation gate.
    ///
    /// # Errors
    /// Propagates the underlying publisher-creation error.
    pub fn create_publisher<T>(&self, topic: &str) -> Result<ManagedPublisher<T>>
    where
        T: rclrs::MessageIDL,
    {
        let publisher = self.node.create_publisher::<T>(topic)?;
        let gate = Arc::clone(&self.gate);
        Ok(ManagedPublisher::new(publisher, gate))
    }

    /// Creates a repeating timer with the given `period` whose `callback` runs
    /// only on ticks where the activation gate reports active; other ticks are
    /// skipped.
    ///
    /// # Errors
    /// Propagates the underlying timer-creation error.
    pub fn create_timer_repeating_gated<F>(&self, period: Duration, mut callback: F) -> Result<ManagedTimer>
    where
        F: FnMut() + Send + 'static,
    {
        let gate = Arc::clone(&self.gate);
        let timer = self.node.create_timer_repeating(TimerOptions::new(period), move || {
            if !gate.is_active() {
                return;
            }
            callback();
        })?;
        Ok(ManagedTimer::new(timer))
    }
}
impl LifecycleNode {
/// Registers every default lifecycle interface, in a fixed order, stopping at
/// the first failure.
///
/// Handles created by steps that already succeeded stay stored on the node
/// even when a later step fails.
///
/// # Errors
/// Returns the error of the first step that fails.
pub(crate) fn enable_defaults(&self) -> Result<()> {
    self.enable_transition_event_publisher()?;
    self.enable_get_state_service()?;
    self.enable_change_state_service()?;
    self.enable_completion_pump()?;
    self.enable_get_available_states_service()?;
    self.enable_get_available_transitions_service()?;
    #[cfg(feature = "transition_graph")]
    self.enable_get_transition_graph_service()?;
    // Last step: its result is the result of the whole sequence.
    self.enable_bond()
}
/// Creates the `transition_event` publisher, stores it in the shared slot read
/// by `apply_outcome`, and retains a handle in the internals list.
///
/// # Errors
/// Propagates the publisher-creation error.
///
/// # Panics
/// Panics if the publisher slot mutex is poisoned.
pub(crate) fn enable_transition_event_publisher(&self) -> Result<()> {
    // NOTE(review): the topic is built from `node.name()` only; confirm whether
    // the node namespace is meant to be part of it.
    let topic = format!("/{}/transition_event", self.node.name());
    let publisher = Arc::new(self.node.create_publisher::<TransitionEvent>(&topic)?);
    {
        let mut slot = self.transition_event_pub.lock().expect("transition_event_pub mutex poisoned");
        *slot = Some(Arc::clone(&publisher));
    }
    self.keep_internal(publisher);
    Ok(())
}
/// Registers the `get_state` service, which replies with the currently
/// recorded lifecycle state converted through `utils::ros_state_msg`.
///
/// # Errors
/// Propagates the service-creation error.
pub(crate) fn enable_get_state_service(&self) -> Result<()> {
    // NOTE(review): the name is built from `node.name()` only; confirm whether
    // the node namespace is meant to be part of it.
    let service_name = format!("/{}/get_state", self.node.name());
    let shared_state = Arc::clone(&self.state);
    let service = self.node.create_service::<GetState, _>(
        &service_name,
        move |_req: rosrustext_msgs::lifecycle_msgs::srv::GetState_Request| {
            // Copy the state out so the lock is released before the reply is built.
            let snapshot = shared_state.lock().expect("state mutex poisoned").state;
            let current_state = utils::ros_state_msg(snapshot);
            rosrustext_msgs::lifecycle_msgs::srv::GetState_Response { current_state }
        },
    )?;
    self.keep_internal(service);
    Ok(())
}
pub(crate) fn enable_change_state_service(&self) -> Result<()> {
let service_name = format!("/{}/change_state", self.node.name());
let state = Arc::clone(&self.state);
let gate = Arc::clone(&self.gate);
let tev_pub = Arc::clone(&self.transition_event_pub);
#[cfg(feature = "bond")]
let bond = Arc::clone(&self.bond);
let completion_tx = self.completion_tx.clone();
let svc = self.node.create_service::<ChangeState, _>(
&service_name,
move |req: rosrustext_msgs::lifecycle_msgs::srv::ChangeState_Request| {
let transition_id = req.transition.id;
let delay_ms = utils::change_state_delay_ms();
let mut state_guard = state.lock().expect("state mutex poisoned");
if state_guard.in_flight.is_some() {
return rosrustext_msgs::lifecycle_msgs::srv::ChangeState_Response { success: false };
}
let start = state_guard.state;
let spec = match utils::transition_spec_for_ros_id(start, transition_id) {
Some(spec) => spec,
None => return rosrustext_msgs::lifecycle_msgs::srv::ChangeState_Response { success: false },
};
let intermediate = match begin(start, spec.transition) {
Ok(intermediate) => intermediate,
Err(_) => {
return rosrustext_msgs::lifecycle_msgs::srv::ChangeState_Response { success: false };
}
};
let in_flight = TransitionInFlight {
start,
intermediate,
transition: spec.transition,
transition_id,
label: spec.label,
};
state_guard.in_flight = Some(in_flight);
drop(state_guard);
if delay_ms == 0 {
let outcome = Self::compute_outcome(in_flight);
Self::apply_outcome(
&state,
&gate,
&tev_pub,
#[cfg(feature = "bond")]
&bond,
outcome,
);
} else {
let completion_tx = completion_tx.clone();
std::thread::spawn(move || {
std::thread::sleep(Duration::from_millis(delay_ms));
let outcome = Self::compute_outcome(in_flight);
let _ = completion_tx.send(outcome);
});
}
rosrustext_msgs::lifecycle_msgs::srv::ChangeState_Response { success: true }
},
)?;
self.keep_internal(svc);
Ok(())
}
/// Registers the `get_available_transitions` service.
///
/// The reply lists the transitions that `utils::transition_entries_for_start`
/// returns for the current state, or is empty while a transition is in flight.
///
/// # Errors
/// Propagates the service-creation error.
pub(crate) fn enable_get_available_transitions_service(&self) -> Result<()> {
    // NOTE(review): the name is built from `node.name()` only; confirm whether
    // the node namespace is meant to be part of it.
    let service_name = format!("/{}/get_available_transitions", self.node.name());
    let shared_state = Arc::clone(&self.state);
    let service = self.node.create_service::<GetAvailableTransitions, _>(
        &service_name,
        move |_req: rosrustext_msgs::lifecycle_msgs::srv::GetAvailableTransitions_Request| {
            let snapshot = shared_state.lock().expect("state mutex poisoned");
            let available_transitions: Vec<TransitionDescription> = match snapshot.in_flight {
                // Nothing can be requested while a transition is pending.
                Some(_) => Vec::new(),
                None => utils::transition_entries_for_start(snapshot.state)
                    .into_iter()
                    .map(|entry| {
                        let spec = entry.spec;
                        utils::transition_description(spec.start, entry.goal, spec.transition_id, spec.label)
                    })
                    .collect(),
            };
            rosrustext_msgs::lifecycle_msgs::srv::GetAvailableTransitions_Response { available_transitions }
        },
    )?;
    self.keep_internal(service);
    Ok(())
}
/// Registers a 10 ms repeating timer that drains the completion channel and
/// applies every queued transition outcome.
///
/// The receiver lock is held only while draining; outcomes are applied after
/// it has been released.
///
/// # Errors
/// Propagates the timer-creation error.
pub(crate) fn enable_completion_pump(&self) -> Result<()> {
    let state = Arc::clone(&self.state);
    let gate = Arc::clone(&self.gate);
    let tev_pub = Arc::clone(&self.transition_event_pub);
    let completion_rx = Arc::clone(&self.completion_rx);
    #[cfg(feature = "bond")]
    let bond = Arc::clone(&self.bond);
    let tick = Duration::from_millis(10);
    let timer = self.node.create_timer_repeating(TimerOptions::new(tick), move || {
        // `try_iter` stops as soon as the queue is empty or the channel has
        // hung up; the guard is a temporary dropped at the end of this statement.
        let drained: Vec<TransitionOutcome> =
            completion_rx.lock().expect("completion_rx mutex poisoned").try_iter().collect();
        for outcome in drained {
            Self::apply_outcome(
                &state,
                &gate,
                &tev_pub,
                #[cfg(feature = "bond")]
                &bond,
                outcome,
            );
        }
    })?;
    self.keep_internal(timer);
    Ok(())
}
/// Registers the `get_transition_graph` service, which replies with the four
/// primary states and, for each of them in turn, every transition returned by
/// `utils::transition_entries_for_start`.
///
/// # Errors
/// Propagates the service-creation error.
#[cfg(feature = "transition_graph")]
pub(crate) fn enable_get_transition_graph_service(&self) -> Result<()> {
    // NOTE(review): the name is built from `node.name()` only; confirm whether
    // the node namespace is meant to be part of it.
    let service_name = format!("/{}/get_transition_graph", self.node.name());
    let service = self.node.create_service::<GetTransitionGraph, _>(
        &service_name,
        move |_req: rosrustext_msgs::rosrustext_interfaces::srv::GetTransitionGraph_Request| {
            let primary = [State::Unconfigured, State::Inactive, State::Active, State::Finalized];
            let states: Vec<_> = primary.iter().map(|state| utils::ros_state_msg(*state)).collect();
            let transitions: Vec<_> = primary
                .iter()
                .flat_map(|start| utils::transition_entries_for_start(*start))
                .map(|entry| {
                    let spec = entry.spec;
                    utils::transition_description(spec.start, entry.goal, spec.transition_id, spec.label)
                })
                .collect();
            rosrustext_msgs::rosrustext_interfaces::srv::GetTransitionGraph_Response { states, transitions }
        },
    )?;
    self.keep_internal(service);
    Ok(())
}
/// Registers the `get_available_states` service, which always replies with
/// the same four states: unconfigured, inactive, active, finalized.
///
/// # Errors
/// Propagates the service-creation error.
pub(crate) fn enable_get_available_states_service(&self) -> Result<()> {
    // NOTE(review): the name is built from `node.name()` only; confirm whether
    // the node namespace is meant to be part of it.
    let service_name = format!("/{}/get_available_states", self.node.name());
    let service = self.node.create_service::<GetAvailableStates, _>(
        &service_name,
        move |_req: rosrustext_msgs::lifecycle_msgs::srv::GetAvailableStates_Request| {
            let primary = [State::Unconfigured, State::Inactive, State::Active, State::Finalized];
            let available_states: Vec<_> = primary.iter().map(|state| utils::ros_state_msg(*state)).collect();
            rosrustext_msgs::lifecycle_msgs::srv::GetAvailableStates_Response { available_states }
        },
    )?;
    self.keep_internal(service);
    Ok(())
}
/// Creates the bond agent with a 1 s heartbeat period and a 4 s heartbeat
/// timeout, marks it inactive, stores it in the shared slot read by
/// `apply_outcome`, and retains a handle in the internals list.
///
/// # Errors
/// Propagates the error from `BondAgent::new`.
///
/// # Panics
/// Panics if the bond slot mutex is poisoned.
#[cfg(feature = "bond")]
pub(crate) fn enable_bond(&self) -> Result<()> {
    let heartbeat_period = Duration::from_secs(1);
    let heartbeat_timeout = Duration::from_secs(4);
    let agent = BondAgent::new(Arc::clone(&self.node), heartbeat_period, heartbeat_timeout).map(Arc::new)?;
    // Start out inactive; `apply_outcome` updates this after each transition.
    agent.set_active(false);
    {
        let mut slot = self.bond.lock().expect("bond mutex poisoned");
        *slot = Some(Arc::clone(&agent));
    }
    self.keep_internal(agent);
    Ok(())
}

/// Without the `bond` feature there is nothing to set up; always succeeds.
#[cfg(not(feature = "bond"))]
pub(crate) fn enable_bond(&self) -> Result<()> {
    Ok(())
}
/// Boxes `handle` and appends it to the shared internals list, so the value
/// stays owned by the node instead of being dropped at the caller's scope end.
///
/// # Panics
/// Panics if the internals mutex is poisoned.
pub(crate) fn keep_internal<T>(&self, handle: T)
where
    T: Send + Sync + 'static,
{
    let mut retained = self.internals.lock().expect("LifecycleNode internals poisoned");
    retained.push(Box::new(handle));
}
/// Commits a finished transition.
///
/// The outcome is ignored unless a transition is in flight and its
/// `transition_id`, `start`, and `transition` all equal those of `outcome`.
/// Otherwise the recorded state becomes `outcome.goal`, the in-flight slot is
/// cleared, and then (with the state lock already released) the gate is
/// activated when the goal is `State::Active` and deactivated for any other
/// goal, the bond agent (if present) is updated the same way, and a transition
/// event is published if a publisher is set. A publish error is discarded.
///
/// # Panics
/// Panics if any of the involved mutexes is poisoned.
fn apply_outcome(
    state: &Arc<Mutex<LifecycleState>>, gate: &Arc<ActivationGate>,
    tev_pub: &Arc<Mutex<Option<Arc<rclrs::Publisher<TransitionEvent>>>>>,
    #[cfg(feature = "bond")] bond: &Arc<Mutex<Option<Arc<BondAgent>>>>, outcome: TransitionOutcome,
) {
    {
        let mut guard = state.lock().expect("state mutex poisoned");
        let belongs_to_in_flight = guard.in_flight.map_or(false, |pending| {
            pending.transition_id == outcome.transition_id
                && pending.start == outcome.start
                && pending.transition == outcome.transition
        });
        if !belongs_to_in_flight {
            return;
        }
        guard.state = outcome.goal;
        guard.in_flight = None;
    }

    if outcome.goal == State::Active {
        gate.activate();
    } else {
        gate.deactivate();
    }

    #[cfg(feature = "bond")]
    if let Some(agent) = bond.lock().expect("bond mutex poisoned").as_ref() {
        agent.set_active(outcome.goal == State::Active);
    }

    if let Some(publisher) = tev_pub.lock().expect("transition_event_pub mutex poisoned").as_ref() {
        let event = utils::make_transition_event(outcome.start, outcome.goal, outcome.transition_id, outcome.label);
        // Best effort: a failed publish must not undo the committed state.
        let _ = publisher.publish(event);
    }
}
fn compute_outcome(in_flight: TransitionInFlight) -> TransitionOutcome {
let result = utils::transition_result_for(in_flight.transition);
let on_error =
if result == CallbackResult::Error { Some(utils::on_error_result_for(in_flight.transition)) } else { None };
let goal = finish_with_error_handling(in_flight.intermediate, in_flight.transition, result, on_error)
.unwrap_or(in_flight.start);
TransitionOutcome {
start: in_flight.start,
goal,
transition: in_flight.transition,
transition_id: in_flight.transition_id,
label: in_flight.label,
}
}
}
/// A transition that the change-state service has accepted but whose outcome
/// has not yet been applied to the recorded state.
#[derive(Debug, Clone, Copy)]
struct TransitionInFlight {
/// Recorded state at the moment the transition was accepted.
start: State,
/// State returned by `begin` for this transition.
intermediate: State,
/// The transition being executed.
transition: Transition,
/// Transition id taken from the change-state request.
transition_id: u8,
/// Label taken from the matching transition spec.
label: &'static str,
}
/// The result of a finished transition, produced by `compute_outcome` and
/// committed by `apply_outcome`.
#[derive(Debug, Clone, Copy)]
struct TransitionOutcome {
/// State the transition started from; checked against the in-flight record.
start: State,
/// State to record once the outcome is applied.
goal: State,
/// The transition that was executed; checked against the in-flight record.
transition: Transition,
/// Transition id of the request; checked against the in-flight record.
transition_id: u8,
/// Label passed on to the published transition event.
label: &'static str,
}
/// Mutable lifecycle bookkeeping shared behind a mutex.
#[derive(Debug)]
struct LifecycleState {
    /// The currently recorded lifecycle state.
    state: State,
    /// The accepted-but-unfinished transition, if there is one.
    in_flight: Option<TransitionInFlight>,
}

impl LifecycleState {
    /// Starts in `State::Unconfigured` with no transition in flight.
    fn new() -> Self {
        let state = State::Unconfigured;
        Self { state, in_flight: None }
    }
}