interchange/
lib.rs

1#![cfg_attr(not(test), no_std)]
2//! Implement a somewhat convenient and somewhat efficient way to perform RPC
3//! in an embedded context.
4//!
5//! The approach is inspired by Go's channels, with the restriction that
6//! there is a clear separation into a requester and a responder.
7//!
8//! Requests may be canceled, which the responder should honour on a
9//! best-effort basis.
10//!
11//! ### Example use cases
12//! - USB device interrupt handler performs low-level protocol details, hands off
13//!   commands from the host to higher-level logic running in the idle thread.
14//!   This higher-level logic need only understand clearly typed commands, as
//!   modeled by variants of a given `Request` enum.
//! - `trussed` crypto service, responding to crypto requests from apps across
17//!   TrustZone for Cortex-M secure/non-secure boundaries.
18//! - Request to blink a few lights and reply on button press
19//!
20//!
21//! ### Approach
22//! It is assumed that all requests fit in a single `Request` enum, and that
//! all responses fit in a single `Response` enum. The [`Channel`] and [`Interchange`] structs
//! allocate a single buffer in which either a request or a response fits, and handle
//! synchronization.
24//! Both structures have `const` constructors, allowing them to be statically allocated.
25//!
26//! An alternative approach would be to use two heapless Queues of length one
27//! each for response and requests. The advantage of our construction is to
28//! have only one static memory region in use.
29//!
30//! ```
31//! # #![cfg(not(loom))]
32//! # use interchange::{State, Interchange};
33//! #[derive(Clone, Debug, PartialEq)]
34//! pub enum Request {
35//!     This(u8, u32),
36//!     That(i64),
37//! }
38//!
39//! #[derive(Clone, Debug, PartialEq)]
40//! pub enum Response {
41//!     Here(u8, u8, u8),
42//!     There(i16),
43//! }
44//!
45//! static INTERCHANGE: Interchange<Request, Response, 1> = Interchange::new();
46//!
47//! let (mut rq, mut rp) = INTERCHANGE.claim().unwrap();
48//!
49//! assert!(rq.state() == State::Idle);
50//!
51//! // happy path: no cancelation
52//! let request = Request::This(1, 2);
53//! assert!(rq.request(request).is_ok());
54//!
55//! let request = rp.take_request().unwrap();
56//! println!("rp got request: {:?}", request);
57//!
58//! let response = Response::There(-1);
59//! assert!(!rp.is_canceled());
60//! assert!(rp.respond(response).is_ok());
61//!
62//! let response = rq.take_response().unwrap();
63//! println!("rq got response: {:?}", response);
64//!
65//! // early cancelation path
66//! assert!(rq.request(request).is_ok());
67//!
68//! let request =  rq.cancel().unwrap().unwrap();
69//! println!("responder could cancel: {:?}", request);
70//!
71//! assert!(rp.take_request().is_none());
72//! assert!(State::Idle == rq.state());
73//!
74//! // late cancelation
75//! assert!(rq.request(request).is_ok());
76//! let request = rp.take_request().unwrap();
77//!
78//! println!("responder could cancel: {:?}", &rq.cancel().unwrap().is_none());
79//! assert!(rp.is_canceled());
80//! assert!(rp.respond(response).is_err());
81//! assert!(rp.acknowledge_cancel().is_ok());
82//! assert!(State::Idle == rq.state());
83//!
84//! // building into request buffer
85//! impl Default for Request {
86//!   fn default() -> Self {
87//!     Request::That(0)
88//!   }
89//! }
90//!
91//! rq.with_request_mut(|r| *r = Request::This(1,2)).unwrap() ;
92//! assert!(rq.send_request().is_ok());
93//! let request = rp.take_request().unwrap();
94//! assert_eq!(request, Request::This(1, 2));
95//! println!("rp got request: {:?}", request);
96//!
97//! // building into response buffer
98//! impl Default for Response {
99//!   fn default() -> Self {
100//!     Response::There(1)
101//!   }
102//! }
103//!
104//! rp.with_response_mut(|r| *r = Response::Here(3,2,1)).unwrap();
105//! assert!(rp.send_response().is_ok());
106//! let response = rq.take_response().unwrap();
107//! assert_eq!(response, Response::Here(3,2,1));
108//!
109//! ```
110
111use core::fmt::{self, Debug};
112use core::sync::atomic::Ordering;
113
114#[cfg(loom)]
115use loom::{
116    cell::UnsafeCell,
117    sync::atomic::{AtomicBool, AtomicU8, AtomicUsize},
118};
119
120#[cfg(not(loom))]
121use core::{
122    cell::UnsafeCell,
123    sync::atomic::{AtomicBool, AtomicU8, AtomicUsize},
124};
125
/// Error returned when an operation is attempted while the channel is in a state
/// that does not permit it.
#[derive(Clone, Copy)]
pub struct Error;

impl Debug for Error {
    /// Writes a fixed, human-readable description of the failure.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        const MESSAGE: &str = "The interchange is busy, this operation could not be performed";
        formatter.write_str(MESSAGE)
    }
}
134
/// State of the RPC interchange
#[repr(u8)]
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum State {
    /// The requester may send a new request.
    Idle = 0,
    /// The requester is building a request, using the pre-allocated static data as `&mut Request`.
    BuildingRequest = 1,
    /// The request is pending either processing by responder or cancelation by requester.
    Requested = 2,
    /// The responder is building a response, using the pre-allocated static data as `&mut Response`.
    /// It may opportunistically be canceled by requester.
    BuildingResponse = 3,
    /// The responder sent a response.
    Responded = 4,

    /// The requester canceled while the responder was building a response;
    /// the responder has not acknowledged the cancelation yet.
    Canceled = 12,
}

