interchange/lib.rs
1#![cfg_attr(not(test), no_std)]
2//! Implement a somewhat convenient and somewhat efficient way to perform RPC
3//! in an embedded context.
4//!
5//! The approach is inspired by Go's channels, with the restriction that
6//! there is a clear separation into a requester and a responder.
7//!
8//! Requests may be canceled, which the responder should honour on a
9//! best-effort basis.
10//!
11//! ### Example use cases
12//! - USB device interrupt handler performs low-level protocol details, hands off
13//! commands from the host to higher-level logic running in the idle thread.
14//! This higher-level logic need only understand clearly typed commands, as
//! modeled by variants of a given `Request` enum.
16//! - `trussed` crypto service, responding to crypto request from apps across
17//! TrustZone for Cortex-M secure/non-secure boundaries.
18//! - Request to blink a few lights and reply on button press
19//!
20//!
21//! ### Approach
22//! It is assumed that all requests fit in a single `Request` enum, and that
//! all responses fit in a single `Response` enum. The [`Channel`] and [`Interchange`] structs
//! allocate a single buffer in which either a `Request` or a `Response` fits, and handle
//! the synchronization.
24//! Both structures have `const` constructors, allowing them to be statically allocated.
25//!
26//! An alternative approach would be to use two heapless Queues of length one
27//! each for response and requests. The advantage of our construction is to
28//! have only one static memory region in use.
29//!
30//! ```
31//! # #![cfg(not(loom))]
32//! # use interchange::{State, Interchange};
33//! #[derive(Clone, Debug, PartialEq)]
34//! pub enum Request {
35//! This(u8, u32),
36//! That(i64),
37//! }
38//!
39//! #[derive(Clone, Debug, PartialEq)]
40//! pub enum Response {
41//! Here(u8, u8, u8),
42//! There(i16),
43//! }
44//!
45//! static INTERCHANGE: Interchange<Request, Response, 1> = Interchange::new();
46//!
47//! let (mut rq, mut rp) = INTERCHANGE.claim().unwrap();
48//!
49//! assert!(rq.state() == State::Idle);
50//!
51//! // happy path: no cancelation
52//! let request = Request::This(1, 2);
53//! assert!(rq.request(request).is_ok());
54//!
55//! let request = rp.take_request().unwrap();
56//! println!("rp got request: {:?}", request);
57//!
58//! let response = Response::There(-1);
59//! assert!(!rp.is_canceled());
60//! assert!(rp.respond(response).is_ok());
61//!
62//! let response = rq.take_response().unwrap();
63//! println!("rq got response: {:?}", response);
64//!
65//! // early cancelation path
66//! assert!(rq.request(request).is_ok());
67//!
68//! let request = rq.cancel().unwrap().unwrap();
69//! println!("responder could cancel: {:?}", request);
70//!
71//! assert!(rp.take_request().is_none());
72//! assert!(State::Idle == rq.state());
73//!
74//! // late cancelation
75//! assert!(rq.request(request).is_ok());
76//! let request = rp.take_request().unwrap();
77//!
78//! println!("responder could cancel: {:?}", &rq.cancel().unwrap().is_none());
79//! assert!(rp.is_canceled());
80//! assert!(rp.respond(response).is_err());
81//! assert!(rp.acknowledge_cancel().is_ok());
82//! assert!(State::Idle == rq.state());
83//!
84//! // building into request buffer
85//! impl Default for Request {
86//! fn default() -> Self {
87//! Request::That(0)
88//! }
89//! }
90//!
91//! rq.with_request_mut(|r| *r = Request::This(1,2)).unwrap() ;
92//! assert!(rq.send_request().is_ok());
93//! let request = rp.take_request().unwrap();
94//! assert_eq!(request, Request::This(1, 2));
95//! println!("rp got request: {:?}", request);
96//!
97//! // building into response buffer
98//! impl Default for Response {
99//! fn default() -> Self {
100//! Response::There(1)
101//! }
102//! }
103//!
104//! rp.with_response_mut(|r| *r = Response::Here(3,2,1)).unwrap();
105//! assert!(rp.send_response().is_ok());
106//! let response = rq.take_response().unwrap();
107//! assert_eq!(response, Response::Here(3,2,1));
108//!
109//! ```
110
111use core::fmt::{self, Debug};
112use core::sync::atomic::Ordering;
113
114#[cfg(loom)]
115use loom::{
116 cell::UnsafeCell,
117 sync::atomic::{AtomicBool, AtomicU8, AtomicUsize},
118};
119
120#[cfg(not(loom))]
121use core::{
122 cell::UnsafeCell,
123 sync::atomic::{AtomicBool, AtomicU8, AtomicUsize},
124};
125
/// Error returned when an operation cannot be performed in the channel's current state.
///
/// The `Debug` representation is a fixed, human-readable sentence instead of the type name.
#[derive(Clone, Copy)]
pub struct Error;

