Skip to main content

embedded_rpc/
lib.rs

1#![no_std]
2#![doc = include_str!("../README.md")]
3
4use core::cell::RefCell;
5use core::future::poll_fn;
6use core::task::Waker;
7
8use embassy_sync::blocking_mutex::Mutex;
9use embassy_sync::blocking_mutex::raw::RawMutex;
10
/// Error handed to the client when the server drops its [`ServedRequest`] without ever calling
/// [`ServedRequest::respond`].
///
/// This does not indicate a transport failure: it means the server-side handler gave up on the
/// request (for example by returning early after `serve`) instead of producing a response.
///
/// The type implements [`core::fmt::Display`] so it can be logged or embedded in other error
/// messages without relying on the `Debug` representation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RequestDroppedError;

impl core::fmt::Display for RequestDroppedError {
    /// Writes a fixed, human-readable description of the error.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.write_str("request dropped by the server without a response")
    }
}
18
19/// In-memory request/response synchronization for async tasks.
20///
21/// This type is **not** a wire protocol. It connects a client async task that calls
22/// [`RpcService::request`] with a server task that calls [`RpcService::serve`], using a
23/// [`embassy_sync::blocking_mutex::Mutex`] and async wakers. It is suitable for `no_std` use when
24/// paired with a mutex implementation appropriate for your platform ([`RawMutex`]).
25///
26/// # Concurrency
27///
28/// - **Single in-flight RPC:** Internal state holds at most one queued request and one response
29///   slot. Design for one active request/response pair at a time.
30/// - **Multiple clients:** Several tasks may call [`request`](Self::request); additional callers
31///   wait until the current RPC completes (including delivery of [`RequestDroppedError`]).
32/// - **Client cancellation:** If the async task awaiting [`Self::request`] is dropped (executor
33///   cancellation), the service releases the client slot once the in-flight RPC is fully finished
34///   (including after the server responds or drops [`ServedRequest`]). Dropping the client future
35///   after the request was queued but before the server takes it removes the queued request and
36///   frees the slot immediately.
37///
38/// # Type parameters
39///
40/// - **`M`:** Mutex raw type ([`RawMutex`]), e.g. `CriticalSectionRawMutex` on many MCUs.
41/// - **`Req` / `Resp`:** Your message types. `Req` may borrow from the client for the duration of
42///   the call; then the [`RpcService`] must not outlive those borrows (often modeled with a
43///   stack-scoped service—see the crate README).
44pub struct RpcService<M, Req, Resp>
45where
46    M: RawMutex,
47{
48    state: Mutex<M, RefCell<State<Req, Resp>>>,
49}
50
51struct State<Req, Resp> {
52    client_busy: bool,
53    /// Client [`RpcService::request`] future was dropped while waiting; server must finish without
54    /// delivering a response to that client.
55    client_abandoned: bool,
56    waiting_client_slot_waker: Option<Waker>,
57    waiting_client_response_waker: Option<Waker>,
58    waiting_server_waker: Option<Waker>,
59    queued_request: Option<Req>,
60    queued_response: Option<Result<Resp, RequestDroppedError>>,
61}
62
63impl<Req, Resp> State<Req, Resp> {
64    const fn new() -> Self {
65        Self {
66            client_busy: false,
67            client_abandoned: false,
68            waiting_client_slot_waker: None,
69            waiting_client_response_waker: None,
70            waiting_server_waker: None,
71            queued_request: None,
72            queued_response: None,
73        }
74    }
75}
76
77/// When dropped without [`Self::defuse`], cleans up after a cancelled [`RpcService::request`].
78struct InFlightGuard<'a, M, Req, Resp>
79where
80    M: RawMutex,
81{
82    service: &'a RpcService<M, Req, Resp>,
83    disarm: bool,
84}
85
86impl<'a, M, Req, Resp> InFlightGuard<'a, M, Req, Resp>
87where
88    M: RawMutex,
89{
90    fn new(service: &'a RpcService<M, Req, Resp>) -> Self {
91        Self {
92            service,
93            disarm: false,
94        }
95    }
96
97    fn defuse(mut self) {
98        self.disarm = true;
99        core::mem::forget(self);
100    }
101}
102
103impl<'a, M, Req, Resp> Drop for InFlightGuard<'a, M, Req, Resp>
104where
105    M: RawMutex,
106{
107    fn drop(&mut self) {
108        if self.disarm {
109            return;
110        }
111
112        self.service.state.lock(|state| {
113            let mut s = state.borrow_mut();
114            if let Some(req) = s.queued_request.take() {
115                drop(req);
116                s.client_abandoned = false;
117                s.client_busy = false;
118                if let Some(w) = s.waiting_client_slot_waker.take() {
119                    w.wake();
120                }
121                return;
122            }
123            if let Some(resp) = s.queued_response.take() {
124                drop(resp);
125                s.client_abandoned = false;
126                s.client_busy = false;
127                if let Some(w) = s.waiting_client_slot_waker.take() {
128                    w.wake();
129                }
130                return;
131            }
132            s.client_abandoned = true;
133        });
134    }
135}
136
137impl<M, Req, Resp> RpcService<M, Req, Resp>
138where
139    M: RawMutex,
140{
141    /// Creates an empty service. Safe to call in `const` contexts (e.g. `static` initializer).
142    pub const fn new() -> Self {
143        Self {
144            state: Mutex::new(RefCell::new(State::new())),
145        }
146    }
147
148    /// Sends `req` and waits until the server responds or drops the [`ServedRequest`].
149    ///
150    /// If another client is already in an RPC, this call waits until that RPC fully completes
151    /// (including waking the next waiter for the client slot) before sending `req`.
152    ///
153    /// # Errors
154    ///
155    /// Returns [`Err(RequestDroppedError)`](RequestDroppedError) if the server drops
156    /// [`ServedRequest`] without calling [`ServedRequest::respond`].
157    ///
158    /// # Cancellation
159    ///
160    /// If this future is dropped while waiting for a response, the queued request is removed if
161    /// the server has not taken it yet; otherwise the server continues with [`ServedRequest`] and
162    /// the response is discarded when the server completes. The client slot becomes available again
163    /// after that completion (or immediately when the queued request is dropped).
164    pub async fn request(&self, req: Req) -> Result<Resp, RequestDroppedError> {
165        self.acquire_client_slot().await;
166
167        self.state.lock(|state| {
168            let mut state = state.borrow_mut();
169            state.queued_request = Some(req);
170            if let Some(waker) = state.waiting_server_waker.take() {
171                waker.wake();
172            }
173        });
174
175        let in_flight = InFlightGuard::new(self);
176
177        let result = poll_fn(|cx| {
178            self.state.lock(|state| {
179                let mut state = state.borrow_mut();
180                if let Some(resp) = state.queued_response.take() {
181                    state.client_busy = false;
182                    if let Some(waker) = state.waiting_client_slot_waker.take() {
183                        waker.wake();
184                    }
185                    return core::task::Poll::Ready(resp);
186                }
187
188                state.waiting_client_response_waker = Some(cx.waker().clone());
189                core::task::Poll::Pending
190            })
191        })
192        .await;
193
194        in_flight.defuse();
195        result
196    }
197
198    /// Waits until a client submits a request via [`Self::request`], then returns the request
199    /// value together with a [`ServedRequest`] used to complete or abandon the RPC.
200    ///
201    /// The server must eventually call [`ServedRequest::respond`] on the handle or drop it;
202    /// dropping notifies the client with [`RequestDroppedError`].
203    pub async fn serve(&self) -> (Req, ServedRequest<'_, M, Req, Resp>) {
204        let req = poll_fn(|cx| {
205            self.state.lock(|state| {
206                let mut state = state.borrow_mut();
207                if let Some(req) = state.queued_request.take() {
208                    return core::task::Poll::Ready(req);
209                }
210
211                state.waiting_server_waker = Some(cx.waker().clone());
212                core::task::Poll::Pending
213            })
214        })
215        .await;
216
217        let served = ServedRequest {
218            state: &self.state,
219            completed: false,
220        };
221        (req, served)
222    }
223
224    async fn acquire_client_slot(&self) {
225        poll_fn(|cx| {
226            self.state.lock(|state| {
227                let mut state = state.borrow_mut();
228                if !state.client_busy {
229                    state.client_busy = true;
230                    return core::task::Poll::Ready(());
231                }
232
233                state.waiting_client_slot_waker = Some(cx.waker().clone());
234                core::task::Poll::Pending
235            })
236        })
237        .await;
238    }
239}
240
241impl<M, Req, Resp> Default for RpcService<M, Req, Resp>
242where
243    M: RawMutex,
244{
245    fn default() -> Self {
246        Self::new()
247    }
248}
249
250/// Server-side completion handle for one request taken from [`RpcService::serve`].
251///
252/// The request value itself is returned separately from [`RpcService::serve`]; this type only
253/// carries [`Self::respond`] and drop behavior.
254///
255/// # Completion
256///
257/// - Call [`Self::respond`] with a successful `Resp` to complete the RPC.
258/// - Drop this value without calling [`Self::respond`] to complete the RPC with
259///   [`RequestDroppedError`] on the client.
260pub struct ServedRequest<'a, M, Req, Resp>
261where
262    M: RawMutex,
263{
264    state: &'a Mutex<M, RefCell<State<Req, Resp>>>,
265    completed: bool,
266}
267
268impl<'a, M, Req, Resp> ServedRequest<'a, M, Req, Resp>
269where
270    M: RawMutex,
271{
272    /// Completes the RPC with `resp` and consumes `self`.
273    ///
274    /// The waiting client receives `Ok(resp)`.
275    pub fn respond(mut self, resp: Resp) {
276        self.state.lock(|state| {
277            let mut state = state.borrow_mut();
278            if state.client_abandoned {
279                state.client_abandoned = false;
280                state.client_busy = false;
281                if let Some(waker) = state.waiting_client_slot_waker.take() {
282                    waker.wake();
283                }
284            } else {
285                state.queued_response = Some(Ok(resp));
286                if let Some(waker) = state.waiting_client_response_waker.take() {
287                    waker.wake();
288                }
289            }
290        });
291        self.completed = true;
292    }
293}
294
295impl<'a, M, Req, Resp> Drop for ServedRequest<'a, M, Req, Resp>
296where
297    M: RawMutex,
298{
299    fn drop(&mut self) {
300        if !self.completed {
301            self.state.lock(|state| {
302                let mut state = state.borrow_mut();
303                if state.client_abandoned {
304                    state.client_abandoned = false;
305                    state.client_busy = false;
306                    if let Some(waker) = state.waiting_client_slot_waker.take() {
307                        waker.wake();
308                    }
309                } else {
310                    state.queued_response = Some(Err(RequestDroppedError));
311                    if let Some(waker) = state.waiting_client_response_waker.take() {
312                        waker.wake();
313                    }
314                }
315            });
316        }
317    }
318}