/// Compare a state against its raw `u8` discriminant.
impl PartialEq<u8> for State {
    #[inline]
    fn eq(&self, raw: &u8) -> bool {
        let discriminant = *self as u8;
        discriminant == *raw
    }
}

/// Decode a raw discriminant; any byte that does not name a state decodes to [`State::Idle`].
impl From<u8> for State {
    fn from(byte: u8) -> Self {
        // Every non-idle state; `Idle` doubles as the fallback for unknown bytes.
        const NON_IDLE: [State; 5] = [
            State::BuildingRequest,
            State::Requested,
            State::BuildingResponse,
            State::Responded,
            State::Canceled,
        ];
        NON_IDLE
            .iter()
            .copied()
            .find(|candidate| *candidate == byte)
            .unwrap_or(State::Idle)
    }
}
173
// `#[repr(u8)]` pins the discriminant layout with `None` as discriminant 0, which is necessary so
// that `MaybeUninit::zeroed().assume_init()` is valid and corresponds to `None`.
#[repr(u8)]
enum Message<Rq, Rp> {
    None,
    Request(Rq),
    Response(Rp),
}

impl<Rq, Rp> Message<Rq, Rp> {
    /// Whether the buffer currently holds a request.
    fn is_request_state(&self) -> bool {
        match self {
            Self::Request(_) => true,
            Self::None | Self::Response(_) => false,
        }
    }

    /// Whether the buffer currently holds a response.
    fn is_response_state(&self) -> bool {
        match self {
            Self::Response(_) => true,
            Self::None | Self::Request(_) => false,
        }
    }

    /// Move the request out, leaving `None` behind. Panics if no request is stored.
    fn take_rq(&mut self) -> Rq {
        let Message::Request(request) = core::mem::replace(self, Message::None) else {
            unreachable!()
        };
        request
    }

    /// Borrow the stored request. Panics if no request is stored.
    fn rq_ref(&self) -> &Rq {
        if let Self::Request(request) = self {
            request
        } else {
            unreachable!()
        }
    }

    /// Mutably borrow the stored request. Panics if no request is stored.
    fn rq_mut(&mut self) -> &mut Rq {
        if let Self::Request(request) = self {
            request
        } else {
            unreachable!()
        }
    }

    /// Move the response out, leaving `None` behind. Panics if no response is stored.
    fn take_rp(&mut self) -> Rp {
        let Message::Response(response) = core::mem::replace(self, Message::None) else {
            unreachable!()
        };
        response
    }

    /// Borrow the stored response. Panics if no response is stored.
    fn rp_ref(&self) -> &Rp {
        if let Self::Response(response) = self {
            response
        } else {
            unreachable!()
        }
    }

    /// Mutably borrow the stored response. Panics if no response is stored.
    fn rp_mut(&mut self) -> &mut Rp {
        if let Self::Response(response) = self {
            response
        } else {
            unreachable!()
        }
    }

    /// Wrap a request.
    fn from_rq(rq: Rq) -> Self {
        Message::Request(rq)
    }