impl Debug for Error {
    /// Writes the fixed "busy" sentence verbatim; formatter flags are not consulted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> core::fmt::Result {
        const MESSAGE: &str = "The interchange is busy, this operation could not be performed";
        fmt::Formatter::write_str(f, MESSAGE)
    }
}
134
/// State of the RPC interchange
///
/// The discriminants are the exact byte values stored in the channel's atomic state.
#[repr(u8)]
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum State {
    /// The requester may send a new request.
    Idle = 0,
    /// The requester is building a request, using the pre-allocated static data as &mut Request
    BuildingRequest = 1,
    /// The request is pending either processing by responder or cancelation by requester.
    Requested = 2,
    /// The responder is building a response, using the pre-allocated static data as &mut Response
    /// It may opportunistically be canceled by the requester.
    BuildingResponse = 3,
    /// The responder sent a response.
    Responded = 4,

    /// The requester canceled while the responder was building its response.
    Canceled = 12,
}

impl PartialEq<u8> for State {
    /// Compares the state's discriminant with a raw byte.
    #[inline]
    fn eq(&self, other: &u8) -> bool {
        let discriminant = *self as u8;
        discriminant == *other
    }
}

impl From<u8> for State {
    /// Decodes a raw state byte; every byte that is not a known non-idle
    /// discriminant decodes to [`State::Idle`].
    fn from(byte: u8) -> Self {
        const BUILDING_REQUEST: u8 = State::BuildingRequest as u8;
        const REQUESTED: u8 = State::Requested as u8;
        const BUILDING_RESPONSE: u8 = State::BuildingResponse as u8;
        const RESPONDED: u8 = State::Responded as u8;
        const CANCELED: u8 = State::Canceled as u8;

        match byte {
            BUILDING_REQUEST => State::BuildingRequest,
            REQUESTED => State::Requested,
            BUILDING_RESPONSE => State::BuildingResponse,
            RESPONDED => State::Responded,
            CANCELED => State::Canceled,
            _ => State::Idle,
        }
    }
}
173
// `repr(u8)` is necessary so that `MaybeUninit::zeroed().assume_init()` is valid and
// corresponds to `None`.
#[repr(u8)]
enum Message<Rq, Rp> {
    None,
    Request(Rq),
    Response(Rp),
}

impl<Rq, Rp> Message<Rq, Rp> {
    /// `true` when the buffer currently holds a request.
    fn is_request_state(&self) -> bool {
        matches!(self, Message::Request(_))
    }

    /// `true` when the buffer currently holds a response.
    fn is_response_state(&self) -> bool {
        matches!(self, Message::Response(_))
    }

    /// Moves the request out, leaving `None` behind. Panics if no request is stored.
    fn take_rq(&mut self) -> Rq {
        match core::mem::replace(self, Self::None) {
            Self::Request(request) => request,
            _ => unreachable!(),
        }
    }

    /// Borrows the stored request. Panics if no request is stored.
    fn rq_ref(&self) -> &Rq {
        let Self::Request(request) = self else {
            unreachable!()
        };
        request
    }

    /// Mutably borrows the stored request. Panics if no request is stored.
    fn rq_mut(&mut self) -> &mut Rq {
        let Self::Request(request) = self else {
            unreachable!()
        };
        request
    }

    /// Moves the response out, leaving `None` behind. Panics if no response is stored.
    fn take_rp(&mut self) -> Rp {
        match core::mem::replace(self, Self::None) {
            Self::Response(response) => response,
            _ => unreachable!(),
        }
    }

    /// Borrows the stored response. Panics if no response is stored.
    fn rp_ref(&self) -> &Rp {
        let Self::Response(response) = self else {
            unreachable!()
        };
        response
    }

    /// Mutably borrows the stored response. Panics if no response is stored.
    fn rp_mut(&mut self) -> &mut Rp {
        let Self::Response(response) = self else {
            unreachable!()
        };
        response
    }

    /// Wraps a request.
    fn from_rq(rq: Rq) -> Self {
        Message::Request(rq)
    }

