action_core/client_server.rs
1use std::collections::VecDeque;
2
3use core_types::{ActionGoalId, RtError, Timestamp};
4
5use crate::message::{
6 ActionFeedback, ActionResult, ActionSchema, ActionSessionHealth, GoalAck, GoalStatus,
7};
8
9mod client;
10mod server;
11
12pub use self::client::BasicActionClient;
13pub use self::server::BasicActionServer;
14
15// ─── Traits ───────────────────────────────────────────────────────────────────
16
17/// Client-side handle: sends goals, polls feedback, and retrieves the final result.
18///
19/// # Comparison with Service and Mission
20/// | Feature | Service | Action | Mission |
21/// |----------------|---------------|-------------------------------|---------------------|
22/// | Direction | req → res | goal → feedback* → result | long-lived duplex |
23/// | Streaming | ✗ | ✓ (best-effort) | ✓ (reliable) |
24/// | Cancel | ✗ | ✓ | ✓ (Pause/Cancel) |
25/// | Reconnect | ✗ | ✗ | ✓ (Reconcile) |
26/// | State-machine | none | simple (6 states) | complex (8 states) |
27pub trait ActionClient<G, F, R>
28where
29 G: Send + 'static,
30 F: Send + 'static,
31 R: Send + 'static,
32{
33 fn action_name(&self) -> &str;
34 fn schema(&self) -> &ActionSchema;
35
36 /// Send a goal and immediately receive the server's accept/reject acknowledgement.
37 ///
38 /// The returned `ActionGoalId` is used for subsequent `poll_feedback`,
39 /// `poll_result`, and `cancel` calls. If the server rejects the goal
40 /// (`GoalAck::accepted == false`), the ID remains valid but no feedback
41 /// or result will ever be produced for it.
42 fn send_goal(&mut self, goal: G) -> Result<(ActionGoalId, GoalAck), RtError>;
43
44 /// Non-blocking poll: dequeue the next feedback item for the given goal, or `None` if empty.
45 fn poll_feedback(&mut self, goal_id: ActionGoalId) -> Option<ActionFeedback<F>>;
46
47 /// Non-blocking poll: take the final result for the given goal, or `None` if not ready.
48 ///
49 /// Once `Some` is returned the goal's lifecycle is over; the ID need not be tracked further.
50 fn poll_result(&mut self, goal_id: ActionGoalId) -> Option<ActionResult<R>>;
51
52 /// Request cancellation of a goal.
53 ///
54 /// Cancellation is a *request*, not a command; the server may refuse (e.g. the goal
55 /// has entered a critical phase). On success, `poll_result` will eventually return
56 /// `GoalStatus::Canceled`.
57 fn cancel(&mut self, goal_id: ActionGoalId) -> Result<(), RtError>;
58
59 /// Query the current status of a goal without consuming any feedback or result.
60 fn goal_status(&self, goal_id: ActionGoalId) -> Option<GoalStatus>;
61
62 /// Evaluate heartbeat timeouts for executing goals.
63 ///
64 /// Returns the number of goals that were transitioned into timeout failure.
65 fn tick_timeouts(&mut self, _now: Timestamp) -> usize {
66 0
67 }
68
69 /// Return health state of a specific goal.
70 fn goal_health(&self, _goal_id: ActionGoalId) -> Option<ActionSessionHealth> {
71 None
72 }
73
74 /// Return health states for all known goals.
75 fn active_goal_health(&self) -> Vec<ActionSessionHealth> {
76 Vec::new()
77 }
78
79 fn close(&mut self) -> Result<(), RtError>;
80}
81
82/// Server-side handle: receives goals, publishes feedback, and sends the final result.
83pub trait ActionServer<G, F, R>
84where
85 G: Send + 'static,
86 F: Send + 'static,
87 R: Send + 'static,
88{
89 fn action_name(&self) -> &str;
90 fn schema(&self) -> &ActionSchema;
91
92 /// Receive the next pending goal (FIFO).
93 ///
94 /// After returning `Some`, the server **must** call either `accept_goal` or
95 /// `reject_goal`; otherwise the client will wait indefinitely for a `GoalAck`.
96 fn recv_goal(&mut self) -> Result<Option<(ActionGoalId, G)>, RtError>;
97
98 /// Accept a goal; sends `GoalAck { accepted: true }` to the client.
99 fn accept_goal(&mut self, goal_id: ActionGoalId) -> Result<(), RtError>;
100
101 /// Reject a goal; sends `GoalAck { accepted: false, reason }` to the client.
102 fn reject_goal(
103 &mut self,
104 goal_id: ActionGoalId,
105 reason: impl Into<String>,
106 ) -> Result<(), RtError>;
107
108 /// Push a feedback update to the client (may be called multiple times).
109 fn publish_feedback(&mut self, goal_id: ActionGoalId, feedback: F) -> Result<(), RtError>;
110
111 /// Publish a keepalive heartbeat for a running goal.
112 fn heartbeat(&mut self, goal_id: ActionGoalId) -> Result<(), RtError>;
113
114 /// Mark the goal as succeeded and deliver the final result.
115 fn succeed(&mut self, goal_id: ActionGoalId, result: R) -> Result<(), RtError>;
116
117 /// Mark the goal as failed and deliver the final result with an error reason.
118 fn fail(&mut self, goal_id: ActionGoalId, reason: impl Into<String>) -> Result<(), RtError>;
119
120 /// Poll for a pending cancel request from the client.
121 fn poll_cancel_request(&mut self) -> Option<ActionGoalId>;
122
123 /// Confirm that cancellation is complete; delivers a `GoalStatus::Canceled` result.
124 fn confirm_cancel(&mut self, goal_id: ActionGoalId) -> Result<(), RtError>;
125
126 fn close(&mut self) -> Result<(), RtError>;
127}
128
129// ─── Shared in-process channel ────────────────────────────────────────────────
130
131/// In-memory channel shared between a client and a server (same-process semantics).
132///
133/// Analogous to `middleware_core::ServiceChannel`: both sides hold an `Arc` to
134/// the same instance; individual queues are protected by `Mutex`.
135pub struct ActionChannel<G, F, R> {
136 /// Client → Server: pending goal queue.
137 pub goals: std::sync::Mutex<VecDeque<(ActionGoalId, G)>>,
138 /// Server → Client: per-goal acknowledgement.
139 pub acks: std::sync::Mutex<std::collections::HashMap<u64, GoalAck>>,
140 /// Server → Client: feedback queues keyed by goal id.
141 pub feedbacks: std::sync::Mutex<std::collections::HashMap<u64, VecDeque<ActionFeedback<F>>>>,
142 /// Server → Client: per-goal final result.
143 pub results: std::sync::Mutex<std::collections::HashMap<u64, ActionResult<R>>>,
144 /// Client → Server: cancel request queue.
145 pub cancels: std::sync::Mutex<VecDeque<ActionGoalId>>,
146 /// Current status per goal; readable by both client and server.
147 pub statuses: std::sync::Mutex<std::collections::HashMap<u64, GoalStatus>>,
148 /// Last heartbeat timestamp per active goal.
149 pub heartbeats: std::sync::Mutex<std::collections::HashMap<u64, Timestamp>>,
150 /// Last feedback timestamp per goal.
151 pub feedback_timestamps: std::sync::Mutex<std::collections::HashMap<u64, Timestamp>>,
152 /// Last terminal result timestamp per goal.
153 pub result_timestamps: std::sync::Mutex<std::collections::HashMap<u64, Timestamp>>,
154}
155
156impl<G, F, R> ActionChannel<G, F, R> {
157 pub fn new() -> std::sync::Arc<Self> {
158 std::sync::Arc::new(Self {
159 goals: Default::default(),
160 acks: Default::default(),
161 feedbacks: Default::default(),
162 results: Default::default(),
163 cancels: Default::default(),
164 statuses: Default::default(),
165 heartbeats: Default::default(),
166 feedback_timestamps: Default::default(),
167 result_timestamps: Default::default(),
168 })
169 }
170}