    /// Wrap a response.
    fn from_rp(rp: Rp) -> Self {
        Message::Response(rp)
    }
}
244
245/// Channel used for Request/Response mechanism.
246/// ```
247/// # #![cfg(not(loom))]
248/// # use interchange::{State, Channel};
249/// #[derive(Clone, Debug, PartialEq)]
250/// pub enum Request {
251///     This(u8, u32),
252///     That(i64),
253/// }
254///
255/// #[derive(Clone, Debug, PartialEq)]
256/// pub enum Response {
257///     Here(u8, u8, u8),
258///     There(i16),
259/// }
260///
261/// static CHANNEL: Channel<Request,Response> = Channel::new();
262///
263/// let (mut rq, mut rp) = CHANNEL.split().unwrap();
264///
265/// assert!(rq.state() == State::Idle);
266///
267/// // happy path: no cancelation
268/// let request = Request::This(1, 2);
269/// assert!(rq.request(request).is_ok());
270///
271/// let request = rp.take_request().unwrap();
272/// println!("rp got request: {:?}", request);
273///
274/// let response = Response::There(-1);
275/// assert!(!rp.is_canceled());
276/// assert!(rp.respond(response).is_ok());
277///
278/// let response = rq.take_response().unwrap();
279/// println!("rq got response: {:?}", response);
280///
281/// // early cancelation path
282/// assert!(rq.request(request).is_ok());
283///
284/// let request =  rq.cancel().unwrap().unwrap();
285/// println!("responder could cancel: {:?}", request);
286///
287/// assert!(rp.take_request().is_none());
288/// assert!(State::Idle == rq.state());
289///
290/// // late cancelation
291/// assert!(rq.request(request).is_ok());
292/// let request = rp.take_request().unwrap();
293///
294/// println!("responder could cancel: {:?}", &rq.cancel().unwrap().is_none());
295/// assert!(rp.is_canceled());
296/// assert!(rp.respond(response).is_err());
297/// assert!(rp.acknowledge_cancel().is_ok());
298/// assert!(State::Idle == rq.state());
299///
300/// // building into request buffer
301/// impl Default for Request {
302///   fn default() -> Self {
303///     Request::That(0)
304///   }
305/// }
306///
307/// rq.with_request_mut(|r| *r = Request::This(1,2)).unwrap() ;
308/// assert!(rq.send_request().is_ok());
309/// let request = rp.take_request().unwrap();
310/// assert_eq!(request, Request::This(1, 2));
311/// println!("rp got request: {:?}", request);
312///
313/// // building into response buffer
314/// impl Default for Response {
315///   fn default() -> Self {
316///     Response::There(1)
317///   }
318/// }
319///
320/// rp.with_response_mut(|r| *r = Response::Here(3,2,1)).unwrap();
321/// assert!(rp.send_response().is_ok());
322/// let response = rq.take_response().unwrap();
323/// assert_eq!(response, Response::Here(3,2,1));
324///
325/// ```
pub struct Channel<Rq, Rp> {
    /// Single buffer that holds either the request or the response (or nothing).
    data: UnsafeCell<Message<Rq, Rp>>,
    /// Current [`State`] of the channel, stored as its `u8` discriminant.
    state: AtomicU8,
    /// `true` while a [`Requester`] for this channel exists.
    requester_claimed: AtomicBool,
    /// `true` while a [`Responder`] for this channel exists.
    responder_claimed: AtomicBool,
}
332
333impl<Rq, Rp> Channel<Rq, Rp> {
334    // Loom's atomics are not const :/
335    #[cfg(not(loom))]
336    pub const fn new() -> Self {
337        Self {
338            data: UnsafeCell::new(Message::None),
339            state: AtomicU8::new(0),
340            requester_claimed: AtomicBool::new(false),
341            responder_claimed: AtomicBool::new(false),
342        }
343    }
344
345    #[cfg(loom)]
346    pub fn new() -> Self {
347        Self {
348            data: UnsafeCell::new(Message::None),
349            state: AtomicU8::new(0),
350            requester_claimed: AtomicBool::new(false),
351            responder_claimed: AtomicBool::new(false),
352        }
353    }
354
355    /// Obtain the requester end of the channel if it hasn't been taken yet.
356    ///
357    /// Can be called again if the previously obtained [`Requester`]() has been dropped
358    pub fn requester(&self) -> Option<Requester<'_, Rq, Rp>> {
359        if self
360            .requester_claimed
361            .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
362            .is_ok()
363        {
364            Some(Requester { channel: self })
365        } else {
366            None
367        }
368    }
369
370    /// Obtain the responder end of the channel if it hasn't been taken yet.
371    ///
372    /// Can be called again if the previously obtained [`Responder`]() has been dropped
373    pub fn responder(&self) -> Option<Responder<'_, Rq, Rp>> {
374        if self
375            .responder_claimed
376            .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
377            .is_ok()
378        {
379            Some(Responder { channel: self })
380        } else {
381            None
382        }
383    }
384
385    /// Obtain both the requester and responder ends of the channel.
386    ///
387    /// Can be called again if the previously obtained [`Responder`]() and [`Requester`]() have been dropped
388    pub fn split(&self) -> Option<(Requester<'_, Rq, Rp>, Responder<'_, Rq, Rp>)> {
389        Some((self.requester()?, self.responder()?))
390    }
391
392    fn transition(&self, from: State, to: State) -> bool {
393        self.state
394            .compare_exchange(from as u8, to as u8, Ordering::AcqRel, Ordering::Relaxed)
395            .is_ok()
396    }
397}
398
399impl<Rq, Rp> Default for Channel<Rq, Rp> {
400    fn default() -> Self {
401        Self::new()
402    }
403}
404
/// Requester end of a channel
///
/// For a `static` [`Channel`] or [`Interchange`],
/// the requester uses a `'static` lifetime parameter
pub struct Requester<'i, Rq, Rp> {
    /// Channel this end belongs to; its `requester_claimed` flag is cleared when this is dropped.
    channel: &'i Channel<Rq, Rp>,
}
412
413impl<Rq, Rp> Drop for Requester<'_, Rq, Rp> {
414    fn drop(&mut self) {
415        self.channel
416            .requester_claimed
417            .store(false, Ordering::Release);
418    }
419}
420
421impl<'i, Rq, Rp> Requester<'i, Rq, Rp> {
422    pub fn channel(&self) -> &'i Channel<Rq, Rp> {
423        self.channel
424    }
425
426    #[cfg(not(loom))]
427    unsafe fn data(&self) -> &Message<Rq, Rp> {
428        &mut *self.channel.data.get()
429    }
430
431    #[cfg(not(loom))]
432    unsafe fn data_mut(&mut self) -> &mut Message<Rq, Rp> {
433        &mut *self.channel.data.get()
434    }
435
436    #[cfg(not(loom))]
437    unsafe fn with_data<R>(&self, f: impl FnOnce(&Message<Rq, Rp>) -> R) -> R {
438        f(&*self.channel.data.get())
439    }
440
441    #[cfg(not(loom))]
442    unsafe fn with_data_mut<R>(&mut self, f: impl FnOnce(&mut Message<Rq, Rp>) -> R) -> R {
443        f(&mut *self.channel.data.get())
444    }
445
446    #[cfg(loom)]
447    unsafe fn with_data<R>(&self, f: impl FnOnce(&Message<Rq, Rp>) -> R) -> R {
448        self.channel.data.with(|i| f(&*i))
449    }
450
451    #[cfg(loom)]
452    unsafe fn with_data_mut<R>(&mut self, f: impl FnOnce(&mut Message<Rq, Rp>) -> R) -> R {
453        self.channel.data.with_mut(|i| f(&mut *i))
454    }
455
456    #[inline]
457    /// Current state of the channel.
458    ///
459    /// Informational only!
460    ///
461    /// The responder may change this state between calls,
462    /// internally atomics ensure correctness.
463    pub fn state(&self) -> State {
464        State::from(self.channel.state.load(Ordering::Acquire))
465    }
466
467    /// Send a request to the responder.
468    ///
469    /// If efficiency is a concern, or requests need multiple steps to
470    /// construct, use `request_mut` and `send_request.
471    ///
472    /// If the RPC state is `Idle`, this always succeeds, else calling
473    /// is a logic error and the request is returned.
474    pub fn request(&mut self, request: Rq) -> Result<(), Error> {
475        if State::Idle == self.channel.state.load(Ordering::Acquire) {
476            unsafe {
477                self.with_data_mut(|i| *i = Message::from_rq(request));
478            }
479            self.channel
480                .state
481                .store(State::Requested as u8, Ordering::Release);
482            Ok(())
483        } else {
484            Err(Error)
485        }
486    }
487
488    /// Attempt to cancel a request.
489    ///
490    /// If the responder has not taken the request yet, this succeeds and returns
491    /// the request.
492    ///
493    /// If the responder has taken the request (is processing), we succeed and return None.
494    ///
495    /// In other cases (`Idle` or `Reponsed`) there is nothing to cancel and we fail.
496    pub fn cancel(&mut self) -> Result<Option<Rq>, Error> {
497        if self
498            .channel
499            .transition(State::BuildingResponse, State::Canceled)
500        {
501            // we canceled after the responder took the request, but before they answered.
502            return Ok(None);
503        }
504
505        if self.channel.transition(State::Requested, State::Idle) {
506            // we canceled before the responder was even aware of the request.
507            return Ok(Some(unsafe { self.with_data_mut(|i| i.take_rq()) }));
508        }
509
510        Err(Error)
511    }
512
513    /// If there is a response waiting, obtain a reference to it
514    ///
515    /// This may be called multiple times.
516    // Safety: We cannot test this with loom efficiently, but given that `with_response` is tested,
517    // this is likely correct
518    #[cfg(not(loom))]
519    pub fn response(&self) -> Result<&Rp, Error> {
520        if self.channel.transition(State::Responded, State::Responded) {
521            Ok(unsafe { self.data().rp_ref() })
522        } else {
523            Err(Error)
524        }
525    }
526
527    /// If there is a request waiting, perform an operation with a reference to it
528    ///
529    /// This may be called multiple times.
530    pub fn with_response<R>(&self, f: impl FnOnce(&Rp) -> R) -> Result<R, Error> {
531        if self.channel.transition(State::Responded, State::Responded) {
532            Ok(unsafe { self.with_data(|i| f(i.rp_ref())) })
533        } else {
534            Err(Error)
535        }
536    }
537
538    /// Look for a response.
539    /// If the responder has sent a response, we return it.
540    ///
541    /// This may be called only once as it move the state to Idle.
542    /// If you need copies, clone the request.
543    // It is a logic error to call this method if we're Idle or Canceled, but
544    // it seems unnecessary to model this.
545    pub fn take_response(&mut self) -> Option<Rp> {
546        if self.channel.transition(State::Responded, State::Idle) {
547            Some(unsafe { self.with_data_mut(|i| i.take_rp()) })
548        } else {
549            None
550        }
551    }
552}
553
554impl<Rq, Rp> Requester<'_, Rq, Rp>
555where
556    Rq: Default,
557{
558    /// Initialize a request with its default values and mutates it with `f`
559    ///
560    /// This is usefull to build large structures in-place
561    pub fn with_request_mut<R>(&mut self, f: impl FnOnce(&mut Rq) -> R) -> Result<R, Error> {
562        if self.channel.transition(State::Idle, State::BuildingRequest)
563            || self
564                .channel
565                .transition(State::BuildingRequest, State::BuildingRequest)
566        {
567            let res = unsafe {
568                self.with_data_mut(|i| {
569                    if !i.is_request_state() {
570                        *i = Message::from_rq(Rq::default());
571                    }
572                    f(i.rq_mut())
573                })
574            };
575            Ok(res)
576        } else {
577            Err(Error)
578        }
579    }
580
581    /// Initialize a request with its default values and and return a mutable reference to it
582    ///
583    /// This is usefull to build large structures in-place
584    // Safety: We cannot test this with loom efficiently, but given that `with_request_mut` is tested,
585    // this is likely correct
586    #[cfg(not(loom))]
587    pub fn request_mut(&mut self) -> Result<&mut Rq, Error> {
588        if self.channel.transition(State::Idle, State::BuildingRequest)
589            || self
590                .channel
591                .transition(State::BuildingRequest, State::BuildingRequest)
592        {
593            unsafe {
594                self.with_data_mut(|i| {
595                    if !i.is_request_state() {
596                        *i = Message::from_rq(Rq::default());
597                    }
598                })
599            }
600            Ok(unsafe { self.data_mut().rq_mut() })
601        } else {
602            Err(Error)
603        }
604    }
605
606    /// Send a request that was already placed in the channel using `request_mut` or
607    /// `with_request_mut`.
608    pub fn send_request(&mut self) -> Result<(), Error> {
609        if State::BuildingRequest == self.channel.state.load(Ordering::Acquire)
610            && self
611                .channel
612                .transition(State::BuildingRequest, State::Requested)
613        {
614            Ok(())
615        } else {
616            // logic error
617            Err(Error)
618        }
619    }
620}
621
/// Responder end of a channel
///
/// For a `static` [`Channel`] or [`Interchange`],
/// the responder uses a `'static` lifetime parameter
pub struct Responder<'i, Rq, Rp> {
    /// Channel this end belongs to; its `responder_claimed` flag is cleared when this is dropped.
    channel: &'i Channel<Rq, Rp>,
}
629
630impl<Rq, Rp> Drop for Responder<'_, Rq, Rp> {
631    fn drop(&mut self) {
632        self.channel
633            .responder_claimed
634            .store(false, Ordering::Release);
635    }
636}
637
638impl<'i, Rq, Rp> Responder<'i, Rq, Rp> {
639    pub fn channel(&self) -> &'i Channel<Rq, Rp> {
640        self.channel
641    }
642
643    #[cfg(not(loom))]
644    unsafe fn data(&self) -> &Message<Rq, Rp> {
645        &mut *self.channel.data.get()
646    }
647
648    #[cfg(not(loom))]
649    unsafe fn data_mut(&mut self) -> &mut Message<Rq, Rp> {
650        &mut *self.channel.data.get()
651    }
652
653    #[cfg(not(loom))]
654    unsafe fn with_data<R>(&self, f: impl FnOnce(&Message<Rq, Rp>) -> R) -> R {
655        f(&*self.channel.data.get())
656    }
657
658    #[cfg(not(loom))]
659    unsafe fn with_data_mut<R>(&mut self, f: impl FnOnce(&mut Message<Rq, Rp>) -> R) -> R {
660        f(&mut *self.channel.data.get())
661    }
662
663    #[cfg(loom)]
664    unsafe fn with_data<R>(&self, f: impl FnOnce(&Message<Rq, Rp>) -> R) -> R {
665        self.channel.data.with(|i| f(&*i))
666    }
667
668    #[cfg(loom)]
669    unsafe fn with_data_mut<R>(&mut self, f: impl FnOnce(&mut Message<Rq, Rp>) -> R) -> R {
670        self.channel.data.with_mut(|i| f(&mut *i))
671    }
672
673    #[inline]
674    /// Current state of the channel.
675    ///
676    /// Informational only!
677    ///
678    /// The responder may change this state between calls,
679    /// internally atomics ensure correctness.
680    pub fn state(&self) -> State {
681        State::from(self.channel.state.load(Ordering::Acquire))
682    }
683
684    /// If there is a request waiting, perform an operation with a reference to it
685    ///
686    /// This may be called only once as it move the state to BuildingResponse.
687    /// If you need copies, use `take_request`
688    pub fn with_request<R>(&self, f: impl FnOnce(&Rq) -> R) -> Result<R, Error> {
689        if self
690            .channel
691            .transition(State::Requested, State::BuildingResponse)
692        {
693            Ok(unsafe { self.with_data(|i| f(i.rq_ref())) })
694        } else {
695            Err(Error)
696        }
697    }
698
699    /// If there is a request waiting, obtain a reference to it
700    ///
701    /// This may be called multiple times.
702    // Safety: We cannot test this with loom efficiently, but given that `with_request` is tested,
703    // this is likely correct
704    #[cfg(not(loom))]
705    pub fn request(&self) -> Result<&Rq, Error> {
706        if self
707            .channel
708            .transition(State::Requested, State::BuildingResponse)
709        {
710            Ok(unsafe { self.data().rq_ref() })
711        } else {
712            Err(Error)
713        }
714    }
715
716    /// If there is a request waiting, take a reference to it out
717    ///
718    /// This may be called only once as it move the state to BuildingResponse.
719    /// If you need copies, clone the request.
720    pub fn take_request(&mut self) -> Option<Rq> {
721        if self
722            .channel
723            .transition(State::Requested, State::BuildingResponse)
724        {
725            Some(unsafe { self.with_data_mut(|i| i.take_rq()) })
726        } else {
727            None
728        }
729    }
730
731    // Check if requester attempted to cancel
732    pub fn is_canceled(&self) -> bool {
733        self.channel.state.load(Ordering::SeqCst) == State::Canceled as u8
734    }
735
736    // Acknowledge a cancel, thereby setting Channel to Idle state again.
737    //
738    // It is a logic error to call this method if there is no pending cancellation.
739    pub fn acknowledge_cancel(&self) -> Result<(), Error> {
740        if self.channel.transition(State::Canceled, State::Idle) {
741            Ok(())
742        } else {
743            Err(Error)
744        }
745    }
746
747    /// Respond to a request.
748    ///
749    /// If efficiency is a concern, or responses need multiple steps to
750    /// construct, use `with_response_mut` or `response_mut` and `send_response`.
751    ///
752    pub fn respond(&mut self, response: Rp) -> Result<(), Error> {
753        if State::BuildingResponse == self.channel.state.load(Ordering::Acquire) {
754            unsafe {
755                self.with_data_mut(|i| *i = Message::from_rp(response));
756            }
757            if self
758                .channel
759                .transition(State::BuildingResponse, State::Responded)
760            {
761                Ok(())
762            } else {
763                Err(Error)
764            }
765        } else {
766            Err(Error)
767        }
768    }
769}
770
771impl<Rq, Rp> Responder<'_, Rq, Rp>
772where
773    Rp: Default,
774{
775    /// Initialize a response with its default values and mutates it with `f`
776    ///
777    /// This is usefull to build large structures in-place
778    pub fn with_response_mut<R>(&mut self, f: impl FnOnce(&mut Rp) -> R) -> Result<R, Error> {
779        if self
780            .channel
781            .transition(State::Requested, State::BuildingResponse)
782            || self
783                .channel
784                .transition(State::BuildingResponse, State::BuildingResponse)
785        {
786            let res = unsafe {
787                self.with_data_mut(|i| {
788                    if !i.is_response_state() {
789                        *i = Message::from_rp(Rp::default());
790                    }
791                    f(i.rp_mut())
792                })
793            };
794            Ok(res)
795        } else {
796            Err(Error)
797        }
798    }
799
800    /// Initialize a response with its default values and and return a mutable reference to it
801    ///
802    /// This is usefull to build large structures in-place
803    // Safety: We cannot test this with loom efficiently, but given that `with_response_mut` is tested,
804    // this is likely correct
805    #[cfg(not(loom))]
806    pub fn response_mut(&mut self) -> Result<&mut Rp, Error> {
807        if self
808            .channel
809            .transition(State::Requested, State::BuildingResponse)
810            || self
811                .channel
812                .transition(State::BuildingResponse, State::BuildingResponse)
813        {
814            unsafe {
815                self.with_data_mut(|i| {
816                    if !i.is_response_state() {
817                        *i = Message::from_rp(Rp::default());
818                    }
819                })
820            }
821            Ok(unsafe { self.data_mut().rp_mut() })
822        } else {
823            Err(Error)
824        }
825    }
826
827    /// Send a response that was already placed in the channel using `response_mut` or
828    /// `with_response_mut`.
829    pub fn send_response(&mut self) -> Result<(), Error> {
830        if State::BuildingResponse == self.channel.state.load(Ordering::Acquire)
831            && self
832                .channel
833                .transition(State::BuildingResponse, State::Responded)
834        {
835            Ok(())
836        } else {
837            // logic error
838            Err(Error)
839        }
840    }
841}
842
// SAFETY: The channel can be split, and its two ends can then be used to send values of the
// `Rq` and `Rp` types across threads, hence the `Send` bounds on both.
// TODO: is the Sync bound really necessary?
unsafe impl<Rq, Rp> Sync for Channel<Rq, Rp>
where
    Rq: Send + Sync,
    Rp: Send + Sync,
{
}
851
852/// Set of `N` channels
853///
854/// Channels can be claimed with [`claim()`](Self::claim)
855///
856/// ```
857/// # #![cfg(not(loom))]
858/// # use interchange::*;
859/// # #[derive(Clone, Debug, PartialEq)]
860/// # pub enum Request {
861/// #     This(u8, u32),
862/// #     That(i64),
863/// # }
864/// #
865/// # #[derive(Clone, Debug, PartialEq)]
866/// # pub enum Response {
867/// #     Here(u8, u8, u8),
868/// #     There(i16),
869/// # }
870/// #
871/// static interchange: Interchange<Request, Response,10> = Interchange::new();
872///
873/// for i in 0..10 {
874///     let rq: Requester<'_, Request, Response>;
875///     let rp: Responder<'_, Request, Response>;
876///     (rq, rp) = interchange.claim().unwrap() ;
877/// }
878/// ```
pub struct Interchange<Rq, Rp, const N: usize> {
    /// The `N` channels that can be claimed.
    channels: [Channel<Rq, Rp>; N],
    /// Counter incremented on every claim attempt; used to pick where the search for a free
    /// channel starts.
    last_claimed: AtomicUsize,
}
883
impl<Rq, Rp, const N: usize> Interchange<Rq, Rp, N> {
    /// Create a new Interchange with `N` fresh channels
    #[cfg(not(loom))]
    pub const fn new() -> Self {
        Self {
            channels: [const { Channel::new() }; N],
            last_claimed: AtomicUsize::new(0),
        }
    }