    /// Wraps a response.
    fn from_rp(rp: Rp) -> Self {
        Message::Response(rp)
    }
}
244
245/// Channel used for Request/Response mechanism.
246/// ```
247/// # #![cfg(not(loom))]
248/// # use interchange::{State, Channel};
249/// #[derive(Clone, Debug, PartialEq)]
250/// pub enum Request {
251/// This(u8, u32),
252/// That(i64),
253/// }
254///
255/// #[derive(Clone, Debug, PartialEq)]
256/// pub enum Response {
257/// Here(u8, u8, u8),
258/// There(i16),
259/// }
260///
261/// static CHANNEL: Channel<Request,Response> = Channel::new();
262///
263/// let (mut rq, mut rp) = CHANNEL.split().unwrap();
264///
265/// assert!(rq.state() == State::Idle);
266///
267/// // happy path: no cancelation
268/// let request = Request::This(1, 2);
269/// assert!(rq.request(request).is_ok());
270///
271/// let request = rp.take_request().unwrap();
272/// println!("rp got request: {:?}", request);
273///
274/// let response = Response::There(-1);
275/// assert!(!rp.is_canceled());
276/// assert!(rp.respond(response).is_ok());
277///
278/// let response = rq.take_response().unwrap();
279/// println!("rq got response: {:?}", response);
280///
281/// // early cancelation path
282/// assert!(rq.request(request).is_ok());
283///
284/// let request = rq.cancel().unwrap().unwrap();
285/// println!("responder could cancel: {:?}", request);
286///
287/// assert!(rp.take_request().is_none());
288/// assert!(State::Idle == rq.state());
289///
290/// // late cancelation
291/// assert!(rq.request(request).is_ok());
292/// let request = rp.take_request().unwrap();
293///
294/// println!("responder could cancel: {:?}", &rq.cancel().unwrap().is_none());
295/// assert!(rp.is_canceled());
296/// assert!(rp.respond(response).is_err());
297/// assert!(rp.acknowledge_cancel().is_ok());
298/// assert!(State::Idle == rq.state());
299///
300/// // building into request buffer
301/// impl Default for Request {
302/// fn default() -> Self {
303/// Request::That(0)
304/// }
305/// }
306///
307/// rq.with_request_mut(|r| *r = Request::This(1,2)).unwrap() ;
308/// assert!(rq.send_request().is_ok());
309/// let request = rp.take_request().unwrap();
310/// assert_eq!(request, Request::This(1, 2));
311/// println!("rp got request: {:?}", request);
312///
313/// // building into response buffer
314/// impl Default for Response {
315/// fn default() -> Self {
316/// Response::There(1)
317/// }
318/// }
319///
320/// rp.with_response_mut(|r| *r = Response::Here(3,2,1)).unwrap();
321/// assert!(rp.send_response().is_ok());
322/// let response = rq.take_response().unwrap();
323/// assert_eq!(response, Response::Here(3,2,1));
324///
325/// ```
326pub struct Channel<Rq, Rp> {
327 data: UnsafeCell<Message<Rq, Rp>>,
328 state: AtomicU8,
329 requester_claimed: AtomicBool,
330 responder_claimed: AtomicBool,
331}
332
333impl<Rq, Rp> Channel<Rq, Rp> {
334 // Loom's atomics are not const :/
335 #[cfg(not(loom))]
336 pub const fn new() -> Self {
337 Self {
338 data: UnsafeCell::new(Message::None),
339 state: AtomicU8::new(0),
340 requester_claimed: AtomicBool::new(false),
341 responder_claimed: AtomicBool::new(false),
342 }
343 }
344
345 #[cfg(loom)]
346 pub fn new() -> Self {
347 Self {
348 data: UnsafeCell::new(Message::None),
349 state: AtomicU8::new(0),
350 requester_claimed: AtomicBool::new(false),
351 responder_claimed: AtomicBool::new(false),
352 }
353 }
354
355 /// Obtain the requester end of the channel if it hasn't been taken yet.
356 ///
357 /// Can be called again if the previously obtained [`Requester`]() has been dropped
358 pub fn requester(&self) -> Option<Requester<'_, Rq, Rp>> {
359 if self
360 .requester_claimed
361 .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
362 .is_ok()
363 {
364 Some(Requester { channel: self })
365 } else {
366 None
367 }
368 }
369
370 /// Obtain the responder end of the channel if it hasn't been taken yet.
371 ///
372 /// Can be called again if the previously obtained [`Responder`]() has been dropped
373 pub fn responder(&self) -> Option<Responder<'_, Rq, Rp>> {
374 if self
375 .responder_claimed
376 .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
377 .is_ok()
378 {
379 Some(Responder { channel: self })
380 } else {
381 None
382 }
383 }
384
385 /// Obtain both the requester and responder ends of the channel.
386 ///
387 /// Can be called again if the previously obtained [`Responder`]() and [`Requester`]() have been dropped
388 pub fn split(&self) -> Option<(Requester<'_, Rq, Rp>, Responder<'_, Rq, Rp>)> {
389 Some((self.requester()?, self.responder()?))
390 }
391
392 fn transition(&self, from: State, to: State) -> bool {
393 self.state
394 .compare_exchange(from as u8, to as u8, Ordering::AcqRel, Ordering::Relaxed)
395 .is_ok()
396 }
397}
398
399impl<Rq, Rp> Default for Channel<Rq, Rp> {
400 fn default() -> Self {
401 Self::new()
402 }
403}
404
405/// Requester end of a channel
406///
407/// For a `static` [`Channel`]() or [`Interchange`](),
408/// the requester uses a `'static` lifetime parameter
409pub struct Requester<'i, Rq, Rp> {
410 channel: &'i Channel<Rq, Rp>,
411}
412
413impl<Rq, Rp> Drop for Requester<'_, Rq, Rp> {
414 fn drop(&mut self) {
415 self.channel
416 .requester_claimed
417 .store(false, Ordering::Release);
418 }
419}
420
421impl<'i, Rq, Rp> Requester<'i, Rq, Rp> {
422 pub fn channel(&self) -> &'i Channel<Rq, Rp> {
423 self.channel
424 }
425
426 #[cfg(not(loom))]
427 unsafe fn data(&self) -> &Message<Rq, Rp> {
428 &mut *self.channel.data.get()
429 }
430
431 #[cfg(not(loom))]
432 unsafe fn data_mut(&mut self) -> &mut Message<Rq, Rp> {
433 &mut *self.channel.data.get()
434 }
435
436 #[cfg(not(loom))]
437 unsafe fn with_data<R>(&self, f: impl FnOnce(&Message<Rq, Rp>) -> R) -> R {
438 f(&*self.channel.data.get())
439 }
440
441 #[cfg(not(loom))]
442 unsafe fn with_data_mut<R>(&mut self, f: impl FnOnce(&mut Message<Rq, Rp>) -> R) -> R {
443 f(&mut *self.channel.data.get())
444 }
445
446 #[cfg(loom)]
447 unsafe fn with_data<R>(&self, f: impl FnOnce(&Message<Rq, Rp>) -> R) -> R {
448 self.channel.data.with(|i| f(&*i))
449 }
450
451 #[cfg(loom)]
452 unsafe fn with_data_mut<R>(&mut self, f: impl FnOnce(&mut Message<Rq, Rp>) -> R) -> R {
453 self.channel.data.with_mut(|i| f(&mut *i))
454 }
455
456 #[inline]
457 /// Current state of the channel.
458 ///
459 /// Informational only!
460 ///
461 /// The responder may change this state between calls,
462 /// internally atomics ensure correctness.
463 pub fn state(&self) -> State {
464 State::from(self.channel.state.load(Ordering::Acquire))
465 }
466
467 /// Send a request to the responder.
468 ///
469 /// If efficiency is a concern, or requests need multiple steps to
470 /// construct, use `request_mut` and `send_request.
471 ///
472 /// If the RPC state is `Idle`, this always succeeds, else calling
473 /// is a logic error and the request is returned.
474 pub fn request(&mut self, request: Rq) -> Result<(), Error> {
475 if State::Idle == self.channel.state.load(Ordering::Acquire) {
476 unsafe {
477 self.with_data_mut(|i| *i = Message::from_rq(request));
478 }
479 self.channel
480 .state
481 .store(State::Requested as u8, Ordering::Release);
482 Ok(())
483 } else {
484 Err(Error)
485 }
486 }
487
488 /// Attempt to cancel a request.
489 ///
490 /// If the responder has not taken the request yet, this succeeds and returns
491 /// the request.
492 ///
493 /// If the responder has taken the request (is processing), we succeed and return None.
494 ///
495 /// In other cases (`Idle` or `Reponsed`) there is nothing to cancel and we fail.
496 pub fn cancel(&mut self) -> Result<Option<Rq>, Error> {
497 if self
498 .channel
499 .transition(State::BuildingResponse, State::Canceled)
500 {
501 // we canceled after the responder took the request, but before they answered.
502 return Ok(None);
503 }
504
505 if self.channel.transition(State::Requested, State::Idle) {
506 // we canceled before the responder was even aware of the request.
507 return Ok(Some(unsafe { self.with_data_mut(|i| i.take_rq()) }));
508 }
509
510 Err(Error)
511 }
512
513 /// If there is a response waiting, obtain a reference to it
514 ///
515 /// This may be called multiple times.
516 // Safety: We cannot test this with loom efficiently, but given that `with_response` is tested,
517 // this is likely correct
518 #[cfg(not(loom))]
519 pub fn response(&self) -> Result<&Rp, Error> {
520 if self.channel.transition(State::Responded, State::Responded) {
521 Ok(unsafe { self.data().rp_ref() })
522 } else {
523 Err(Error)
524 }
525 }
526
527 /// If there is a request waiting, perform an operation with a reference to it
528 ///
529 /// This may be called multiple times.
530 pub fn with_response<R>(&self, f: impl FnOnce(&Rp) -> R) -> Result<R, Error> {
531 if self.channel.transition(State::Responded, State::Responded) {
532 Ok(unsafe { self.with_data(|i| f(i.rp_ref())) })
533 } else {
534 Err(Error)
535 }
536 }
537
538 /// Look for a response.
539 /// If the responder has sent a response, we return it.
540 ///
541 /// This may be called only once as it move the state to Idle.
542 /// If you need copies, clone the request.
543 // It is a logic error to call this method if we're Idle or Canceled, but
544 // it seems unnecessary to model this.
545 pub fn take_response(&mut self) -> Option<Rp> {
546 if self.channel.transition(State::Responded, State::Idle) {
547 Some(unsafe { self.with_data_mut(|i| i.take_rp()) })
548 } else {
549 None
550 }
551 }
552}
553
554impl<Rq, Rp> Requester<'_, Rq, Rp>
555where
556 Rq: Default,
557{
558 /// Initialize a request with its default values and mutates it with `f`
559 ///
560 /// This is usefull to build large structures in-place
561 pub fn with_request_mut<R>(&mut self, f: impl FnOnce(&mut Rq) -> R) -> Result<R, Error> {
562 if self.channel.transition(State::Idle, State::BuildingRequest)
563 || self
564 .channel
565 .transition(State::BuildingRequest, State::BuildingRequest)
566 {
567 let res = unsafe {
568 self.with_data_mut(|i| {
569 if !i.is_request_state() {
570 *i = Message::from_rq(Rq::default());
571 }
572 f(i.rq_mut())
573 })
574 };
575 Ok(res)
576 } else {
577 Err(Error)
578 }
579 }
580
581 /// Initialize a request with its default values and and return a mutable reference to it
582 ///
583 /// This is usefull to build large structures in-place
584 // Safety: We cannot test this with loom efficiently, but given that `with_request_mut` is tested,
585 // this is likely correct
586 #[cfg(not(loom))]
587 pub fn request_mut(&mut self) -> Result<&mut Rq, Error> {
588 if self.channel.transition(State::Idle, State::BuildingRequest)
589 || self
590 .channel
591 .transition(State::BuildingRequest, State::BuildingRequest)
592 {
593 unsafe {
594 self.with_data_mut(|i| {
595 if !i.is_request_state() {
596 *i = Message::from_rq(Rq::default());
597 }
598 })
599 }
600 Ok(unsafe { self.data_mut().rq_mut() })
601 } else {
602 Err(Error)
603 }
604 }
605
606 /// Send a request that was already placed in the channel using `request_mut` or
607 /// `with_request_mut`.
608 pub fn send_request(&mut self) -> Result<(), Error> {
609 if State::BuildingRequest == self.channel.state.load(Ordering::Acquire)
610 && self
611 .channel
612 .transition(State::BuildingRequest, State::Requested)
613 {
614 Ok(())
615 } else {
616 // logic error
617 Err(Error)
618 }
619 }
620}
621
622/// Responder end of a channel
623///
624/// For a `static` [`Channel`]() or [`Interchange`](),
625/// the responder uses a `'static` lifetime parameter
626pub struct Responder<'i, Rq, Rp> {
627 channel: &'i Channel<Rq, Rp>,
628}
629
630impl<Rq, Rp> Drop for Responder<'_, Rq, Rp> {
631 fn drop(&mut self) {
632 self.channel
633 .responder_claimed
634 .store(false, Ordering::Release);
635 }
636}
637
638impl<'i, Rq, Rp> Responder<'i, Rq, Rp> {
639 pub fn channel(&self) -> &'i Channel<Rq, Rp> {
640 self.channel
641 }
642
643 #[cfg(not(loom))]
644 unsafe fn data(&self) -> &Message<Rq, Rp> {
645 &mut *self.channel.data.get()
646 }
647
648 #[cfg(not(loom))]
649 unsafe fn data_mut(&mut self) -> &mut Message<Rq, Rp> {
650 &mut *self.channel.data.get()
651 }
652
653 #[cfg(not(loom))]
654 unsafe fn with_data<R>(&self, f: impl FnOnce(&Message<Rq, Rp>) -> R) -> R {
655 f(&*self.channel.data.get())
656 }
657
658 #[cfg(not(loom))]
659 unsafe fn with_data_mut<R>(&mut self, f: impl FnOnce(&mut Message<Rq, Rp>) -> R) -> R {
660 f(&mut *self.channel.data.get())
661 }
662
663 #[cfg(loom)]
664 unsafe fn with_data<R>(&self, f: impl FnOnce(&Message<Rq, Rp>) -> R) -> R {
665 self.channel.data.with(|i| f(&*i))
666 }
667
668 #[cfg(loom)]
669 unsafe fn with_data_mut<R>(&mut self, f: impl FnOnce(&mut Message<Rq, Rp>) -> R) -> R {
670 self.channel.data.with_mut(|i| f(&mut *i))
671 }
672
673 #[inline]
674 /// Current state of the channel.
675 ///
676 /// Informational only!
677 ///
678 /// The responder may change this state between calls,
679 /// internally atomics ensure correctness.
680 pub fn state(&self) -> State {
681 State::from(self.channel.state.load(Ordering::Acquire))
682 }
683
684 /// If there is a request waiting, perform an operation with a reference to it
685 ///
686 /// This may be called only once as it move the state to BuildingResponse.
687 /// If you need copies, use `take_request`
688 pub fn with_request<R>(&self, f: impl FnOnce(&Rq) -> R) -> Result<R, Error> {
689 if self
690 .channel
691 .transition(State::Requested, State::BuildingResponse)
692 {
693 Ok(unsafe { self.with_data(|i| f(i.rq_ref())) })
694 } else {
695 Err(Error)
696 }
697 }
698
699 /// If there is a request waiting, obtain a reference to it
700 ///
701 /// This may be called multiple times.
702 // Safety: We cannot test this with loom efficiently, but given that `with_request` is tested,
703 // this is likely correct
704 #[cfg(not(loom))]
705 pub fn request(&self) -> Result<&Rq, Error> {
706 if self
707 .channel
708 .transition(State::Requested, State::BuildingResponse)
709 {
710 Ok(unsafe { self.data().rq_ref() })
711 } else {
712 Err(Error)
713 }
714 }
715
716 /// If there is a request waiting, take a reference to it out
717 ///
718 /// This may be called only once as it move the state to BuildingResponse.
719 /// If you need copies, clone the request.
720 pub fn take_request(&mut self) -> Option<Rq> {
721 if self
722 .channel
723 .transition(State::Requested, State::BuildingResponse)
724 {
725 Some(unsafe { self.with_data_mut(|i| i.take_rq()) })
726 } else {
727 None
728 }
729 }
730
731 // Check if requester attempted to cancel
732 pub fn is_canceled(&self) -> bool {
733 self.channel.state.load(Ordering::SeqCst) == State::Canceled as u8
734 }
735
736 // Acknowledge a cancel, thereby setting Channel to Idle state again.
737 //
738 // It is a logic error to call this method if there is no pending cancellation.
739 pub fn acknowledge_cancel(&self) -> Result<(), Error> {
740 if self.channel.transition(State::Canceled, State::Idle) {
741 Ok(())
742 } else {
743 Err(Error)
744 }
745 }
746
747 /// Respond to a request.
748 ///
749 /// If efficiency is a concern, or responses need multiple steps to
750 /// construct, use `with_response_mut` or `response_mut` and `send_response`.
751 ///
752 pub fn respond(&mut self, response: Rp) -> Result<(), Error> {
753 if State::BuildingResponse == self.channel.state.load(Ordering::Acquire) {
754 unsafe {
755 self.with_data_mut(|i| *i = Message::from_rp(response));
756 }
757 if self
758 .channel
759 .transition(State::BuildingResponse, State::Responded)
760 {
761 Ok(())
762 } else {
763 Err(Error)
764 }
765 } else {
766 Err(Error)
767 }
768 }
769}
770
771impl<Rq, Rp> Responder<'_, Rq, Rp>
772where
773 Rp: Default,
774{
775 /// Initialize a response with its default values and mutates it with `f`
776 ///
777 /// This is usefull to build large structures in-place
778 pub fn with_response_mut<R>(&mut self, f: impl FnOnce(&mut Rp) -> R) -> Result<R, Error> {
779 if self
780 .channel
781 .transition(State::Requested, State::BuildingResponse)
782 || self
783 .channel
784 .transition(State::BuildingResponse, State::BuildingResponse)
785 {
786 let res = unsafe {
787 self.with_data_mut(|i| {
788 if !i.is_response_state() {
789 *i = Message::from_rp(Rp::default());
790 }
791 f(i.rp_mut())
792 })
793 };
794 Ok(res)
795 } else {
796 Err(Error)
797 }
798 }
799
800 /// Initialize a response with its default values and and return a mutable reference to it
801 ///
802 /// This is usefull to build large structures in-place
803 // Safety: We cannot test this with loom efficiently, but given that `with_response_mut` is tested,
804 // this is likely correct
805 #[cfg(not(loom))]
806 pub fn response_mut(&mut self) -> Result<&mut Rp, Error> {
807 if self
808 .channel
809 .transition(State::Requested, State::BuildingResponse)
810 || self
811 .channel
812 .transition(State::BuildingResponse, State::BuildingResponse)
813 {
814 unsafe {
815 self.with_data_mut(|i| {
816 if !i.is_response_state() {
817 *i = Message::from_rp(Rp::default());
818 }
819 })
820 }
821 Ok(unsafe { self.data_mut().rp_mut() })
822 } else {
823 Err(Error)
824 }
825 }
826
827 /// Send a response that was already placed in the channel using `response_mut` or
828 /// `with_response_mut`.
829 pub fn send_response(&mut self) -> Result<(), Error> {
830 if State::BuildingResponse == self.channel.state.load(Ordering::Acquire)
831 && self
832 .channel
833 .transition(State::BuildingResponse, State::Responded)
834 {
835 Ok(())
836 } else {
837 // logic error
838 Err(Error)
839 }
840 }
841}
842
843// Safety: The channel can be split, which then allows getting sending the Rq and Rp types across threads
844// TODO: is the Sync bound really necessary?
845unsafe impl<Rq, Rp> Sync for Channel<Rq, Rp>
846where
847 Rq: Send + Sync,
848 Rp: Send + Sync,
849{
850}
851
852/// Set of `N` channels
853///
854/// Channels can be claimed with [`claim()`](Self::claim)
855///
856/// ```
857/// # #![cfg(not(loom))]
858/// # use interchange::*;
859/// # #[derive(Clone, Debug, PartialEq)]
860/// # pub enum Request {
861/// # This(u8, u32),
862/// # That(i64),
863/// # }
864/// #
865/// # #[derive(Clone, Debug, PartialEq)]
866/// # pub enum Response {
867/// # Here(u8, u8, u8),
868/// # There(i16),
869/// # }
870/// #
871/// static interchange: Interchange<Request, Response,10> = Interchange::new();
872///
873/// for i in 0..10 {
874/// let rq: Requester<'_, Request, Response>;
875/// let rp: Responder<'_, Request, Response>;
876/// (rq, rp) = interchange.claim().unwrap() ;
877/// }
878/// ```
879pub struct Interchange<Rq, Rp, const N: usize> {
880 channels: [Channel<Rq, Rp>; N],
881 last_claimed: AtomicUsize,
882}
883
884impl<Rq, Rp, const N: usize> Interchange<Rq, Rp, N> {
885 /// Create a new Interchange
886 #[cfg(not(loom))]
887 pub const fn new() -> Self {
888 Self {
889 channels: [const { Channel::new() }; N],
890 last_claimed: AtomicUsize::new(0),
891 }
892 }
893
894 /// Create a new Interchange
895 #[cfg(loom)]
896 pub fn new() -> Self {
897 Self {
898 channels: core::array::from_fn(|_| Channel::new()),
899 last_claimed: AtomicUsize::new(0),
900 }
901 }
902
903 /// Claim one of the channels of the interchange. Returns None if called more than `N` times.
904 pub fn claim(&self) -> Option<(Requester<Rq, Rp>, Responder<Rq, Rp>)> {
905 self.as_interchange_ref().claim()
906 }
907
908 /// Returns a reference to the interchange with the `N` const-generic removed.
909 /// This can avoid the requirement to have `const N: usize` everywhere
910 /// ```
911 /// # #![cfg(not(loom))]
912 /// # use interchange::{State, Interchange, InterchangeRef};
913 /// # #[derive(Clone, Debug, PartialEq)]
914 /// # pub enum Request {
915 /// # This(u8, u32),
916 /// # That(i64),
917 /// # }
918 /// # #[derive(Clone, Debug, PartialEq)]
919 /// # pub enum Response {
920 /// # Here(u8, u8, u8),
921 /// # There(i16),
922 /// # }
923 /// static INTERCHANGE_INNER: Interchange<Request, Response, 1> = Interchange::new();
924 ///
925 /// // The size of the interchange is absent from the type
926 /// static INTERCHANGE: InterchangeRef<'static, Request, Response> = INTERCHANGE_INNER.as_interchange_ref();
927 ///
928 /// let (mut rq, mut rp) = INTERCHANGE.claim().unwrap();
929 /// ```
930 pub const fn as_interchange_ref(&self) -> InterchangeRef<'_, Rq, Rp> {
931 InterchangeRef {
932 channels: &self.channels,
933 last_claimed: &self.last_claimed,
934 }
935 }
936}
937
938/// Interchange witout the `const N: usize` generic parameter
939/// Obtained using [`Interchange::as_interchange_ref`](Interchange::as_interchange_ref)
940pub struct InterchangeRef<'alloc, Rq, Rp> {
941 channels: &'alloc [Channel<Rq, Rp>],
942 last_claimed: &'alloc AtomicUsize,
943}
944
945impl<'alloc, Rq, Rp> InterchangeRef<'alloc, Rq, Rp> {
946 /// Claim one of the channels of the interchange. Returns None if called more than `N` times.
947 pub fn claim(&self) -> Option<(Requester<'alloc, Rq, Rp>, Responder<'alloc, Rq, Rp>)> {
948 let index = self.last_claimed.fetch_add(1, Ordering::Relaxed);
949 let n = self.channels.len();
950
951 for i in (index % n)..n {
952 let tmp = self.channels[i].split();
953 if tmp.is_some() {
954 return tmp;
955 }
956 }
957
958 for i in 0..(index % n) {
959 let tmp = self.channels[i].split();
960 if tmp.is_some() {
961 return tmp;
962 }
963 }
964 None
965 }
966}
967
968impl<Rq, Rp> Clone for InterchangeRef<'_, Rq, Rp> {
969 fn clone(&self) -> Self {
970 *self
971 }
972}
973
974impl<Rq, Rp> Copy for InterchangeRef<'_, Rq, Rp> {}
975
976impl<Rq, Rp, const N: usize> Default for Interchange<Rq, Rp, N> {
977 fn default() -> Self {
978 Self::new()
979 }
980}
981
982/// ```compile_fail
983/// use std::rc::Rc;
984/// use interchange::*;
985/// #[allow(unconditional_recursion, unused)]
986/// fn assert_send<T: Send>() {
987/// assert_send::<Channel<Rc<String>, u32>>();
988/// }
989/// ```
990/// ```compile_fail
991/// use std::rc::Rc;
992/// use interchange::*;
993/// #[allow(unconditional_recursion, unused)]
994/// fn assert_send<T: Send>() {
995/// assert_send::<Requester<Rc<String>, u32>>();
996/// }
997/// ```
998/// ```compile_fail
999/// use std::rc::Rc;
1000/// use interchange::*;
1001/// #[allow(unconditional_recursion, unused)]
1002/// fn assert_send<T: Send>() {
1003/// assert_send::<Responder<Rc<String>, u32>>();
1004/// }
1005/// ```
1006/// ```compile_fail
1007/// use std::rc::Rc;
1008/// use interchange::*;
1009/// #[allow(unconditional_recursion, unused)]
1010/// fn assert_sync<T: Sync>() {
1011/// assert_sync::<Channel<Rc<String>, u32>>();
1012/// }
1013/// ```
1014/// ```compile_fail
1015/// use std::rc::Rc;
1016/// use interchange::*;
1017/// #[allow(unconditional_recursion, unused)]
1018/// fn assert_sync<T: Sync>() {
1019/// assert_sync::<Requester<Rc<String>, u32>>();
1020/// }
1021/// ```
1022/// ```compile_fail
1023/// use std::rc::Rc;
1024/// use interchange::*;
1025/// #[allow(unconditional_recursion, unused)]
1026/// fn assert_sync<T: Sync>() {
1027/// assert_sync::<Responder<Rc<String>, u32>>();
1028/// }
1029/// ```
const _ASSERT_COMPILE_FAILS: () = ();
1031
#[cfg(all(not(loom), test))]
mod tests {
    use super::*;

