Skip to main content

action_core/
client_server.rs

1use std::collections::VecDeque;
2
3use core_types::{ActionGoalId, RtError, Timestamp};
4
5use crate::message::{
6    ActionFeedback, ActionResult, ActionSchema, ActionSessionHealth, GoalAck, GoalStatus,
7};
8
9mod client;
10mod server;
11
12pub use self::client::BasicActionClient;
13pub use self::server::BasicActionServer;
14
15// ─── Traits ───────────────────────────────────────────────────────────────────
16
17/// Client-side handle: sends goals, polls feedback, and retrieves the final result.
18///
19/// # Comparison with Service and Mission
20/// | Feature        | Service       | Action                        | Mission             |
21/// |----------------|---------------|-------------------------------|---------------------|
22/// | Direction      | req → res     | goal → feedback* → result     | long-lived duplex   |
23/// | Streaming      | ✗             | ✓ (best-effort)               | ✓ (reliable)        |
24/// | Cancel         | ✗             | ✓                             | ✓ (Pause/Cancel)    |
25/// | Reconnect      | ✗             | ✗                             | ✓ (Reconcile)       |
26/// | State-machine  | none          | simple (6 states)             | complex (8 states)  |
27pub trait ActionClient<G, F, R>
28where
29    G: Send + 'static,
30    F: Send + 'static,
31    R: Send + 'static,
32{
33    fn action_name(&self) -> &str;
34    fn schema(&self) -> &ActionSchema;
35
36    /// Send a goal and immediately receive the server's accept/reject acknowledgement.
37    ///
38    /// The returned `ActionGoalId` is used for subsequent `poll_feedback`,
39    /// `poll_result`, and `cancel` calls. If the server rejects the goal
40    /// (`GoalAck::accepted == false`), the ID remains valid but no feedback
41    /// or result will ever be produced for it.
42    fn send_goal(&mut self, goal: G) -> Result<(ActionGoalId, GoalAck), RtError>;
43
44    /// Non-blocking poll: dequeue the next feedback item for the given goal, or `None` if empty.
45    fn poll_feedback(&mut self, goal_id: ActionGoalId) -> Option<ActionFeedback<F>>;
46
47    /// Non-blocking poll: take the final result for the given goal, or `None` if not ready.
48    ///
49    /// Once `Some` is returned the goal's lifecycle is over; the ID need not be tracked further.
50    fn poll_result(&mut self, goal_id: ActionGoalId) -> Option<ActionResult<R>>;
51
52    /// Request cancellation of a goal.
53    ///
54    /// Cancellation is a *request*, not a command; the server may refuse (e.g. the goal
55    /// has entered a critical phase). On success, `poll_result` will eventually return
56    /// `GoalStatus::Canceled`.
57    fn cancel(&mut self, goal_id: ActionGoalId) -> Result<(), RtError>;
58
59    /// Query the current status of a goal without consuming any feedback or result.
60    fn goal_status(&self, goal_id: ActionGoalId) -> Option<GoalStatus>;
61
62    /// Evaluate heartbeat timeouts for executing goals.
63    ///
64    /// Returns the number of goals that were transitioned into timeout failure.
65    fn tick_timeouts(&mut self, _now: Timestamp) -> usize {
66        0
67    }
68
69    /// Return health state of a specific goal.
70    fn goal_health(&self, _goal_id: ActionGoalId) -> Option<ActionSessionHealth> {
71        None
72    }
73
74    /// Return health states for all known goals.
75    fn active_goal_health(&self) -> Vec<ActionSessionHealth> {
76        Vec::new()
77    }
78
79    fn close(&mut self) -> Result<(), RtError>;
80}
81
82/// Server-side handle: receives goals, publishes feedback, and sends the final result.
83pub trait ActionServer<G, F, R>
84where
85    G: Send + 'static,
86    F: Send + 'static,
87    R: Send + 'static,
88{
89    fn action_name(&self) -> &str;
90    fn schema(&self) -> &ActionSchema;
91
92    /// Receive the next pending goal (FIFO).
93    ///
94    /// After returning `Some`, the server **must** call either `accept_goal` or
95    /// `reject_goal`; otherwise the client will wait indefinitely for a `GoalAck`.
96    fn recv_goal(&mut self) -> Result<Option<(ActionGoalId, G)>, RtError>;
97
98    /// Accept a goal; sends `GoalAck { accepted: true }` to the client.
99    fn accept_goal(&mut self, goal_id: ActionGoalId) -> Result<(), RtError>;
100
101    /// Reject a goal; sends `GoalAck { accepted: false, reason }` to the client.
102    fn reject_goal(
103        &mut self,
104        goal_id: ActionGoalId,
105        reason: impl Into<String>,
106    ) -> Result<(), RtError>;
107
108    /// Push a feedback update to the client (may be called multiple times).
109    fn publish_feedback(&mut self, goal_id: ActionGoalId, feedback: F) -> Result<(), RtError>;
110
111    /// Publish a keepalive heartbeat for a running goal.
112    fn heartbeat(&mut self, goal_id: ActionGoalId) -> Result<(), RtError>;
113
114    /// Mark the goal as succeeded and deliver the final result.
115    fn succeed(&mut self, goal_id: ActionGoalId, result: R) -> Result<(), RtError>;
116
117    /// Mark the goal as failed and deliver the final result with an error reason.
118    fn fail(&mut self, goal_id: ActionGoalId, reason: impl Into<String>) -> Result<(), RtError>;
119
120    /// Poll for a pending cancel request from the client.
121    fn poll_cancel_request(&mut self) -> Option<ActionGoalId>;
122
123    /// Confirm that cancellation is complete; delivers a `GoalStatus::Canceled` result.
124    fn confirm_cancel(&mut self, goal_id: ActionGoalId) -> Result<(), RtError>;
125
126    fn close(&mut self) -> Result<(), RtError>;
127}
128
129// ─── Shared in-process channel ────────────────────────────────────────────────
130
131/// In-memory channel shared between a client and a server (same-process semantics).
132///
133/// Analogous to `middleware_core::ServiceChannel`: both sides hold an `Arc` to
134/// the same instance; individual queues are protected by `Mutex`.
135pub struct ActionChannel<G, F, R> {
136    /// Client → Server: pending goal queue.
137    pub goals: std::sync::Mutex<VecDeque<(ActionGoalId, G)>>,
138    /// Server → Client: per-goal acknowledgement.
139    pub acks: std::sync::Mutex<std::collections::HashMap<u64, GoalAck>>,
140    /// Server → Client: feedback queues keyed by goal id.
141    pub feedbacks: std::sync::Mutex<std::collections::HashMap<u64, VecDeque<ActionFeedback<F>>>>,
142    /// Server → Client: per-goal final result.
143    pub results: std::sync::Mutex<std::collections::HashMap<u64, ActionResult<R>>>,
144    /// Client → Server: cancel request queue.
145    pub cancels: std::sync::Mutex<VecDeque<ActionGoalId>>,
146    /// Current status per goal; readable by both client and server.
147    pub statuses: std::sync::Mutex<std::collections::HashMap<u64, GoalStatus>>,
148    /// Last heartbeat timestamp per active goal.
149    pub heartbeats: std::sync::Mutex<std::collections::HashMap<u64, Timestamp>>,
150    /// Last feedback timestamp per goal.
151    pub feedback_timestamps: std::sync::Mutex<std::collections::HashMap<u64, Timestamp>>,
152    /// Last terminal result timestamp per goal.
153    pub result_timestamps: std::sync::Mutex<std::collections::HashMap<u64, Timestamp>>,
154}
155
156impl<G, F, R> ActionChannel<G, F, R> {
157    pub fn new() -> std::sync::Arc<Self> {
158        std::sync::Arc::new(Self {
159            goals: Default::default(),
160            acks: Default::default(),
161            feedbacks: Default::default(),
162            results: Default::default(),
163            cancels: Default::default(),
164            statuses: Default::default(),
165            heartbeats: Default::default(),
166            feedback_timestamps: Default::default(),
167            result_timestamps: Default::default(),
168        })
169    }
170}