    /// Create a new Interchange with `N` fresh channels (non-`const` variant for loom)
    #[cfg(loom)]
    pub fn new() -> Self {
        Self {
            channels: core::array::from_fn(|_| Channel::new()),
            last_claimed: AtomicUsize::new(0),
        }
    }

    /// Claim one of the channels of the interchange.
    ///
    /// Returns `None` if no channel could be claimed, i.e. when all `N` channels currently
    /// have a live requester or responder. A channel can be claimed again once both of its
    /// ends have been dropped.
    pub fn claim(&self) -> Option<(Requester<Rq, Rp>, Responder<Rq, Rp>)> {
        self.as_interchange_ref().claim()
    }

    /// Returns a reference to the interchange with the `N` const-generic removed.
    /// This can avoid the requirement to have `const N: usize` everywhere
    /// ```
    /// # #![cfg(not(loom))]
    /// # use interchange::{State, Interchange, InterchangeRef};
    /// # #[derive(Clone, Debug, PartialEq)]
    /// # pub enum Request {
    /// #     This(u8, u32),
    /// #     That(i64),
    /// # }
    /// # #[derive(Clone, Debug, PartialEq)]
    /// # pub enum Response {
    /// #     Here(u8, u8, u8),
    /// #     There(i16),
    /// # }
    /// static INTERCHANGE_INNER: Interchange<Request, Response, 1> = Interchange::new();
    ///
    /// // The size of the interchange is absent from the type
    /// static INTERCHANGE: InterchangeRef<'static, Request, Response> = INTERCHANGE_INNER.as_interchange_ref();
    ///
    /// let (mut rq, mut rp) = INTERCHANGE.claim().unwrap();
    /// ```
    pub const fn as_interchange_ref(&self) -> InterchangeRef<'_, Rq, Rp> {
        InterchangeRef {
            // The array reference coerces to a slice, which is what erases `N` from the type.
            channels: &self.channels,
            last_claimed: &self.last_claimed,
        }
    }
}
937
/// Interchange without the `const N: usize` generic parameter
///
/// Obtained using [`Interchange::as_interchange_ref`](Interchange::as_interchange_ref)
pub struct InterchangeRef<'alloc, Rq, Rp> {
    /// The channels of the underlying [`Interchange`], as a slice.
    channels: &'alloc [Channel<Rq, Rp>],
    /// The claim counter of the underlying [`Interchange`].
    last_claimed: &'alloc AtomicUsize,
}
944
945impl<'alloc, Rq, Rp> InterchangeRef<'alloc, Rq, Rp> {
946    /// Claim one of the channels of the interchange. Returns None if called more than `N` times.
947    pub fn claim(&self) -> Option<(Requester<'alloc, Rq, Rp>, Responder<'alloc, Rq, Rp>)> {
948        let index = self.last_claimed.fetch_add(1, Ordering::Relaxed);
949        let n = self.channels.len();
950
951        for i in (index % n)..n {
952            let tmp = self.channels[i].split();
953            if tmp.is_some() {
954                return tmp;
955            }
956        }
957
958        for i in 0..(index % n) {
959            let tmp = self.channels[i].split();
960            if tmp.is_some() {
961                return tmp;
962            }
963        }
964        None
965    }
966}
967
// Written by hand instead of `#[derive(Clone)]`: the derive would add
// `Rq: Clone` and `Rp: Clone` bounds, but the struct only holds shared
// references, which can be duplicated for any `Rq`/`Rp`.
impl<Rq, Rp> Clone for InterchangeRef<'_, Rq, Rp> {
    fn clone(&self) -> Self {
        // Plain copy of the two references; allowed because the type is `Copy`.
        *self
    }
}
973
// Manual impl (not derived) so that no `Rq: Copy` / `Rp: Copy` bounds are
// imposed; both fields are shared references.
impl<Rq, Rp> Copy for InterchangeRef<'_, Rq, Rp> {}
975
impl<Rq, Rp, const N: usize> Default for Interchange<Rq, Rp, N> {
    /// Equivalent to [`Interchange::new`].
    fn default() -> Self {
        Self::new()
    }
}
981
/// Compile-fail doctests: every snippet below is expected to be rejected by
/// the compiler.
///
/// Each one instantiates `Channel`, `Requester` or `Responder` with
/// `Rc<String>` as the request type and demands `Send` (first three snippets)
/// or `Sync` (last three snippets).
///
/// ```compile_fail
/// use std::rc::Rc;
/// use interchange::*;
/// #[allow(unconditional_recursion, unused)]
/// fn assert_send<T: Send>() {
///     assert_send::<Channel<Rc<String>, u32>>();
/// }
/// ```
/// ```compile_fail
/// use std::rc::Rc;
/// use interchange::*;
/// #[allow(unconditional_recursion, unused)]
/// fn assert_send<T: Send>() {
///     assert_send::<Requester<Rc<String>, u32>>();
/// }
/// ```
/// ```compile_fail
/// use std::rc::Rc;
/// use interchange::*;
/// #[allow(unconditional_recursion, unused)]
/// fn assert_send<T: Send>() {
///     assert_send::<Responder<Rc<String>, u32>>();
/// }
/// ```
/// ```compile_fail
/// use std::rc::Rc;
/// use interchange::*;
/// #[allow(unconditional_recursion, unused)]
/// fn assert_sync<T: Sync>() {
///     assert_sync::<Channel<Rc<String>, u32>>();
/// }
/// ```
/// ```compile_fail
/// use std::rc::Rc;
/// use interchange::*;
/// #[allow(unconditional_recursion, unused)]
/// fn assert_sync<T: Sync>() {
///     assert_sync::<Requester<Rc<String>, u32>>();
/// }
/// ```
/// ```compile_fail
/// use std::rc::Rc;
/// use interchange::*;
/// #[allow(unconditional_recursion, unused)]
/// fn assert_sync<T: Sync>() {
///     assert_sync::<Responder<Rc<String>, u32>>();
/// }
/// ```
// Unit-valued placeholder: it has no data of its own and serves as the item
// the doc comment above is attached to.
const _ASSERT_COMPILE_FAILS: () = {};
1031
#[cfg(all(not(loom), test))]
mod tests {
    use super::*;