    #[derive(Clone, Debug, PartialEq)]
    pub enum Request {
        This(u8, u32),
    }

    #[derive(Clone, Debug, PartialEq)]
    pub enum Response {
        Here(u8, u8, u8),
        There(i16),
    }

    impl Default for Response {
        fn default() -> Self {
            Response::There(1)
        }
    }

    impl Default for Request {
        fn default() -> Self {
            Request::This(0, 0)
        }
    }

    /// Drives a freshly claimed channel through every supported flow: plain
    /// request/response, early cancel, late cancel, and in-place building of both
    /// message kinds.
    fn exercise(
        mut rq: Requester<'static, Request, Response>,
        mut rp: Responder<'static, Request, Response>,
    ) {
        assert_eq!(rq.state(), State::Idle);

        // happy path: no cancelation
        let request = Request::This(1, 2);
        assert!(rq.request(request).is_ok());
        let request = rp.take_request().unwrap();
        println!("rp got request: {request:?}");
        let response = Response::There(-1);
        assert!(!rp.is_canceled());
        assert!(rp.respond(response).is_ok());
        let response = rq.take_response().unwrap();
        println!("rq got response: {response:?}");

        // early cancelation path
        assert!(rq.request(request).is_ok());
        let request = rq.cancel().unwrap().unwrap();
        println!("responder could cancel: {request:?}");
        assert!(rp.take_request().is_none());
        assert_eq!(State::Idle, rq.state());

        // late cancelation
        assert!(rq.request(request).is_ok());
        let request = rp.take_request().unwrap();
        println!(
            "responder could cancel: {:?}",
            &rq.cancel().unwrap().is_none()
        );
        assert_eq!(request, Request::This(1, 2));
        assert!(rp.is_canceled());
        assert!(rp.respond(response).is_err());
        assert!(rp.acknowledge_cancel().is_ok());
        assert_eq!(State::Idle, rq.state());

        // building into request buffer
        rq.with_request_mut(|r| *r = Request::This(1, 2)).unwrap();
        assert!(rq.send_request().is_ok());
        let request = rp.take_request().unwrap();
        assert_eq!(request, Request::This(1, 2));
        println!("rp got request: {request:?}");

        // building into response buffer
        rp.with_response_mut(|r| *r = Response::Here(3, 2, 1))
            .unwrap();
        assert!(rp.send_response().is_ok());
        let response = rq.take_response().unwrap();
        assert_eq!(response, Response::Here(3, 2, 1));
    }

