embedded_rpc/lib.rs
1#![no_std]
2#![doc = include_str!("../README.md")]
3
4use core::cell::RefCell;
5use core::future::poll_fn;
6use core::task::Waker;
7
8use embassy_sync::blocking_mutex::Mutex;
9use embassy_sync::blocking_mutex::raw::RawMutex;
10
11/// Error returned to the client when the server drops [`ServedRequest`] without calling
12/// [`ServedRequest::respond`].
13///
14/// This is not a transport error: it means the server-side handler gave up without sending a
15/// successful response (for example by returning early or unwinding after `serve`).
16#[derive(Debug, Clone, Copy, PartialEq, Eq)]
17pub struct RequestDroppedError;
18
19/// In-memory request/response synchronization for async tasks.
20///
21/// This type is **not** a wire protocol. It connects a client async task that calls
22/// [`RpcService::request`] with a server task that calls [`RpcService::serve`], using a
23/// [`embassy_sync::blocking_mutex::Mutex`] and async wakers. It is suitable for `no_std` use when
24/// paired with a mutex implementation appropriate for your platform ([`RawMutex`]).
25///
26/// # Concurrency
27///
28/// - **Single in-flight RPC:** Internal state holds at most one queued request and one response
29/// slot. Design for one active request/response pair at a time.
30/// - **Multiple clients:** Several tasks may call [`request`](Self::request); additional callers
31/// wait until the current RPC completes (including delivery of [`RequestDroppedError`]).
32/// - **Client cancellation:** If the async task awaiting [`Self::request`] is dropped (executor
33/// cancellation), the service releases the client slot once the in-flight RPC is fully finished
34/// (including after the server responds or drops [`ServedRequest`]). Dropping the client future
35/// after the request was queued but before the server takes it removes the queued request and
36/// frees the slot immediately.
37///
38/// # Type parameters
39///
40/// - **`M`:** Mutex raw type ([`RawMutex`]), e.g. `CriticalSectionRawMutex` on many MCUs.
41/// - **`Req` / `Resp`:** Your message types. `Req` may borrow from the client for the duration of
42/// the call; then the [`RpcService`] must not outlive those borrows (often modeled with a
43/// stack-scoped service—see the crate README).
44pub struct RpcService<M, Req, Resp>
45where
46 M: RawMutex,
47{
48 state: Mutex<M, RefCell<State<Req, Resp>>>,
49}
50
51struct State<Req, Resp> {
52 client_busy: bool,
53 /// Client [`RpcService::request`] future was dropped while waiting; server must finish without
54 /// delivering a response to that client.
55 client_abandoned: bool,
56 waiting_client_slot_waker: Option<Waker>,
57 waiting_client_response_waker: Option<Waker>,
58 waiting_server_waker: Option<Waker>,
59 queued_request: Option<Req>,
60 queued_response: Option<Result<Resp, RequestDroppedError>>,
61}
62
63impl<Req, Resp> State<Req, Resp> {
64 const fn new() -> Self {
65 Self {
66 client_busy: false,
67 client_abandoned: false,
68 waiting_client_slot_waker: None,
69 waiting_client_response_waker: None,
70 waiting_server_waker: None,
71 queued_request: None,
72 queued_response: None,
73 }
74 }
75}
76
77/// When dropped without [`Self::defuse`], cleans up after a cancelled [`RpcService::request`].
78struct InFlightGuard<'a, M, Req, Resp>
79where
80 M: RawMutex,
81{
82 service: &'a RpcService<M, Req, Resp>,
83 disarm: bool,
84}
85
86impl<'a, M, Req, Resp> InFlightGuard<'a, M, Req, Resp>
87where
88 M: RawMutex,
89{
90 fn new(service: &'a RpcService<M, Req, Resp>) -> Self {
91 Self {
92 service,
93 disarm: false,
94 }
95 }
96
97 fn defuse(mut self) {
98 self.disarm = true;
99 core::mem::forget(self);
100 }
101}
102
103impl<'a, M, Req, Resp> Drop for InFlightGuard<'a, M, Req, Resp>
104where
105 M: RawMutex,
106{
107 fn drop(&mut self) {
108 if self.disarm {
109 return;
110 }
111
112 self.service.state.lock(|state| {
113 let mut s = state.borrow_mut();
114 if let Some(req) = s.queued_request.take() {
115 drop(req);
116 s.client_abandoned = false;
117 s.client_busy = false;
118 if let Some(w) = s.waiting_client_slot_waker.take() {
119 w.wake();
120 }
121 return;
122 }
123 if let Some(resp) = s.queued_response.take() {
124 drop(resp);
125 s.client_abandoned = false;
126 s.client_busy = false;
127 if let Some(w) = s.waiting_client_slot_waker.take() {
128 w.wake();
129 }
130 return;
131 }
132 s.client_abandoned = true;
133 });
134 }
135}
136
137impl<M, Req, Resp> RpcService<M, Req, Resp>
138where
139 M: RawMutex,
140{
141 /// Creates an empty service. Safe to call in `const` contexts (e.g. `static` initializer).
142 pub const fn new() -> Self {
143 Self {
144 state: Mutex::new(RefCell::new(State::new())),
145 }
146 }
147
148 /// Sends `req` and waits until the server responds or drops the [`ServedRequest`].
149 ///
150 /// If another client is already in an RPC, this call waits until that RPC fully completes
151 /// (including waking the next waiter for the client slot) before sending `req`.
152 ///
153 /// # Errors
154 ///
155 /// Returns [`Err(RequestDroppedError)`](RequestDroppedError) if the server drops
156 /// [`ServedRequest`] without calling [`ServedRequest::respond`].
157 ///
158 /// # Cancellation
159 ///
160 /// If this future is dropped while waiting for a response, the queued request is removed if
161 /// the server has not taken it yet; otherwise the server continues with [`ServedRequest`] and
162 /// the response is discarded when the server completes. The client slot becomes available again
163 /// after that completion (or immediately when the queued request is dropped).
164 pub async fn request(&self, req: Req) -> Result<Resp, RequestDroppedError> {
165 self.acquire_client_slot().await;
166
167 self.state.lock(|state| {
168 let mut state = state.borrow_mut();
169 state.queued_request = Some(req);
170 if let Some(waker) = state.waiting_server_waker.take() {
171 waker.wake();
172 }
173 });
174
175 let in_flight = InFlightGuard::new(self);
176
177 let result = poll_fn(|cx| {
178 self.state.lock(|state| {
179 let mut state = state.borrow_mut();
180 if let Some(resp) = state.queued_response.take() {
181 state.client_busy = false;
182 if let Some(waker) = state.waiting_client_slot_waker.take() {
183 waker.wake();
184 }
185 return core::task::Poll::Ready(resp);
186 }
187
188 state.waiting_client_response_waker = Some(cx.waker().clone());
189 core::task::Poll::Pending
190 })
191 })
192 .await;
193
194 in_flight.defuse();
195 result
196 }
197
198 /// Waits until a client submits a request via [`Self::request`], then returns the request
199 /// value together with a [`ServedRequest`] used to complete or abandon the RPC.
200 ///
201 /// The server must eventually call [`ServedRequest::respond`] on the handle or drop it;
202 /// dropping notifies the client with [`RequestDroppedError`].
203 pub async fn serve(&self) -> (Req, ServedRequest<'_, M, Req, Resp>) {
204 let req = poll_fn(|cx| {
205 self.state.lock(|state| {
206 let mut state = state.borrow_mut();
207 if let Some(req) = state.queued_request.take() {
208 return core::task::Poll::Ready(req);
209 }
210
211 state.waiting_server_waker = Some(cx.waker().clone());
212 core::task::Poll::Pending
213 })
214 })
215 .await;
216
217 let served = ServedRequest {
218 state: &self.state,
219 completed: false,
220 };
221 (req, served)
222 }
223
224 async fn acquire_client_slot(&self) {
225 poll_fn(|cx| {
226 self.state.lock(|state| {
227 let mut state = state.borrow_mut();
228 if !state.client_busy {
229 state.client_busy = true;
230 return core::task::Poll::Ready(());
231 }
232
233 state.waiting_client_slot_waker = Some(cx.waker().clone());
234 core::task::Poll::Pending
235 })
236 })
237 .await;
238 }
239}
240
241impl<M, Req, Resp> Default for RpcService<M, Req, Resp>
242where
243 M: RawMutex,
244{
245 fn default() -> Self {
246 Self::new()
247 }
248}
249
250/// Server-side completion handle for one request taken from [`RpcService::serve`].
251///
252/// The request value itself is returned separately from [`RpcService::serve`]; this type only
253/// carries [`Self::respond`] and drop behavior.
254///
255/// # Completion
256///
257/// - Call [`Self::respond`] with a successful `Resp` to complete the RPC.
258/// - Drop this value without calling [`Self::respond`] to complete the RPC with
259/// [`RequestDroppedError`] on the client.
260pub struct ServedRequest<'a, M, Req, Resp>
261where
262 M: RawMutex,
263{
264 state: &'a Mutex<M, RefCell<State<Req, Resp>>>,
265 completed: bool,
266}
267
268impl<'a, M, Req, Resp> ServedRequest<'a, M, Req, Resp>
269where
270 M: RawMutex,
271{
272 /// Completes the RPC with `resp` and consumes `self`.
273 ///
274 /// The waiting client receives `Ok(resp)`.
275 pub fn respond(mut self, resp: Resp) {
276 self.state.lock(|state| {
277 let mut state = state.borrow_mut();
278 if state.client_abandoned {
279 state.client_abandoned = false;
280 state.client_busy = false;
281 if let Some(waker) = state.waiting_client_slot_waker.take() {
282 waker.wake();
283 }
284 } else {
285 state.queued_response = Some(Ok(resp));
286 if let Some(waker) = state.waiting_client_response_waker.take() {
287 waker.wake();
288 }
289 }
290 });
291 self.completed = true;
292 }
293}
294
295impl<'a, M, Req, Resp> Drop for ServedRequest<'a, M, Req, Resp>
296where
297 M: RawMutex,
298{
299 fn drop(&mut self) {
300 if !self.completed {
301 self.state.lock(|state| {
302 let mut state = state.borrow_mut();
303 if state.client_abandoned {
304 state.client_abandoned = false;
305 state.client_busy = false;
306 if let Some(waker) = state.waiting_client_slot_waker.take() {
307 waker.wake();
308 }
309 } else {
310 state.queued_response = Some(Err(RequestDroppedError));
311 if let Some(waker) = state.waiting_client_response_waker.take() {
312 waker.wake();
313 }
314 }
315 });
316 }
317 }
318}