    /// Request type used by the tests in this module.
    #[derive(Clone, Debug, PartialEq)]
    pub enum Request {
        This(u8, u32),
    }

    /// Response type used by the tests in this module.
    #[derive(Clone, Debug, PartialEq)]
    pub enum Response {
        Here(u8, u8, u8),
        There(i16),
    }

    impl Default for Response {
        fn default() -> Self {
            Response::There(1)
        }
    }

    impl Default for Request {
        fn default() -> Self {
            Request::This(0, 0)
        }
    }

    /// Drives a freshly claimed requester/responder pair through the whole
    /// scenario shared by the tests below: a plain request/response round
    /// trip, a cancelation before the responder took the request, a
    /// cancelation after it took the request, and finally building the
    /// request and the response in place.
    ///
    /// Panics (failing the calling test) as soon as any step misbehaves.
    fn exercise_channel(
        mut rq: Requester<'_, Request, Response>,
        mut rp: Responder<'_, Request, Response>,
    ) {
        assert_eq!(rq.state(), State::Idle);

        // happy path: no cancelation
        let request = Request::This(1, 2);
        assert!(rq.request(request).is_ok());
        let request = rp.take_request().unwrap();
        println!("rp got request: {request:?}");
        let response = Response::There(-1);
        assert!(!rp.is_canceled());
        assert!(rp.respond(response).is_ok());
        let response = rq.take_response().unwrap();
        println!("rq got response: {response:?}");

        // early cancelation path
        assert!(rq.request(request).is_ok());
        let request = rq.cancel().unwrap().unwrap();
        println!("responder could cancel: {request:?}");
        assert!(rp.take_request().is_none());
        assert_eq!(State::Idle, rq.state());

        // late cancelation
        assert!(rq.request(request).is_ok());
        let request = rp.take_request().unwrap();
        // `cancel()` is the step under test here, so it is performed as its
        // own statement rather than inside the `println!` arguments.
        let cancel_returned_nothing = rq.cancel().unwrap().is_none();
        println!("responder could cancel: {cancel_returned_nothing:?}");
        assert_eq!(request, Request::This(1, 2));
        assert!(rp.is_canceled());
        assert!(rp.respond(response).is_err());
        assert!(rp.acknowledge_cancel().is_ok());
        assert_eq!(State::Idle, rq.state());

        // building into request buffer
        rq.with_request_mut(|r| *r = Request::This(1, 2)).unwrap();
        assert!(rq.send_request().is_ok());
        let request = rp.take_request().unwrap();
        assert_eq!(request, Request::This(1, 2));
        println!("rp got request: {request:?}");

        // building into response buffer
        rp.with_response_mut(|r| *r = Response::Here(3, 2, 1))
            .unwrap();
        assert!(rp.send_response().is_ok());
        let response = rq.take_response().unwrap();
        assert_eq!(response, Response::Here(3, 2, 1));
    }