    #[test]
    fn interchange() {
        static INTERCHANGE: Interchange<Request, Response, 1> = Interchange::new();
        let (rq, rp) = INTERCHANGE.claim().unwrap();
        exercise(rq, rp);
    }

    #[test]
    fn interchange_ref() {
        static INTERCHANGE_INNER: Interchange<Request, Response, 1> = Interchange::new();
        static INTERCHANGE: InterchangeRef<'static, Request, Response> =
            INTERCHANGE_INNER.as_interchange_ref();
        let (rq, rp) = INTERCHANGE.claim().unwrap();
        exercise(rq, rp);
    }

    #[test]
    fn reclaim_after_drop() {
        static INTERCHANGE: Interchange<Request, Response, 1> = Interchange::new();
        let ends = INTERCHANGE.claim().unwrap();
        // the only channel is taken
        assert!(INTERCHANGE.claim().is_none());
        drop(ends);
        // dropping both ends makes the channel claimable again
        assert!(INTERCHANGE.claim().is_some());
    }

    // Never called: the body only has to type-check, which proves the `Send` bounds.
    #[allow(unconditional_recursion, clippy::extra_unused_type_parameters, unused)]
    fn assert_send<T: Send>() {
        assert_send::<Channel<String, u32>>();
        assert_send::<Responder<'static, String, u32>>();
        assert_send::<Requester<'static, String, u32>>();
        assert_send::<Channel<&'static mut String, u32>>();
        assert_send::<Responder<'static, &'static mut String, u32>>();
        assert_send::<Requester<'static, &'static mut String, u32>>();
    }

    // Never called: the body only has to type-check, which proves the `Sync` bounds.
    #[allow(unconditional_recursion, clippy::extra_unused_type_parameters, unused)]
    fn assert_sync<T: Sync>() {
        assert_sync::<Channel<String, u32>>();
        assert_sync::<Responder<'static, String, u32>>();
        assert_sync::<Requester<'static, String, u32>>();

        assert_sync::<Channel<&'static mut String, u32>>();
        assert_sync::<Responder<'static, &'static mut String, u32>>();
        assert_sync::<Requester<'static, &'static mut String, u32>>();
    }
}
1171}