    /// Runs the shared scenario on a pair claimed directly from an
    /// `Interchange`.
    #[test]
    fn interchange() {
        static INTERCHANGE: Interchange<Request, Response, 1> = Interchange::new();
        let (rq, rp) = INTERCHANGE.claim().unwrap();
        exercise_channel(rq, rp);
    }

    /// Runs the shared scenario on a pair claimed through an
    /// `InterchangeRef`.
    #[test]
    fn interchange_ref() {
        static INTERCHANGE_INNER: Interchange<Request, Response, 1> = Interchange::new();
        static INTERCHANGE: InterchangeRef<'static, Request, Response> =
            INTERCHANGE_INNER.as_interchange_ref();
        let (rq, rp) = INTERCHANGE.claim().unwrap();
        exercise_channel(rq, rp);
    }

    // Compile-time check only: the function is never called, and each
    // self-call merely forces the compiler to prove the `Send` bound for the
    // named type. The lints silenced below are the ones this trick triggers.
    #[allow(unconditional_recursion, clippy::extra_unused_type_parameters, unused)]
    fn assert_send<T: Send>() {
        assert_send::<Channel<String, u32>>();
        assert_send::<Responder<'static, String, u32>>();
        assert_send::<Requester<'static, String, u32>>();
        assert_send::<Channel<&'static mut String, u32>>();
        assert_send::<Responder<'static, &'static mut String, u32>>();
        assert_send::<Requester<'static, &'static mut String, u32>>();
    }

    // Same trick as `assert_send`, for the `Sync` bound.
    #[allow(unconditional_recursion, clippy::extra_unused_type_parameters, unused)]
    fn assert_sync<T: Sync>() {
        assert_sync::<Channel<String, u32>>();
        assert_sync::<Responder<'static, String, u32>>();
        assert_sync::<Requester<'static, String, u32>>();

        assert_sync::<Channel<&'static mut String, u32>>();
        assert_sync::<Responder<'static, &'static mut String, u32>>();
        assert_sync::<Requester<'static, &'static mut String, u32>>();
    }
}