Skip to main content

vox_types/
channel.rs

1#[cfg(all(feature = "jit", not(target_arch = "wasm32")))]
2use std::collections::HashMap;
3use std::collections::VecDeque;
4use std::marker::PhantomData;
5use std::panic::Location;
6use std::pin::Pin;
7use std::sync::Arc;
8use std::sync::Mutex;
9#[cfg(all(feature = "jit", not(target_arch = "wasm32")))]
10use std::sync::OnceLock;
11use std::sync::atomic::{AtomicBool, Ordering};
12
13use facet::Facet;
14#[cfg(all(feature = "jit", not(target_arch = "wasm32")))]
15use facet_core::ConstTypeId;
16use facet_core::PtrConst;
17#[cfg(target_arch = "wasm32")]
18use moire::sync::TryAcquireError;
19use moire::sync::{Notify, Semaphore};
20#[cfg(not(target_arch = "wasm32"))]
21use tokio::sync::TryAcquireError;
22
23use crate::{Backing, ChannelClose, ChannelItem, ChannelReset, Metadata, Payload, SelfRef};
24use crate::{
25    ChannelCloseReason, ChannelDebugContext, ChannelEvent, ChannelEventContext, ChannelResetReason,
26    ChannelSendOutcome, ChannelTrySendOutcome, ConnectionCloseReason, SourceLocation,
27    VoxObserverHandle,
28};
29use crate::{ChannelId, ConnectionId};
30
// Decode inputs cached per channel item type. Instances are built by
// `channel_decode_plan` and both fields are handed to the JIT runtime by the
// JIT-enabled `decode_channel_payload`.
#[cfg(all(feature = "jit", not(target_arch = "wasm32")))]
struct ChannelDecodePlan {
    // Translation plan produced by `vox_postcard::build_identity_plan` for the item's shape.
    plan: vox_postcard::TranslationPlan,
    // Schema registry passed alongside the plan; `channel_decode_plan` creates it empty.
    registry: vox_schema::SchemaRegistry,
}
36
37// ---------------------------------------------------------------------------
38// Thread-local channel binder — set during deserialization so TryFrom impls
39// can bind channels immediately.
40// ---------------------------------------------------------------------------
41
std::thread_local! {
    // Binder installed on the current thread, or `None` when none is active.
    // `set_channel_binder` writes it and `ChannelBinderGuard::drop` puts the
    // previous value back. The `'static` lifetime comes from a lifetime-erasing
    // `transmute` in `set_channel_binder`; the stored reference is only meant
    // to be used while the matching guard is alive.
    static CHANNEL_BINDER: std::cell::RefCell<Option<&'static dyn ChannelBinder>> =
        const { std::cell::RefCell::new(None) };
}
46
47/// Set the thread-local channel binder for the duration of `f`.
48///
49/// Any `Tx<T>` or `Rx<T>` deserialized (via `TryFrom<ChannelId>`) during `f`
50/// will be bound through this binder.
51pub fn with_channel_binder<R>(binder: &dyn ChannelBinder, f: impl FnOnce() -> R) -> R {
52    let _guard = set_channel_binder(binder);
53    f()
54}
55
56/// Set the thread-local channel binder, returning a guard that restores
57/// the previous value on drop.
58///
59/// Prefer this over [`with_channel_binder`] when the code that runs under
60/// the binder needs to return borrowed data (closures can't return borrows
61/// from captures).
62pub fn set_channel_binder(binder: &dyn ChannelBinder) -> ChannelBinderGuard<'_> {
63    // SAFETY: we restore the previous value on drop (via ChannelBinderGuard),
64    // so the binder reference doesn't escape the guard's lifetime.
65    #[allow(unsafe_code)]
66    let static_ref: &'static dyn ChannelBinder = unsafe { std::mem::transmute(binder) };
67    let prev = CHANNEL_BINDER.with(|cell| cell.borrow_mut().replace(static_ref));
68    ChannelBinderGuard {
69        prev,
70        _lifetime: std::marker::PhantomData,
71    }
72}
73
/// RAII guard that restores the previous thread-local channel binder on drop.
///
/// Returned by `set_channel_binder`.
pub struct ChannelBinderGuard<'a> {
    /// Binder that was installed before this guard was created (`None` if
    /// there was none); written back into the thread-local on drop.
    prev: Option<&'static dyn ChannelBinder>,
    /// Zero-sized marker that ties the guard to the lifetime of the binder
    /// borrow passed to `set_channel_binder`.
    _lifetime: std::marker::PhantomData<&'a dyn ChannelBinder>,
}
79
80impl Drop for ChannelBinderGuard<'_> {
81    fn drop(&mut self) {
82        CHANNEL_BINDER.with(|cell| {
83            *cell.borrow_mut() = self.prev.take();
84        });
85    }
86}
87
// r[impl rpc.channel.pair]
/// The binding stored in a channel core — either a sink or a receiver, never both.
///
/// Written with `ChannelCore::set_binding`; `ChannelCore::get_sink` reads the
/// `Sink` variant and `ChannelCore::take_receiver` removes the `Receiver`
/// variant.
pub enum ChannelBinding {
    /// Outgoing side: the sink the paired `Tx` sends through.
    Sink(BoundChannelSink),
    /// Incoming side: the mailbox receiver the paired `Rx` reads from.
    Receiver(BoundChannelReceiver),
}
94
/// Method-less marker trait used to type-erase the handles stored in the
/// `liveness` fields of `BoundChannelSink` and `BoundChannelReceiver`.
///
/// NOTE(review): presumably these handles are held only so that dropping them
/// signals something to the runtime — confirm against the driver.
pub trait ChannelLiveness: crate::MaybeSend + crate::MaybeSync + 'static {}

// Blanket impl: every `MaybeSend + MaybeSync + 'static` type qualifies.
impl<T: crate::MaybeSend + crate::MaybeSync + 'static> ChannelLiveness for T {}

/// Shared, type-erased liveness handle.
pub type ChannelLivenessHandle = Arc<dyn ChannelLiveness>;
100
/// Receive-side callbacks and identity accessors for a channel.
///
/// `handle_incoming_channel_message` calls `on_item_consumed` after an item
/// has been decoded successfully. The accessor methods feed the channel
/// events emitted by `observe_replenisher_channel`.
///
/// NOTE(review): presumably implementations use `on_item_consumed` to grant
/// credit back to the sending peer — confirm against the driver.
pub trait ChannelCreditReplenisher: crate::MaybeSend + crate::MaybeSync + 'static {
    /// Called once for every item that was received and decoded successfully.
    fn on_item_consumed(&self);

    /// Hook with a no-op default. The call site is outside this part of the
    /// file; by its name it is expected to run when the receiver is dropped.
    fn on_receiver_dropped(&self) {}

    /// Channel this replenisher belongs to. While this returns `None`,
    /// `observe_replenisher_channel` emits no events.
    fn channel_id(&self) -> Option<ChannelId> {
        None
    }

    /// Connection id copied into emitted `ChannelEventContext`s.
    fn connection_id(&self) -> Option<ConnectionId> {
        None
    }

    /// Debug context that takes precedence over the caller's fallback context
    /// when event contexts are built (see `merge_debug_context`).
    fn debug_context(&self) -> Option<ChannelDebugContext> {
        None
    }

    /// Observer that receives channel events. While this returns `None`,
    /// `observe_replenisher_channel` emits no events.
    fn observer(&self) -> Option<VoxObserverHandle> {
        None
    }
}

/// Shared, type-erased replenisher handle.
pub type ChannelCreditReplenisherHandle = Arc<dyn ChannelCreditReplenisher>;
124
/// Sink half of a channel binding, stored in `ChannelBinding::Sink`.
#[derive(Clone)]
pub struct BoundChannelSink {
    /// Sink that `ChannelCore::get_sink` clones out for the sending handle.
    pub sink: Arc<dyn ChannelSink>,
    /// Optional type-erased handle stored alongside the sink.
    pub liveness: Option<ChannelLivenessHandle>,
}
130
/// Point-in-time snapshot of a mailbox, as returned by the `stats` methods on
/// `ChannelMailboxSender` and `ChannelMailboxReceiver`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ChannelMailboxStats {
    /// Number of queued items. May exceed `capacity`, because
    /// `ChannelMailboxSender::force_send` does not check the capacity.
    pub len: usize,
    /// Capacity the mailbox was created with.
    pub capacity: usize,
    /// `true` once the receiver half has been dropped.
    pub receiver_closed: bool,
    /// Number of sender handles currently alive.
    pub sender_count: usize,
}
138
/// Error returned by the mailbox send methods when the receiver half is gone.
///
/// The rejected item travels inside the error so the caller can recover it
/// with [`ChannelMailboxSendError::into_inner`].
pub struct ChannelMailboxSendError<T> {
    item: T,
}

impl<T> std::fmt::Debug for ChannelMailboxSendError<T> {
    /// Prints only the type name; the payload is left out, so `T` does not
    /// need to implement `Debug`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("ChannelMailboxSendError");
        repr.finish_non_exhaustive()
    }
}

impl<T> ChannelMailboxSendError<T> {
    /// Consume the error and hand back the item that could not be sent.
    pub fn into_inner(self) -> T {
        let Self { item } = self;
        item
    }
}
155
/// State shared (through an `Arc`) by all senders and the receiver of one mailbox.
struct ChannelMailboxState<T> {
    /// Queue and bookkeeping, guarded by a standard (blocking) mutex.
    inner: Mutex<ChannelMailboxInner<T>>,
    /// Signalled after an item is queued, when the last sender is dropped,
    /// and when the receiver is dropped.
    not_empty: Notify,
    /// Signalled after an item is dequeued, when the last sender is dropped,
    /// and when the receiver is dropped.
    not_full: Notify,
}
161
/// Mutex-protected part of a mailbox.
struct ChannelMailboxInner<T> {
    /// Items waiting to be received, oldest at the front.
    queue: VecDeque<T>,
    /// Queue length at which `ChannelMailboxSender::send` starts waiting.
    capacity: usize,
    /// Set when the receiver half is dropped; sends fail from then on.
    receiver_closed: bool,
    /// Number of live sender handles; `recv` returns `None` once this is zero
    /// and the queue is empty.
    sender_count: usize,
}
168
// r[impl rpc.channel.delivery.reliable]
/// Sending half of a mailbox created by `channel_mailbox`.
///
/// Cloning registers an additional sender; dropping the last sender wakes all
/// waiters so the receiver can observe the end of the stream.
pub struct ChannelMailboxSender<T> {
    /// State shared with the receiver and all other senders.
    state: Arc<ChannelMailboxState<T>>,
}
173
/// Receiving half of a mailbox created by `channel_mailbox`.
///
/// Dropping it marks the mailbox closed, discards queued items and wakes all
/// waiters.
pub struct ChannelMailboxReceiver<T> {
    /// State shared with every sender.
    state: Arc<ChannelMailboxState<T>>,
}
177
178pub fn channel_mailbox<T>(
179    name: &'static str,
180    capacity: usize,
181) -> (ChannelMailboxSender<T>, ChannelMailboxReceiver<T>) {
182    assert!(capacity > 0, "channel mailbox capacity must be non-zero");
183    let state = Arc::new(ChannelMailboxState {
184        inner: Mutex::new(ChannelMailboxInner {
185            queue: VecDeque::with_capacity(capacity),
186            capacity,
187            receiver_closed: false,
188            sender_count: 1,
189        }),
190        not_empty: Notify::new(name),
191        not_full: Notify::new(name),
192    });
193    (
194        ChannelMailboxSender {
195            state: Arc::clone(&state),
196        },
197        ChannelMailboxReceiver { state },
198    )
199}
200
201impl<T> Clone for ChannelMailboxSender<T> {
202    fn clone(&self) -> Self {
203        let mut guard = self
204            .state
205            .inner
206            .lock()
207            .expect("channel mailbox mutex poisoned");
208        guard.sender_count = guard.sender_count.saturating_add(1);
209        drop(guard);
210        Self {
211            state: Arc::clone(&self.state),
212        }
213    }
214}
215
216impl<T> Drop for ChannelMailboxSender<T> {
217    fn drop(&mut self) {
218        let mut guard = self
219            .state
220            .inner
221            .lock()
222            .expect("channel mailbox mutex poisoned");
223        guard.sender_count = guard.sender_count.saturating_sub(1);
224        let closed = guard.sender_count == 0;
225        drop(guard);
226        if closed {
227            self.state.not_empty.notify_waiters();
228            self.state.not_full.notify_waiters();
229        }
230    }
231}
232
impl<T> ChannelMailboxSender<T> {
    /// Queue `item`, waiting while the mailbox is at capacity.
    ///
    /// Returns `Err` carrying the item back if the receiver has been dropped,
    /// either before the call or while waiting for space.
    ///
    /// # Panics
    ///
    /// Panics if the mailbox mutex is poisoned.
    pub async fn send(&self, item: T) -> Result<(), ChannelMailboxSendError<T>> {
        // Held in an `Option` so it can be moved out of the loop exactly once.
        let mut item = Some(item);
        loop {
            let notified = {
                let mut guard = self
                    .state
                    .inner
                    .lock()
                    .expect("channel mailbox mutex poisoned");
                if guard.receiver_closed {
                    return Err(ChannelMailboxSendError {
                        item: item.take().expect("mailbox item already sent"),
                    });
                }
                if guard.queue.len() < guard.capacity {
                    guard
                        .queue
                        .push_back(item.take().expect("mailbox item already sent"));
                    // Release the lock before waking the receiver.
                    drop(guard);
                    self.state.not_empty.notify_waiters();
                    return Ok(());
                }
                // The wait future is created while the lock is still held and
                // only awaited after the guard is gone, so the mutex is never
                // held across an await point.
                // NOTE(review): this relies on `Notify::notified()` observing a
                // `notify_waiters()` issued after creation but before the first
                // poll (as tokio's `Notify` does) — confirm for `moire::sync::Notify`.
                self.state.not_full.notified()
            };
            notified.await;
        }
    }

    /// Queue `item` immediately without checking the capacity and without waiting.
    ///
    /// Returns `Err` carrying the item back if the receiver has been dropped.
    ///
    /// # Panics
    ///
    /// Panics if the mailbox mutex is poisoned.
    pub fn force_send(&self, item: T) -> Result<(), ChannelMailboxSendError<T>> {
        let mut guard = self
            .state
            .inner
            .lock()
            .expect("channel mailbox mutex poisoned");
        if guard.receiver_closed {
            return Err(ChannelMailboxSendError { item });
        }
        guard.queue.push_back(item);
        // Release the lock before waking the receiver.
        drop(guard);
        self.state.not_empty.notify_waiters();
        Ok(())
    }

    /// Snapshot of the mailbox's current length, capacity and endpoint counts.
    pub fn stats(&self) -> ChannelMailboxStats {
        self.state.stats()
    }
}
281
282impl<T> Drop for ChannelMailboxReceiver<T> {
283    fn drop(&mut self) {
284        let mut guard = self
285            .state
286            .inner
287            .lock()
288            .expect("channel mailbox mutex poisoned");
289        guard.receiver_closed = true;
290        guard.queue.clear();
291        drop(guard);
292        self.state.not_full.notify_waiters();
293        self.state.not_empty.notify_waiters();
294    }
295}
296
impl<T> ChannelMailboxReceiver<T> {
    /// Wait for the next item.
    ///
    /// Returns `None` once the queue is empty and no sender is left. Items
    /// that are still queued when the last sender goes away are delivered
    /// first.
    ///
    /// # Panics
    ///
    /// Panics if the mailbox mutex is poisoned.
    pub async fn recv(&mut self) -> Option<T> {
        loop {
            let notified = {
                let mut guard = self
                    .state
                    .inner
                    .lock()
                    .expect("channel mailbox mutex poisoned");
                if let Some(item) = guard.queue.pop_front() {
                    // Release the lock before waking senders waiting for space.
                    drop(guard);
                    self.state.not_full.notify_waiters();
                    return Some(item);
                }
                // Checked only after the queue turned out to be empty, so
                // already-queued items are never lost.
                if guard.sender_count == 0 {
                    return None;
                }
                // The wait future is created while the lock is still held and
                // only awaited after the guard is gone, so the mutex is never
                // held across an await point.
                // NOTE(review): this relies on `Notify::notified()` observing a
                // `notify_waiters()` issued after creation but before the first
                // poll (as tokio's `Notify` does) — confirm for `moire::sync::Notify`.
                self.state.not_empty.notified()
            };
            notified.await;
        }
    }

    /// Snapshot of the mailbox's current length, capacity and endpoint counts.
    pub fn stats(&self) -> ChannelMailboxStats {
        self.state.stats()
    }
}
324
325impl<T> ChannelMailboxState<T> {
326    fn stats(&self) -> ChannelMailboxStats {
327        let guard = self.inner.lock().expect("channel mailbox mutex poisoned");
328        ChannelMailboxStats {
329            len: guard.queue.len(),
330            capacity: guard.capacity,
331            receiver_closed: guard.receiver_closed,
332            sender_count: guard.sender_count,
333        }
334    }
335}
336
/// Receiver half of a channel binding, stored in `ChannelBinding::Receiver`
/// or consumed by `ChannelCore::bind_retryable_receiver`.
pub struct BoundChannelReceiver {
    /// Mailbox the incoming channel messages are read from.
    pub receiver: ChannelMailboxReceiver<IncomingChannelMessage>,
    /// Optional type-erased handle stored alongside the receiver.
    pub liveness: Option<ChannelLivenessHandle>,
    /// Optional callback target that is told when items are consumed.
    pub replenisher: Option<ChannelCreditReplenisherHandle>,
}
342
/// Bookkeeping for the "logical" receiver maintained by
/// `ChannelCore::bind_retryable_receiver`, which can be re-bound to a new
/// physical receiver several times.
struct LogicalReceiverState {
    /// Incremented (wrapping) on every bind; forwarding tasks spawned for an
    /// older generation stop when they notice the mismatch.
    generation: u64,
    /// Liveness handle of the most recent bind.
    liveness: Option<ChannelLivenessHandle>,
    /// Replenisher of the most recent bind.
    replenisher: Option<ChannelCreditReplenisherHandle>,
    /// Sender side of the logical mailbox; cloned into each forwarding task.
    sender: Option<ChannelMailboxSender<LogicalIncomingChannelMessage>>,
    /// Receiver side of the logical mailbox; handed out at most once by
    /// `ChannelCore::take_logical_receiver`.
    receiver: Option<ChannelMailboxReceiver<LogicalIncomingChannelMessage>>,
}
350
// r[impl rpc.channel.pair]
/// Shared state between a `Tx`/`Rx` pair created by `channel()`.
///
/// Holds the current `ChannelBinding` behind a mutex. The binding is stored
/// or replaced by `set_binding`, read by `get_sink`, removed by
/// `take_receiver`, and cleared by `finish_retry_binding`. A second mutex
/// guards the optional retryable ("logical") receiver state.
pub struct ChannelCore {
    /// Current binding, if any.
    binding: Mutex<Option<ChannelBinding>>,
    /// State of the retryable receiver; `None` until
    /// `bind_retryable_receiver` first creates it.
    logical_receiver: Mutex<Option<LogicalReceiverState>>,
    /// Signalled whenever the binding or the logical receiver state changes.
    binding_changed: Notify,
    /// Debug context captured when the pair was created.
    debug_context: ChannelDebugContext,
}
363
impl ChannelCore {
    /// Create an unbound core that remembers `debug_context`.
    fn new(debug_context: ChannelDebugContext) -> Self {
        Self {
            binding: Mutex::new(None),
            logical_receiver: Mutex::new(None),
            binding_changed: Notify::new("vox_types.channel.binding_changed"),
            debug_context,
        }
    }

    /// Store or replace a binding in the core.
    ///
    /// Wakes everything waiting on `binding_changed`. The notification is sent
    /// while the binding lock is still held.
    pub fn set_binding(&self, binding: ChannelBinding) {
        let mut guard = self.binding.lock().expect("channel core mutex poisoned");
        *guard = Some(binding);
        self.binding_changed.notify_waiters();
    }

    /// Clone the sink from the core (for Tx reading the sink).
    /// Returns None if no sink has been set or if the binding is a Receiver.
    pub fn get_sink(&self) -> Option<Arc<dyn ChannelSink>> {
        let guard = self.binding.lock().expect("channel core mutex poisoned");
        match guard.as_ref() {
            Some(ChannelBinding::Sink(bound)) => Some(bound.sink.clone()),
            _ => None,
        }
    }

    /// Take the receiver out of the core (for Rx on first recv).
    /// Returns None if no receiver has been set or if it was already taken.
    pub fn take_receiver(&self) -> Option<BoundChannelReceiver> {
        let mut guard = self.binding.lock().expect("channel core mutex poisoned");
        match guard.take() {
            Some(ChannelBinding::Receiver(bound)) => Some(bound),
            other => {
                // Put it back if it wasn't a receiver
                *guard = other;
                None
            }
        }
    }

    /// Bind `bound` as the current source of the logical (retryable) receiver.
    ///
    /// On non-wasm targets without a current tokio runtime this degrades to a
    /// plain `set_binding(ChannelBinding::Receiver(bound))`. Otherwise the
    /// logical mailbox is created on first use (capacity 64), the generation
    /// counter is bumped, and a task is spawned that forwards every message
    /// from `bound.receiver` into the logical mailbox until a newer bind
    /// supersedes it, the logical state is cleared, the source ends, or the
    /// logical receiver is gone.
    pub fn bind_retryable_receiver(self: &Arc<Self>, bound: BoundChannelReceiver) {
        // No runtime to spawn the forwarding task on: fall back to a direct binding.
        #[cfg(not(target_arch = "wasm32"))]
        if tokio::runtime::Handle::try_current().is_err() {
            self.set_binding(ChannelBinding::Receiver(bound));
            return;
        }

        let mut guard = self
            .logical_receiver
            .lock()
            .expect("channel core logical receiver mutex poisoned");
        let state = guard.get_or_insert_with(|| {
            let (tx, rx) = channel_mailbox("vox_types.channel.logical_receiver", 64);
            LogicalReceiverState {
                generation: 0,
                liveness: None,
                replenisher: None,
                sender: Some(tx),
                receiver: Some(rx),
            }
        });
        // Bumping the generation invalidates forwarding tasks of earlier binds.
        state.generation = state.generation.wrapping_add(1);
        state.liveness = bound.liveness.clone();
        state.replenisher = bound.replenisher.clone();
        let generation = state.generation;

        let Some(sender) = state.sender.clone() else {
            return;
        };

        self.binding_changed.notify_waiters();

        // Release the lock before spawning; the task re-locks it per message.
        drop(guard);
        let core = Arc::clone(self);

        moire::task::spawn(async move {
            let mut receiver = bound.receiver;
            let replenisher = bound.replenisher.clone();
            while let Some(msg) = receiver.recv().await {
                // Stop if a newer bind happened or the logical state was
                // cleared since this task was spawned. The message just
                // received is dropped in that case.
                let is_current_generation = {
                    let guard = core
                        .logical_receiver
                        .lock()
                        .expect("channel core logical receiver mutex poisoned");
                    guard
                        .as_ref()
                        .map(|state| state.generation == generation)
                        .unwrap_or(false)
                };
                if !is_current_generation {
                    return;
                }
                // Tag the message with the replenisher of the bind it came from.
                let forwarded = LogicalIncomingChannelMessage {
                    msg,
                    replenisher: replenisher.clone(),
                };
                // An error means the logical receiver has been dropped.
                if sender.send(forwarded).await.is_err() {
                    return;
                }
            }
        });
    }

    /// Hand out the logical receiver together with clones of the liveness and
    /// replenisher handles of the most recent bind.
    ///
    /// Returns `None` if no logical state exists or the receiver was already
    /// taken.
    pub fn take_logical_receiver(
        &self,
    ) -> Option<(
        ChannelMailboxReceiver<LogicalIncomingChannelMessage>,
        Option<ChannelLivenessHandle>,
        Option<ChannelCreditReplenisherHandle>,
    )> {
        self.logical_receiver
            .lock()
            .expect("channel core logical receiver mutex poisoned")
            .as_mut()
            .and_then(|state| {
                state
                    .receiver
                    .take()
                    .map(|receiver| (receiver, state.liveness.clone(), state.replenisher.clone()))
            })
    }

    /// Tear down the retryable binding.
    ///
    /// If logical state exists, a synthetic `Close` message is force-sent to
    /// the logical mailbox (a failure, meaning the receiver is gone, is
    /// ignored) and the stored sender is dropped. Then the logical state and
    /// the binding are both cleared and `binding_changed` waiters are woken.
    pub fn finish_retry_binding(&self) {
        let mut guard = self
            .logical_receiver
            .lock()
            .expect("channel core logical receiver mutex poisoned");
        if let Some(state) = guard.as_mut() {
            if let Some(sender) = state.sender.as_ref() {
                // Close message with empty backing bytes and default metadata.
                let close = SelfRef::owning(
                    Backing::Boxed(Box::<[u8]>::default()),
                    ChannelClose {
                        metadata: Metadata::default(),
                    },
                );
                // `force_send` bypasses the capacity limit, so the close is
                // queued even when the logical mailbox is full.
                let _ = sender.force_send(LogicalIncomingChannelMessage {
                    msg: IncomingChannelMessage::Close(close),
                    replenisher: None,
                });
            }
            state.sender.take();
        }
        *guard = None;
        // NOTE(review): the `let` below shadows the first `guard` without
        // dropping it, so the `logical_receiver` lock stays held while
        // `binding` is locked and until this function returns. Confirm no
        // other path acquires these two locks in the opposite order.
        let mut guard = self.binding.lock().expect("channel core mutex poisoned");
        *guard = None;
        self.binding_changed.notify_waiters();
    }

    /// Debug context captured when the channel pair was created (returned by value).
    pub fn debug_context(&self) -> ChannelDebugContext {
        self.debug_context
    }
}
517
/// Slot for the shared channel core, accessible via facet reflection.
#[derive(Facet)]
#[facet(opaque)]
pub(crate) struct CoreSlot {
    // Shared core of the channel pair, or `None` while the slot is empty
    // (the state produced by `CoreSlot::empty`).
    pub(crate) inner: Option<Arc<ChannelCore>>,
}

impl CoreSlot {
    // Build a slot that holds no core.
    pub(crate) fn empty() -> Self {
        Self { inner: None }
    }
}
530
531// r[impl rpc.channel.pair]
532// r[impl rpc.observability.channel.context]
533/// Create a channel pair with shared state.
534///
535/// Both ends hold an `Arc` reference to the same `ChannelCore`. The framework
536/// binds the handle that appears in args or return values, and the paired
537/// handle reads or takes the binding from the shared core.
538#[track_caller]
539pub fn channel<T>() -> (Tx<T>, Rx<T>) {
540    let caller = Location::caller();
541    let debug_context = ChannelDebugContext {
542        type_name: Some(std::any::type_name::<T>()),
543        source_location: Some(SourceLocation {
544            file: caller.file(),
545            line: caller.line(),
546            column: caller.column(),
547        }),
548        ..ChannelDebugContext::default()
549    };
550    let core = Arc::new(ChannelCore::new(debug_context));
551    (Tx::paired(core.clone()), Rx::paired(core))
552}
553
554fn merge_debug_context(
555    primary: Option<ChannelDebugContext>,
556    fallback: ChannelDebugContext,
557) -> Option<ChannelDebugContext> {
558    match (
559        primary.and_then(ChannelDebugContext::into_option),
560        fallback.into_option(),
561    ) {
562        (Some(primary), Some(fallback)) => ChannelDebugContext {
563            label: primary.label.or(fallback.label),
564            type_name: primary.type_name.or(fallback.type_name),
565            source_location: primary.source_location.or(fallback.source_location),
566            service: primary.service.or(fallback.service),
567            method: primary.method.or(fallback.method),
568        }
569        .into_option(),
570        (Some(primary), None) => Some(primary),
571        (None, fallback) => fallback,
572    }
573}
574
575fn sink_event_context(
576    sink: &dyn ChannelSink,
577    channel_id: ChannelId,
578    fallback: ChannelDebugContext,
579) -> ChannelEventContext {
580    ChannelEventContext {
581        connection_id: sink.connection_id(),
582        channel_id,
583        debug: merge_debug_context(sink.debug_context(), fallback),
584    }
585}
586
587fn replenisher_event_context(
588    replenisher: &dyn ChannelCreditReplenisher,
589    channel_id: ChannelId,
590    fallback: ChannelDebugContext,
591) -> ChannelEventContext {
592    ChannelEventContext {
593        connection_id: replenisher.connection_id(),
594        channel_id,
595        debug: merge_debug_context(replenisher.debug_context(), fallback),
596    }
597}
598
599fn observe_sink_channel(
600    sink: &dyn ChannelSink,
601    channel_id: Option<ChannelId>,
602    fallback: ChannelDebugContext,
603    event: impl FnOnce(ChannelEventContext) -> ChannelEvent,
604) {
605    if let (Some(observer), Some(channel_id)) = (sink.observer(), channel_id) {
606        observer.channel_event(event(sink_event_context(sink, channel_id, fallback)));
607    }
608}
609
610fn observe_replenisher_channel(
611    replenisher: &dyn ChannelCreditReplenisher,
612    fallback: ChannelDebugContext,
613    event: impl FnOnce(ChannelEventContext) -> ChannelEvent,
614) {
615    if let (Some(observer), Some(channel_id)) = (replenisher.observer(), replenisher.channel_id()) {
616        observer.channel_event(event(replenisher_event_context(
617            replenisher,
618            channel_id,
619            fallback,
620        )));
621    }
622}
623
624fn observe_optional_replenisher_channel(
625    replenisher: Option<&ChannelCreditReplenisherHandle>,
626    fallback: ChannelDebugContext,
627    event: impl FnOnce(ChannelEventContext) -> ChannelEvent,
628) {
629    if let Some(replenisher) = replenisher {
630        observe_replenisher_channel(replenisher.as_ref(), fallback, event);
631    }
632}
633
/// Return the cached decode plan for `T`, building it on first use.
///
/// Plans live in one process-wide map keyed by `T::SHAPE.id`. The `static`
/// inside this generic function is shared by every instantiation.
///
/// # Panics
///
/// Panics if the cache mutex is poisoned.
#[cfg(all(feature = "jit", not(target_arch = "wasm32")))]
fn channel_decode_plan<T: Facet<'static>>() -> Arc<ChannelDecodePlan> {
    static PLANS: OnceLock<Mutex<HashMap<ConstTypeId, Arc<ChannelDecodePlan>>>> = OnceLock::new();

    let mut cache = PLANS
        .get_or_init(|| Mutex::new(HashMap::new()))
        .lock()
        .expect("channel decode plan mutex poisoned");
    let cached = cache.entry(T::SHAPE.id).or_insert_with(|| {
        let plan = vox_postcard::build_identity_plan(T::SHAPE);
        let registry = vox_schema::SchemaRegistry::new();
        Arc::new(ChannelDecodePlan { plan, registry })
    });
    Arc::clone(cached)
}
651
/// Decode one channel item payload, preferring the JIT decoder.
///
/// When the JIT runtime declines to decode (`try_decode_borrowed` returns
/// `None`), the interpreted postcard decoder is used instead.
///
/// # Panics
///
/// Panics when `vox_jit::require_pure_jit()` is set and either the fallback
/// is forced or the JIT declined to decode.
#[cfg(all(feature = "jit", not(target_arch = "wasm32")))]
fn decode_channel_payload<T: Facet<'static>>(bytes: &'static [u8]) -> Result<T, RxError> {
    if vox_jit::require_pure_jit() && vox_jit::force_fallback() {
        panic!(
            "VOX_JIT_REQUIRE_PURE=1 but channel payload decode for '{}' was forced off by VOX_CODEC",
            T::SHAPE
        );
    }

    let cached = channel_decode_plan::<T>();
    let runtime = vox_jit::global_runtime();
    match runtime.try_decode_borrowed::<T>(bytes, 0, &cached.plan, &cached.registry) {
        None => {
            // The JIT did not handle this payload.
            if vox_jit::require_pure_jit() {
                panic!(
                    "VOX_JIT_REQUIRE_PURE=1 but channel payload decode for '{}' did not use the JIT",
                    T::SHAPE
                );
            }
            vox_postcard::from_slice_borrowed(bytes).map_err(RxError::Deserialize)
        }
        Some(decoded) => decoded.map_err(RxError::Deserialize),
    }
}
678
679#[cfg(not(all(feature = "jit", not(target_arch = "wasm32"))))]
680fn decode_channel_payload<T: Facet<'static>>(bytes: &'static [u8]) -> Result<T, RxError> {
681    vox_postcard::from_slice_borrowed(bytes).map_err(RxError::Deserialize)
682}
683
684fn decode_channel_item<T>(msg: SelfRef<ChannelItem<'static>>) -> Result<Option<SelfRef<T>>, RxError>
685where
686    T: Facet<'static>,
687{
688    msg.try_repack(|item, _backing_bytes| {
689        let Payload::PostcardBytes(bytes) = item.item else {
690            return Err(RxError::Protocol(
691                "incoming channel item payload was not Incoming".into(),
692            ));
693        };
694        decode_channel_payload(bytes)
695    })
696    .map(Some)
697}
698
699fn handle_incoming_channel_message<T>(
700    msg: Option<IncomingChannelMessage>,
701    replenisher: Option<&ChannelCreditReplenisherHandle>,
702    debug_context: ChannelDebugContext,
703    closed: &AtomicBool,
704) -> Result<Option<SelfRef<T>>, RxError>
705where
706    T: Facet<'static>,
707{
708    match msg {
709        Some(IncomingChannelMessage::Close(_)) => {
710            observe_optional_replenisher_channel(replenisher, debug_context, |channel| {
711                ChannelEvent::Closed {
712                    channel,
713                    reason: ChannelCloseReason::Remote,
714                }
715            });
716            closed.store(true, Ordering::Release);
717            Ok(None)
718        }
719        Some(IncomingChannelMessage::ConnectionClosed(reason)) => {
720            observe_optional_replenisher_channel(replenisher, debug_context, |channel| {
721                ChannelEvent::Closed {
722                    channel,
723                    reason: ChannelCloseReason::ConnectionClosed,
724                }
725            });
726            closed.store(true, Ordering::Release);
727            Err(RxError::ConnectionClosed(reason))
728        }
729        None => {
730            observe_optional_replenisher_channel(replenisher, debug_context, |channel| {
731                ChannelEvent::Closed {
732                    channel,
733                    reason: ChannelCloseReason::Unknown,
734                }
735            });
736            closed.store(true, Ordering::Release);
737            Ok(None)
738        }
739        Some(IncomingChannelMessage::Reset(_)) => {
740            observe_optional_replenisher_channel(replenisher, debug_context, |channel| {
741                ChannelEvent::Reset {
742                    channel,
743                    reason: ChannelResetReason::Remote,
744                }
745            });
746            closed.store(true, Ordering::Release);
747            Err(RxError::Reset)
748        }
749        Some(IncomingChannelMessage::Item(msg)) => {
750            let value = decode_channel_item(msg);
751            if value.is_ok() {
752                observe_optional_replenisher_channel(replenisher, debug_context, |channel| {
753                    ChannelEvent::ItemConsumed { channel }
754                });
755                if let Some(replenisher) = replenisher {
756                    replenisher.on_item_consumed();
757                }
758            }
759            value
760        }
761    }
762}
763
/// Runtime sink implemented by the session driver.
///
/// The contract is strict: successful completion means the item has gone
/// through the conduit to the link commit boundary.
///
/// Only `send_payload` and `close_channel` are required. The identity
/// accessors default to `None`, the `note_*` hooks default to no-ops, and the
/// default `try_send_payload` always reports `Full`.
pub trait ChannelSink: crate::MaybeSend + crate::MaybeSync + 'static {
    /// Send one payload. The returned future may borrow from `payload` but
    /// not from `self`.
    fn send_payload<'payload>(
        &self,
        payload: Payload<'payload>,
    ) -> Pin<Box<dyn crate::MaybeSendFuture<Output = Result<(), TxError>> + 'payload>>;

    /// Channel this sink writes to. While `None`, `observe_sink_channel`
    /// emits no events for it.
    fn channel_id(&self) -> Option<ChannelId> {
        None
    }

    /// Connection id copied into emitted `ChannelEventContext`s.
    fn connection_id(&self) -> Option<ConnectionId> {
        None
    }

    /// Debug context that takes precedence over the caller's fallback context
    /// when event contexts are built (see `merge_debug_context`).
    fn debug_context(&self) -> Option<ChannelDebugContext> {
        None
    }

    /// Observer that receives channel events. While `None`,
    /// `observe_sink_channel` emits no events.
    fn observer(&self) -> Option<VoxObserverHandle> {
        None
    }

    // Instrumentation hook; no-op by default.
    #[doc(hidden)]
    fn note_send_started(&self) {}

    // Instrumentation hook; `CreditSink::send_payload` calls it on the wrapped
    // sink when no credit permit is available at call time.
    #[doc(hidden)]
    fn note_send_waiting_for_credit(&self) {}

    // Instrumentation hook; no-op by default.
    #[doc(hidden)]
    fn note_send_finished(&self, _outcome: ChannelSendOutcome) {}

    // Instrumentation hook; no-op by default.
    #[doc(hidden)]
    fn note_try_send_outcome(&self, _outcome: ChannelTrySendOutcome) {}

    // Non-blocking send that reports a detailed outcome. The default forwards
    // to `try_send_payload` and maps `Full` to `FullRuntimeQueue` and `Closed`
    // to `Closed`.
    #[doc(hidden)]
    fn try_send_payload_with_outcome<'payload>(
        &self,
        payload: Payload<'payload>,
    ) -> Result<(), ChannelTrySendOutcome> {
        self.try_send_payload(payload).map_err(|err| match err {
            TrySendError::Full(()) => ChannelTrySendOutcome::FullRuntimeQueue,
            TrySendError::Closed(()) => ChannelTrySendOutcome::Closed,
        })
    }

    // r[impl rpc.flow-control.credit.try-send]
    /// Non-blocking send. The default implementation never sends and always
    /// returns `TrySendError::Full`.
    fn try_send_payload<'payload>(
        &self,
        _payload: Payload<'payload>,
    ) -> Result<(), TrySendError<()>> {
        Err(TrySendError::Full(()))
    }

    /// Close the channel, attaching `metadata`. The returned future is
    /// `'static`, so it borrows nothing from `self`.
    fn close_channel(
        &self,
        metadata: Metadata,
    ) -> Pin<Box<dyn crate::MaybeSendFuture<Output = Result<(), TxError>> + 'static>>;

    /// Synchronous drop-time close signal.
    ///
    /// This is used by `Tx::drop` to notify the runtime immediately without
    /// spawning detached tasks. Implementations should enqueue a close intent
    /// to their runtime/driver if possible.
    fn close_channel_on_drop(&self) {}
}
833
// r[impl rpc.flow-control.credit]
// r[impl rpc.flow-control.credit.exhaustion]
/// A [`ChannelSink`] wrapper that enforces credit-based flow control.
///
/// Each `send_payload` acquires one permit from the semaphore, blocking if
/// credit is zero. The semaphore is shared with the driver so that incoming
/// `GrantCredit` messages can add permits via [`CreditSink::credit`].
///
/// Permits taken for a send are forgotten rather than released, so credit
/// only grows again when permits are added to the semaphore from outside.
pub struct CreditSink<S: ChannelSink> {
    /// Wrapped sink that performs the actual send once credit is available.
    inner: S,
    /// Remaining send credit, one permit per item.
    credit: Arc<Semaphore>,
}
845
846impl<S: ChannelSink> CreditSink<S> {
847    // r[impl rpc.flow-control.credit.initial]
848    // r[impl rpc.flow-control.credit.initial.zero]
849    /// Wrap `inner` with runtime-configured initial credit permits.
850    pub fn new(inner: S, initial_credit: u32) -> Self {
851        Self {
852            inner,
853            credit: Arc::new(Semaphore::new(
854                "vox_types.channel.credit",
855                initial_credit as usize,
856            )),
857        }
858    }
859
860    /// Returns the credit semaphore. The driver holds a clone so
861    /// `GrantCredit` messages can call `add_permits`.
862    pub fn credit(&self) -> &Arc<Semaphore> {
863        &self.credit
864    }
865}
866
867impl<S: ChannelSink> ChannelSink for CreditSink<S> {
868    fn send_payload<'payload>(
869        &self,
870        payload: Payload<'payload>,
871    ) -> Pin<Box<dyn crate::MaybeSendFuture<Output = Result<(), TxError>> + 'payload>> {
872        let credit = self.credit.clone();
873        let channel_id = self.channel_id();
874        if credit.available_permits() == 0 {
875            self.inner.note_send_waiting_for_credit();
876            observe_sink_channel(
877                self,
878                channel_id,
879                ChannelDebugContext::default(),
880                |channel| ChannelEvent::SendWaitingForCredit { channel },
881            );
882        }
883        let fut = self.inner.send_payload(payload);
884        Box::pin(async move {
885            let permit = credit
886                .acquire_owned()
887                .await
888                .map_err(|_| TxError::Transport("channel credit semaphore closed".into()))?;
889            std::mem::forget(permit);
890            fut.await
891        })
892    }
893
894    fn channel_id(&self) -> Option<ChannelId> {
895        self.inner.channel_id()
896    }
897
898    fn connection_id(&self) -> Option<ConnectionId> {
899        self.inner.connection_id()
900    }
901
902    fn debug_context(&self) -> Option<ChannelDebugContext> {
903        self.inner.debug_context()
904    }
905
906    fn observer(&self) -> Option<VoxObserverHandle> {
907        self.inner.observer()
908    }
909
910    fn note_send_started(&self) {
911        self.inner.note_send_started();
912    }
913
914    fn note_send_waiting_for_credit(&self) {
915        self.inner.note_send_waiting_for_credit();
916    }
917
918    fn note_send_finished(&self, outcome: ChannelSendOutcome) {
919        self.inner.note_send_finished(outcome);
920    }
921
922    fn note_try_send_outcome(&self, outcome: ChannelTrySendOutcome) {
923        self.inner.note_try_send_outcome(outcome);
924    }
925
926    // r[impl rpc.observability.channel.try-send-detail]
927    fn try_send_payload_with_outcome<'payload>(
928        &self,
929        payload: Payload<'payload>,
930    ) -> Result<(), ChannelTrySendOutcome> {
931        let permit = self.credit.try_acquire_owned().map_err(|err| match err {
932            TryAcquireError::NoPermits => ChannelTrySendOutcome::FullCredit,
933            TryAcquireError::Closed => ChannelTrySendOutcome::Closed,
934        })?;
935
936        match self.inner.try_send_payload_with_outcome(payload) {
937            Ok(()) => {
938                std::mem::forget(permit);
939                Ok(())
940            }
941            Err(err) => Err(err),
942        }
943    }
944
945    fn close_channel(
946        &self,
947        metadata: Metadata,
948    ) -> Pin<Box<dyn crate::MaybeSendFuture<Output = Result<(), TxError>> + 'static>> {
949        // Close does not consume credit — it's a control message.
950        self.inner.close_channel(metadata)
951    }
952
953    fn close_channel_on_drop(&self) {
954        self.inner.close_channel_on_drop();
955    }
956}
957
/// Message delivered to an `Rx` by the driver.
pub enum IncomingChannelMessage {
    /// A `ChannelItem` message.
    Item(SelfRef<ChannelItem<'static>>),
    /// A `ChannelClose` message.
    Close(SelfRef<ChannelClose<'static>>),
    /// A `ChannelReset` message.
    Reset(SelfRef<ChannelReset<'static>>),
    // r[impl rpc.channel.connection-closure]
    /// The connection was closed; carries the close reason.
    ConnectionClosed(ConnectionCloseReason),
}
966
/// An [`IncomingChannelMessage`] paired with an optional credit replenisher.
///
/// `Rx::recv` hands this per-message replenisher to the message handler when
/// it reads from a logical receiver.
pub struct LogicalIncomingChannelMessage {
    /// The wrapped channel message.
    pub msg: IncomingChannelMessage,
    /// Replenisher passed along with this message, if any.
    pub replenisher: Option<ChannelCreditReplenisherHandle>,
}
971
972/// Sender-side runtime slot.
973#[derive(Facet)]
974#[facet(opaque)]
975pub(crate) struct SinkSlot {
976    pub(crate) inner: Option<Arc<dyn ChannelSink>>,
977}
978
979impl SinkSlot {
980    pub(crate) fn empty() -> Self {
981        Self { inner: None }
982    }
983}
984
985/// Opaque liveness retention slot for bound channel handles.
986#[derive(Facet)]
987#[facet(opaque)]
988pub(crate) struct LivenessSlot {
989    pub(crate) inner: Option<ChannelLivenessHandle>,
990}
991
992impl LivenessSlot {
993    pub(crate) fn empty() -> Self {
994        Self { inner: None }
995    }
996}
997
998/// Receiver-side runtime slot.
999#[derive(Facet)]
1000#[facet(opaque)]
1001pub(crate) struct ReceiverSlot {
1002    pub(crate) inner: Option<ChannelMailboxReceiver<IncomingChannelMessage>>,
1003}
1004
1005impl ReceiverSlot {
1006    pub(crate) fn empty() -> Self {
1007        Self { inner: None }
1008    }
1009}
1010
1011#[derive(Facet)]
1012#[facet(opaque)]
1013pub(crate) struct LogicalReceiverSlot {
1014    pub(crate) inner: Option<ChannelMailboxReceiver<LogicalIncomingChannelMessage>>,
1015}
1016
1017impl LogicalReceiverSlot {
1018    pub(crate) fn empty() -> Self {
1019        Self { inner: None }
1020    }
1021}
1022
1023/// Receiver-side credit replenishment slot.
1024#[derive(Facet)]
1025#[facet(opaque)]
1026pub(crate) struct ReplenisherSlot {
1027    pub(crate) inner: Option<ChannelCreditReplenisherHandle>,
1028}
1029
1030impl ReplenisherSlot {
1031    pub(crate) fn empty() -> Self {
1032        Self { inner: None }
1033    }
1034}
1035
/// Sender handle: "I send". The holder of a `Tx<T>` sends items of type `T`.
///
/// In method args, the handler holds it (handler sends → caller).
///
/// Wire encoding is always unit (`()`), with channel IDs carried exclusively
/// in `Message::Request.channels`.
// r[impl rpc.channel]
// r[impl rpc.channel.direction]
// r[impl rpc.channel.payload-encoding]
#[derive(Facet)]
#[facet(proxy = crate::ChannelId)]
pub struct Tx<T> {
    // Channel ID of this handle; stays `ChannelId::RESERVED` unless the handle
    // was built from a concrete ID (`TryFrom<ChannelId>`).
    pub(crate) channel_id: ChannelId,
    // Sink attached directly to this handle by `bind_with_liveness`.
    pub(crate) sink: SinkSlot,
    // Shared core, present when the handle was built by `paired`.
    pub(crate) core: CoreSlot,
    // Optional liveness handle stored alongside the sink.
    pub(crate) liveness: LivenessSlot,
    // Type name / source location captured when the handle was created.
    #[facet(opaque)]
    debug_context: ChannelDebugContext,
    // Set by `close`, by a `try_send` that observes `Closed`, and by `Drop`.
    #[facet(opaque)]
    closed: AtomicBool,
    #[facet(opaque)]
    _marker: PhantomData<T>,
}
1059
1060impl<T> Tx<T> {
1061    /// Create a standalone unbound Tx (used by deserialization).
1062    #[track_caller]
1063    pub fn unbound() -> Self {
1064        let caller = Location::caller();
1065        Self::unbound_with_context(ChannelDebugContext {
1066            type_name: Some(std::any::type_name::<T>()),
1067            source_location: Some(SourceLocation {
1068                file: caller.file(),
1069                line: caller.line(),
1070                column: caller.column(),
1071            }),
1072            ..ChannelDebugContext::default()
1073        })
1074    }
1075
1076    fn unbound_with_context(debug_context: ChannelDebugContext) -> Self {
1077        Self {
1078            channel_id: ChannelId::RESERVED,
1079            sink: SinkSlot::empty(),
1080            core: CoreSlot::empty(),
1081            liveness: LivenessSlot::empty(),
1082            debug_context,
1083            closed: AtomicBool::new(false),
1084            _marker: PhantomData,
1085        }
1086    }
1087
1088    /// Create a Tx that is part of a `channel()` pair.
1089    fn paired(core: Arc<ChannelCore>) -> Self {
1090        let debug_context = core.debug_context();
1091        Self {
1092            channel_id: ChannelId::RESERVED,
1093            sink: SinkSlot::empty(),
1094            core: CoreSlot { inner: Some(core) },
1095            liveness: LivenessSlot::empty(),
1096            debug_context,
1097            closed: AtomicBool::new(false),
1098            _marker: PhantomData,
1099        }
1100    }
1101
    /// Returns a copy of the debug context captured when this handle was created.
    pub fn debug_context(&self) -> ChannelDebugContext {
        self.debug_context
    }
1105
1106    pub fn is_bound(&self) -> bool {
1107        if self.sink.inner.is_some() {
1108            return true;
1109        }
1110        if let Some(core) = &self.core.inner {
1111            return core.get_sink().is_some();
1112        }
1113        false
1114    }
1115
    /// Check if this Tx is part of a channel() pair (has a shared core).
    pub fn has_core(&self) -> bool {
        // Only looks at whether the core slot is populated, not at what the core holds.
        self.core.inner.is_some()
    }
1120
1121    // r[impl rpc.channel.pair.tx-read]
1122    fn resolve_sink_now(&self) -> Option<Arc<dyn ChannelSink>> {
1123        // Fast path: local slot (standalone/callee-side handle)
1124        if let Some(sink) = &self.sink.inner {
1125            return Some(sink.clone());
1126        }
1127        // Slow path: read from shared core (paired handle)
1128        if let Some(core) = &self.core.inner
1129            && let Some(sink) = core.get_sink()
1130        {
1131            return Some(sink);
1132        }
1133        None
1134    }
1135
1136    fn channel_id_for_sink(&self, sink: &dyn ChannelSink) -> Option<ChannelId> {
1137        if self.channel_id == ChannelId::RESERVED {
1138            sink.channel_id()
1139        } else {
1140            Some(self.channel_id)
1141        }
1142    }
1143
1144    fn observe_sink_event(
1145        &self,
1146        sink: &dyn ChannelSink,
1147        channel_id: Option<ChannelId>,
1148        event: impl FnOnce(ChannelEventContext) -> ChannelEvent,
1149    ) {
1150        observe_sink_channel(sink, channel_id, self.debug_context, event);
1151    }
1152
1153    fn observe_try_send(
1154        &self,
1155        sink: &dyn ChannelSink,
1156        channel_id: Option<ChannelId>,
1157        outcome: ChannelTrySendOutcome,
1158    ) {
1159        self.observe_sink_event(sink, channel_id, |channel| ChannelEvent::TrySend {
1160            channel,
1161            outcome,
1162        });
1163    }
1164
1165    pub async fn send<'value>(&self, value: T) -> Result<(), TxError>
1166    where
1167        T: Facet<'value>,
1168    {
1169        let sink = if let Some(sink) = self.resolve_sink_now() {
1170            sink
1171        } else if let Some(core) = &self.core.inner {
1172            loop {
1173                let notified = core.binding_changed.notified();
1174                if let Some(sink) = self.resolve_sink_now() {
1175                    break sink;
1176                }
1177                notified.await;
1178            }
1179        } else {
1180            return Err(TxError::Unbound);
1181        };
1182        let channel_id = self.channel_id_for_sink(sink.as_ref());
1183        sink.note_send_started();
1184        self.observe_sink_event(sink.as_ref(), channel_id, |channel| {
1185            ChannelEvent::SendStarted { channel }
1186        });
1187        let started_at = crate::time::Instant::now();
1188        let ptr = PtrConst::new((&value as *const T).cast::<u8>());
1189        // SAFETY: `value` is explicitly dropped only after `await`, so the pointer
1190        // remains valid for the whole send operation.
1191        let payload = unsafe { Payload::outgoing_unchecked(ptr, T::SHAPE) };
1192        let result = sink.send_payload(payload).await;
1193        let outcome = match &result {
1194            Ok(()) => ChannelSendOutcome::Sent,
1195            Err(TxError::Transport(message)) if message == "channel closed" => {
1196                ChannelSendOutcome::Closed
1197            }
1198            Err(_) => ChannelSendOutcome::TransportError,
1199        };
1200        self.observe_sink_event(sink.as_ref(), channel_id, |channel| {
1201            ChannelEvent::SendFinished {
1202                channel,
1203                outcome,
1204                elapsed: started_at.elapsed(),
1205            }
1206        });
1207        sink.note_send_finished(outcome);
1208        drop(value);
1209        result
1210    }
1211
1212    // r[impl rpc.flow-control.credit.try-send]
1213    pub fn try_send<'value>(&self, value: T) -> Result<(), TrySendError<T>>
1214    where
1215        T: Facet<'value>,
1216    {
1217        if self.closed.load(Ordering::Acquire) {
1218            return Err(TrySendError::Closed(value));
1219        }
1220
1221        let Some(sink) = self.resolve_sink_now() else {
1222            return Err(TrySendError::Full(value));
1223        };
1224        let channel_id = self.channel_id_for_sink(sink.as_ref());
1225
1226        let ptr = PtrConst::new((&value as *const T).cast::<u8>());
1227        // SAFETY: `try_send_payload` must complete synchronously before this
1228        // function returns, so `value` stays alive for the full borrow.
1229        let payload = unsafe { Payload::outgoing_unchecked(ptr, T::SHAPE) };
1230        match sink.try_send_payload_with_outcome(payload) {
1231            Ok(()) => {
1232                sink.note_try_send_outcome(ChannelTrySendOutcome::Sent);
1233                self.observe_try_send(sink.as_ref(), channel_id, ChannelTrySendOutcome::Sent);
1234                drop(value);
1235                Ok(())
1236            }
1237            Err(ChannelTrySendOutcome::Closed) => {
1238                sink.note_try_send_outcome(ChannelTrySendOutcome::Closed);
1239                self.observe_try_send(sink.as_ref(), channel_id, ChannelTrySendOutcome::Closed);
1240                self.closed.store(true, Ordering::Release);
1241                Err(TrySendError::Closed(value))
1242            }
1243            Err(outcome) => {
1244                sink.note_try_send_outcome(outcome);
1245                self.observe_try_send(sink.as_ref(), channel_id, outcome);
1246                Err(TrySendError::Full(value))
1247            }
1248        }
1249    }
1250
1251    // r[impl rpc.channel.lifecycle]
1252    pub async fn close<'value>(&self, metadata: Metadata<'value>) -> Result<(), TxError> {
1253        self.closed.store(true, Ordering::Release);
1254        let sink = if let Some(sink) = self.resolve_sink_now() {
1255            sink
1256        } else if let Some(core) = &self.core.inner {
1257            loop {
1258                let notified = core.binding_changed.notified();
1259                if let Some(sink) = self.resolve_sink_now() {
1260                    break sink;
1261                }
1262                notified.await;
1263            }
1264        } else {
1265            return Err(TxError::Unbound);
1266        };
1267        sink.close_channel(metadata).await
1268    }
1269
    /// Attaches `sink` to this handle without a liveness handle.
    ///
    /// Shorthand for `bind_with_liveness(sink, None)`.
    #[doc(hidden)]
    pub fn bind(&mut self, sink: Arc<dyn ChannelSink>) {
        self.bind_with_liveness(sink, None);
    }
1274
    /// Attaches `sink` to this handle's local slot and stores `liveness`
    /// alongside it, replacing whatever either slot held before.
    #[doc(hidden)]
    pub fn bind_with_liveness(
        &mut self,
        sink: Arc<dyn ChannelSink>,
        liveness: Option<ChannelLivenessHandle>,
    ) {
        self.sink.inner = Some(sink);
        // Passing `None` clears any previously stored liveness handle.
        self.liveness.inner = liveness;
    }
1284
1285    #[doc(hidden)]
1286    pub fn finish_retry_binding(&self) {
1287        if let Some(core) = &self.core.inner {
1288            core.finish_retry_binding();
1289        }
1290    }
1291}
1292
1293impl<T> Drop for Tx<T> {
1294    // r[impl rpc.channel.lifecycle]
1295    fn drop(&mut self) {
1296        if self.closed.swap(true, Ordering::AcqRel) {
1297            return;
1298        }
1299
1300        let sink = if let Some(sink) = &self.sink.inner {
1301            Some(sink.clone())
1302        } else if let Some(core) = &self.core.inner {
1303            core.get_sink()
1304        } else {
1305            None
1306        };
1307
1308        let Some(sink) = sink else {
1309            return;
1310        };
1311
1312        // Synchronous signal into the runtime/driver; no detached async work here.
1313        sink.close_channel_on_drop();
1314    }
1315}
1316
1317impl<T> TryFrom<&Tx<T>> for ChannelId {
1318    type Error = String;
1319
1320    fn try_from(value: &Tx<T>) -> Result<Self, Self::Error> {
1321        // Case 1: Caller passes Tx in args (callee sends, caller receives).
1322        // Allocate a channel ID and store the receiver binding in the shared
1323        // core so the caller's paired Rx can pick it up.
1324        CHANNEL_BINDER.with(|cell| {
1325            let borrow = cell.borrow();
1326            let Some(binder) = *borrow else {
1327                return Err("serializing Tx requires an active ChannelBinder".to_string());
1328            };
1329            let (channel_id, bound) = binder.create_rx_with_context(Some(value.debug_context));
1330            if let Some(core) = &value.core.inner {
1331                core.bind_retryable_receiver(bound);
1332            }
1333            Ok(channel_id)
1334        })
1335    }
1336}
1337
1338impl<T> TryFrom<ChannelId> for Tx<T> {
1339    type Error = String;
1340
1341    fn try_from(channel_id: ChannelId) -> Result<Self, Self::Error> {
1342        let debug_context = ChannelDebugContext {
1343            type_name: Some(std::any::type_name::<T>()),
1344            ..ChannelDebugContext::default()
1345        };
1346        let mut tx = Self::unbound_with_context(debug_context);
1347        tx.channel_id = channel_id;
1348
1349        CHANNEL_BINDER.with(|cell| {
1350            let Some(binder) = *cell.borrow() else {
1351                return Err("deserializing Tx requires an active ChannelBinder".to_string());
1352            };
1353            let sink = binder.bind_tx_with_context(channel_id, Some(debug_context));
1354            let liveness = binder.channel_liveness();
1355            tx.bind_with_liveness(sink, liveness);
1356            Ok(())
1357        })?;
1358
1359        Ok(tx)
1360    }
1361}
1362
/// Error when sending on a `Tx`.
#[derive(Debug)]
pub enum TxError {
    /// The handle has no sink and no shared core to obtain one from.
    Unbound,
    /// The transport reported a failure; the string is its message.
    Transport(String),
}

impl std::fmt::Display for TxError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if let Self::Transport(msg) = self {
            return write!(f, "transport error: {}", msg);
        }
        f.write_str("channel is not bound")
    }
}

impl std::error::Error for TxError {}
1380
/// Error returned by [`Tx::try_send`].
///
/// Both variants hand the unsent value back to the caller.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TrySendError<T> {
    /// Sending would block because channel credit or runtime queue capacity is exhausted.
    Full(T),
    /// The channel or underlying connection is closed.
    Closed(T),
}

impl<T> TrySendError<T> {
    /// Recovers the value that could not be sent.
    pub fn into_inner(self) -> T {
        match self {
            Self::Full(unsent) => unsent,
            Self::Closed(unsent) => unsent,
        }
    }
}

impl<T> std::fmt::Display for TrySendError<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The carried value is deliberately not printed, so `T` needs no bound here.
        let text = match self {
            Self::Full(_) => "channel is full",
            Self::Closed(_) => "channel is closed",
        };
        f.write_str(text)
    }
}

impl<T: std::fmt::Debug> std::error::Error for TrySendError<T> {}
1408
/// Receiver handle: "I receive". The holder of an `Rx<T>` receives items of type `T`.
///
/// In method args, the handler holds it (handler receives ← caller).
///
/// Channel IDs are serialized inline in the postcard payload.
// NOTE(review): the `Tx` docs say channel IDs travel exclusively in
// `Message::Request.channels`; confirm which statement is current.
#[derive(Facet)]
#[facet(proxy = crate::ChannelId)]
pub struct Rx<T> {
    // Channel ID of this handle; stays `ChannelId::RESERVED` unless the handle
    // was built from a concrete ID (`TryFrom<ChannelId>`).
    pub(crate) channel_id: ChannelId,
    // Mailbox for plain `IncomingChannelMessage`s.
    pub(crate) receiver: ReceiverSlot,
    // Mailbox for `LogicalIncomingChannelMessage`s; `recv` prefers it when set.
    pub(crate) logical_receiver: LogicalReceiverSlot,
    // Shared core, present when the handle was built by `paired`.
    pub(crate) core: CoreSlot,
    // Optional liveness handle stored alongside the receiver.
    pub(crate) liveness: LivenessSlot,
    // Replenisher notified by `Drop` and passed to the handler on the plain path.
    pub(crate) replenisher: ReplenisherSlot,
    // Type name / source location captured when the handle was created.
    #[facet(opaque)]
    debug_context: ChannelDebugContext,
    // Swapped to `true` by `Drop`; reset to `false` by `bind_with_liveness`.
    #[facet(opaque)]
    closed: AtomicBool,
    #[facet(opaque)]
    _marker: PhantomData<T>,
}
1430
1431impl<T> Rx<T> {
1432    /// Create a standalone unbound Rx (used by deserialization).
1433    #[track_caller]
1434    pub fn unbound() -> Self {
1435        let caller = Location::caller();
1436        Self::unbound_with_context(ChannelDebugContext {
1437            type_name: Some(std::any::type_name::<T>()),
1438            source_location: Some(SourceLocation {
1439                file: caller.file(),
1440                line: caller.line(),
1441                column: caller.column(),
1442            }),
1443            ..ChannelDebugContext::default()
1444        })
1445    }
1446
1447    fn unbound_with_context(debug_context: ChannelDebugContext) -> Self {
1448        Self {
1449            channel_id: ChannelId::RESERVED,
1450            receiver: ReceiverSlot::empty(),
1451            logical_receiver: LogicalReceiverSlot::empty(),
1452            core: CoreSlot::empty(),
1453            liveness: LivenessSlot::empty(),
1454            replenisher: ReplenisherSlot::empty(),
1455            debug_context,
1456            closed: AtomicBool::new(false),
1457            _marker: PhantomData,
1458        }
1459    }
1460
1461    /// Create an Rx that is part of a `channel()` pair.
1462    fn paired(core: Arc<ChannelCore>) -> Self {
1463        let debug_context = core.debug_context();
1464        Self {
1465            channel_id: ChannelId::RESERVED,
1466            receiver: ReceiverSlot::empty(),
1467            logical_receiver: LogicalReceiverSlot::empty(),
1468            core: CoreSlot { inner: Some(core) },
1469            liveness: LivenessSlot::empty(),
1470            replenisher: ReplenisherSlot::empty(),
1471            debug_context,
1472            closed: AtomicBool::new(false),
1473            _marker: PhantomData,
1474        }
1475    }
1476
    /// Returns a copy of the debug context captured when this handle was created.
    pub fn debug_context(&self) -> ChannelDebugContext {
        self.debug_context
    }
1480
    /// Reports whether a plain receiver is attached to this handle.
    // NOTE(review): a handle that only holds a logical receiver (adopted from
    // the shared core in `recv`) reports `false` here — confirm that is intended.
    pub fn is_bound(&self) -> bool {
        self.receiver.inner.is_some()
    }
1484
    /// Check if this Rx is part of a channel() pair (has a shared core).
    pub fn has_core(&self) -> bool {
        // Only looks at whether the core slot is populated, not at what the core holds.
        self.core.inner.is_some()
    }
1489
1490    // r[impl rpc.channel.pair.rx-take]
1491    pub async fn recv(&mut self) -> Result<Option<SelfRef<T>>, RxError>
1492    where
1493        T: Facet<'static>,
1494    {
1495        loop {
1496            if self.logical_receiver.inner.is_none()
1497                && let Some(core) = &self.core.inner
1498                && let Some((receiver, liveness, replenisher)) = core.take_logical_receiver()
1499            {
1500                self.logical_receiver.inner = Some(receiver);
1501                self.liveness.inner = liveness;
1502                self.replenisher.inner = replenisher;
1503            }
1504
1505            if let Some(receiver) = self.logical_receiver.inner.as_mut() {
1506                let received = receiver.recv().await;
1507                return match received {
1508                    Some(LogicalIncomingChannelMessage { msg, replenisher }) => {
1509                        handle_incoming_channel_message(
1510                            Some(msg),
1511                            replenisher.as_ref(),
1512                            self.debug_context,
1513                            &self.closed,
1514                        )
1515                    }
1516                    None => handle_incoming_channel_message(
1517                        None,
1518                        None,
1519                        self.debug_context,
1520                        &self.closed,
1521                    ),
1522                };
1523            }
1524
1525            if self.receiver.inner.is_none()
1526                && let Some(core) = &self.core.inner
1527                && let Some(bound) = core.take_receiver()
1528            {
1529                self.receiver.inner = Some(bound.receiver);
1530                self.liveness.inner = bound.liveness;
1531                self.replenisher.inner = bound.replenisher;
1532            }
1533
1534            if let Some(receiver) = self.receiver.inner.as_mut() {
1535                return handle_incoming_channel_message(
1536                    receiver.recv().await,
1537                    self.replenisher.inner.as_ref(),
1538                    self.debug_context,
1539                    &self.closed,
1540                );
1541            }
1542
1543            let Some(core) = &self.core.inner else {
1544                return Err(RxError::Unbound);
1545            };
1546            core.binding_changed.notified().await;
1547        }
1548    }
    /// Attaches `receiver` to this handle without a liveness handle.
    ///
    /// Shorthand for `bind_with_liveness(receiver, None)`.
    #[doc(hidden)]
    pub fn bind(&mut self, receiver: ChannelMailboxReceiver<IncomingChannelMessage>) {
        self.bind_with_liveness(receiver, None);
    }
1553
    /// Attaches a plain `receiver` and stores `liveness`, resetting the rest
    /// of the handle's runtime state.
    #[doc(hidden)]
    pub fn bind_with_liveness(
        &mut self,
        receiver: ChannelMailboxReceiver<IncomingChannelMessage>,
        liveness: Option<ChannelLivenessHandle>,
    ) {
        self.receiver.inner = Some(receiver);
        // Any previously held logical receiver is discarded.
        self.logical_receiver.inner = None;
        self.liveness.inner = liveness;
        // The replenisher is cleared as well; this method does not install one.
        self.replenisher.inner = None;
        // Re-arm the closed flag for the newly attached receiver.
        self.closed.store(false, Ordering::Release);
    }
1566}
1567
1568impl<T> Drop for Rx<T> {
1569    // r[impl rpc.channel.lifecycle]
1570    fn drop(&mut self) {
1571        if self.closed.swap(true, Ordering::AcqRel) {
1572            return;
1573        }
1574
1575        if self.replenisher.inner.is_none()
1576            && let Some(core) = &self.core.inner
1577        {
1578            if let Some((_receiver, _liveness, replenisher)) = core.take_logical_receiver() {
1579                self.replenisher.inner = replenisher;
1580            } else if let Some(bound) = core.take_receiver() {
1581                self.replenisher.inner = bound.replenisher;
1582            }
1583        }
1584
1585        if let Some(replenisher) = &self.replenisher.inner {
1586            observe_replenisher_channel(replenisher.as_ref(), self.debug_context, |channel| {
1587                ChannelEvent::Reset {
1588                    channel,
1589                    reason: ChannelResetReason::ReceiverDropped,
1590                }
1591            });
1592            replenisher.on_receiver_dropped();
1593        }
1594    }
1595}
1596
1597impl<T> TryFrom<&Rx<T>> for ChannelId {
1598    type Error = String;
1599
1600    fn try_from(value: &Rx<T>) -> Result<Self, Self::Error> {
1601        // Case 2: Caller passes Rx in args (callee receives, caller sends).
1602        // Allocate a channel ID and store the sink binding in the shared
1603        // core so the caller's paired Tx can pick it up.
1604        CHANNEL_BINDER.with(|cell| {
1605            let borrow = cell.borrow();
1606            let Some(binder) = *borrow else {
1607                return Err("serializing Rx requires an active ChannelBinder".to_string());
1608            };
1609            let (channel_id, sink) = binder.create_tx_with_context(Some(value.debug_context));
1610            let liveness = binder.channel_liveness();
1611            if let Some(core) = &value.core.inner {
1612                core.set_binding(ChannelBinding::Sink(BoundChannelSink { sink, liveness }));
1613            }
1614            Ok(channel_id)
1615        })
1616    }
1617}
1618
1619impl<T> TryFrom<ChannelId> for Rx<T> {
1620    type Error = String;
1621
1622    fn try_from(channel_id: ChannelId) -> Result<Self, Self::Error> {
1623        let debug_context = ChannelDebugContext {
1624            type_name: Some(std::any::type_name::<T>()),
1625            ..ChannelDebugContext::default()
1626        };
1627        let mut rx = Self::unbound_with_context(debug_context);
1628        rx.channel_id = channel_id;
1629
1630        CHANNEL_BINDER.with(|cell| {
1631            let Some(binder) = *cell.borrow() else {
1632                return Err("deserializing Rx requires an active ChannelBinder".to_string());
1633            };
1634            let bound = binder.register_rx_with_context(channel_id, Some(debug_context));
1635            rx.receiver.inner = Some(bound.receiver);
1636            rx.liveness.inner = bound.liveness;
1637            rx.replenisher.inner = bound.replenisher;
1638            Ok(())
1639        })?;
1640
1641        Ok(rx)
1642    }
1643}
1644
1645/// Error when receiving from an `Rx`.
1646#[derive(Debug)]
1647pub enum RxError {
1648    Unbound,
1649    Reset,
1650    ConnectionClosed(ConnectionCloseReason),
1651    Deserialize(vox_postcard::error::DeserializeError),
1652    Protocol(String),
1653}
1654
1655impl std::fmt::Display for RxError {
1656    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1657        match self {
1658            Self::Unbound => write!(f, "channel is not bound"),
1659            Self::Reset => write!(f, "channel reset by peer"),
1660            Self::ConnectionClosed(reason) => {
1661                write!(f, "connection closed while receiving channel: {reason:?}")
1662            }
1663            Self::Deserialize(e) => write!(f, "deserialize error: {e}"),
1664            Self::Protocol(msg) => write!(f, "protocol error: {msg}"),
1665        }
1666    }
1667}
1668
1669impl std::error::Error for RxError {}
1670
1671/// Check if a shape represents a `Tx` channel.
1672pub fn is_tx(shape: &facet_core::Shape) -> bool {
1673    shape.decl_id == Tx::<()>::SHAPE.decl_id
1674}
1675
1676/// Check if a shape represents an `Rx` channel.
1677pub fn is_rx(shape: &facet_core::Shape) -> bool {
1678    shape.decl_id == Rx::<()>::SHAPE.decl_id
1679}
1680
1681/// Check if a shape represents any channel type (`Tx` or `Rx`).
1682pub fn is_channel(shape: &facet_core::Shape) -> bool {
1683    is_tx(shape) || is_rx(shape)
1684}
1685
/// Hook through which `Tx`/`Rx` handles obtain channel IDs, sinks, and
/// receivers while they are converted to and from `ChannelId`.
///
/// Each operation has a `*_with_context` variant that additionally receives
/// the handle's debug context; the defaults ignore it and delegate to the
/// plain method.
pub trait ChannelBinder: crate::MaybeSend + crate::MaybeSync {
    /// Allocate a channel ID and create a sink for sending items.
    fn create_tx(&self) -> (ChannelId, Arc<dyn ChannelSink>);

    /// Like [`create_tx`](Self::create_tx), with the handle's debug context.
    ///
    /// The default implementation ignores `debug_context`.
    fn create_tx_with_context(
        &self,
        debug_context: Option<ChannelDebugContext>,
    ) -> (ChannelId, Arc<dyn ChannelSink>) {
        let _ = debug_context;
        self.create_tx()
    }

    /// Allocate a channel ID, register it for routing, and return a receiver.
    fn create_rx(&self) -> (ChannelId, BoundChannelReceiver);

    /// Like [`create_rx`](Self::create_rx), with the handle's debug context.
    ///
    /// The default implementation ignores `debug_context`.
    fn create_rx_with_context(
        &self,
        debug_context: Option<ChannelDebugContext>,
    ) -> (ChannelId, BoundChannelReceiver) {
        let _ = debug_context;
        self.create_rx()
    }

    /// Create a sink for a known channel ID (callee side).
    ///
    /// The channel ID comes from `Request.channels`.
    fn bind_tx(&self, channel_id: ChannelId) -> Arc<dyn ChannelSink>;

    /// Like [`bind_tx`](Self::bind_tx), with the handle's debug context.
    ///
    /// The default implementation ignores `debug_context`.
    fn bind_tx_with_context(
        &self,
        channel_id: ChannelId,
        debug_context: Option<ChannelDebugContext>,
    ) -> Arc<dyn ChannelSink> {
        let _ = debug_context;
        self.bind_tx(channel_id)
    }

    /// Register an inbound channel by ID and return the receiver (callee side).
    ///
    /// The channel ID comes from `Request.channels`.
    fn register_rx(&self, channel_id: ChannelId) -> BoundChannelReceiver;

    /// Like [`register_rx`](Self::register_rx), with the handle's debug context.
    ///
    /// The default implementation ignores `debug_context`.
    fn register_rx_with_context(
        &self,
        channel_id: ChannelId,
        debug_context: Option<ChannelDebugContext>,
    ) -> BoundChannelReceiver {
        let _ = debug_context;
        self.register_rx(channel_id)
    }

    /// Optional opaque handle that keeps the underlying session/connection alive
    /// for the lifetime of any bound channel handle.
    ///
    /// Defaults to `None`.
    fn channel_liveness(&self) -> Option<ChannelLivenessHandle> {
        None
    }
}
1744
1745#[cfg(test)]
1746mod tests {
1747    use super::*;
1748    use crate::{Backing, ChannelClose, ChannelItem, ChannelReset, Metadata, SelfRef};
1749    use std::sync::atomic::{AtomicUsize, Ordering};
1750
1751    struct CountingSink {
1752        send_calls: AtomicUsize,
1753        close_calls: AtomicUsize,
1754        close_on_drop_calls: AtomicUsize,
1755    }
1756
1757    impl CountingSink {
1758        fn new() -> Self {
1759            Self {
1760                send_calls: AtomicUsize::new(0),
1761                close_calls: AtomicUsize::new(0),
1762                close_on_drop_calls: AtomicUsize::new(0),
1763            }
1764        }
1765    }
1766
1767    impl ChannelSink for CountingSink {
1768        fn send_payload<'payload>(
1769            &self,
1770            _payload: Payload<'payload>,
1771        ) -> Pin<Box<dyn crate::MaybeSendFuture<Output = Result<(), TxError>> + 'payload>> {
1772            self.send_calls.fetch_add(1, Ordering::AcqRel);
1773            Box::pin(async { Ok(()) })
1774        }
1775
1776        fn close_channel(
1777            &self,
1778            _metadata: Metadata,
1779        ) -> Pin<Box<dyn crate::MaybeSendFuture<Output = Result<(), TxError>> + 'static>> {
1780            self.close_calls.fetch_add(1, Ordering::AcqRel);
1781            Box::pin(async { Ok(()) })
1782        }
1783
1784        fn close_channel_on_drop(&self) {
1785            self.close_on_drop_calls.fetch_add(1, Ordering::AcqRel);
1786        }
1787    }
1788
1789    struct CountingReplenisher {
1790        calls: AtomicUsize,
1791        dropped: AtomicUsize,
1792    }
1793
1794    impl CountingReplenisher {
1795        fn new() -> Self {
1796            Self {
1797                calls: AtomicUsize::new(0),
1798                dropped: AtomicUsize::new(0),
1799            }
1800        }
1801    }
1802
1803    impl ChannelCreditReplenisher for CountingReplenisher {
1804        fn on_item_consumed(&self) {
1805            self.calls.fetch_add(1, Ordering::AcqRel);
1806        }
1807
1808        fn on_receiver_dropped(&self) {
1809            self.dropped.fetch_add(1, Ordering::AcqRel);
1810        }
1811    }
1812
1813    #[tokio::test]
1814    async fn tx_close_does_not_emit_drop_close_after_explicit_close() {
1815        let sink_impl = Arc::new(CountingSink::new());
1816        let sink: Arc<dyn ChannelSink> = sink_impl.clone();
1817
1818        let mut tx = Tx::<u32>::unbound();
1819        tx.bind(sink);
1820        tx.close(Metadata::default())
1821            .await
1822            .expect("close should succeed");
1823        drop(tx);
1824
1825        assert_eq!(sink_impl.close_calls.load(Ordering::Acquire), 1);
1826        assert_eq!(sink_impl.close_on_drop_calls.load(Ordering::Acquire), 0);
1827    }
1828
1829    #[test]
1830    fn tx_drop_emits_close_on_drop_for_bound_sink() {
1831        let sink_impl = Arc::new(CountingSink::new());
1832        let sink: Arc<dyn ChannelSink> = sink_impl.clone();
1833
1834        let mut tx = Tx::<u32>::unbound();
1835        tx.bind(sink);
1836        drop(tx);
1837
1838        assert_eq!(sink_impl.close_on_drop_calls.load(Ordering::Acquire), 1);
1839    }
1840
1841    #[test]
1842    fn tx_drop_emits_close_on_drop_for_paired_core_binding() {
1843        let sink_impl = Arc::new(CountingSink::new());
1844        let sink: Arc<dyn ChannelSink> = sink_impl.clone();
1845
1846        let (tx, _rx) = channel::<u32>();
1847        let core = tx.core.inner.as_ref().expect("paired tx should have core");
1848        core.set_binding(ChannelBinding::Sink(BoundChannelSink {
1849            sink,
1850            liveness: None,
1851        }));
1852        drop(tx);
1853
1854        assert_eq!(sink_impl.close_on_drop_calls.load(Ordering::Acquire), 1);
1855    }
1856
1857    // r[verify rpc.observability.channel.context]
1858    #[test]
1859    fn channel_pair_captures_source_location_and_type_context() {
1860        let expected_line = line!() + 1;
1861        let (tx, rx) = channel::<u32>();
1862
1863        for context in [tx.debug_context(), rx.debug_context()] {
1864            assert_eq!(context.type_name, Some(std::any::type_name::<u32>()));
1865            let location = context
1866                .source_location
1867                .expect("channel should capture source location");
1868            assert_eq!(location.file, file!());
1869            assert_eq!(location.line, expected_line);
1870        }
1871    }
1872
1873    #[tokio::test]
1874    async fn rx_recv_returns_unbound_when_not_bound() {
1875        let mut rx = Rx::<u32>::unbound();
1876        let err = match rx.recv().await {
1877            Ok(_) => panic!("unbound rx should fail"),
1878            Err(err) => err,
1879        };
1880        assert!(matches!(err, RxError::Unbound));
1881    }
1882
1883    #[tokio::test]
1884    async fn rx_recv_returns_none_on_close() {
1885        let (tx, rx_inner) = channel_mailbox("vox_types.channel.test.rx1", 1);
1886        let mut rx = Rx::<u32>::unbound();
1887        rx.bind(rx_inner);
1888
1889        let close = SelfRef::owning(
1890            Backing::Boxed(Box::<[u8]>::default()),
1891            ChannelClose {
1892                metadata: Metadata::default(),
1893            },
1894        );
1895        tx.send(IncomingChannelMessage::Close(close))
1896            .await
1897            .expect("send close");
1898
1899        assert!(rx.recv().await.expect("recv should succeed").is_none());
1900    }
1901
1902    #[tokio::test]
1903    async fn rx_recv_returns_reset_error() {
1904        let (tx, rx_inner) = channel_mailbox("vox_types.channel.test.rx2", 1);
1905        let mut rx = Rx::<u32>::unbound();
1906        rx.bind(rx_inner);
1907
1908        let reset = SelfRef::owning(
1909            Backing::Boxed(Box::<[u8]>::default()),
1910            ChannelReset {
1911                metadata: Metadata::default(),
1912            },
1913        );
1914        tx.send(IncomingChannelMessage::Reset(reset))
1915            .await
1916            .expect("send reset");
1917
1918        let err = match rx.recv().await {
1919            Ok(_) => panic!("reset should be surfaced as error"),
1920            Err(err) => err,
1921        };
1922        assert!(matches!(err, RxError::Reset));
1923    }
1924
1925    // r[verify rpc.channel.connection-closure]
1926    #[tokio::test]
1927    async fn rx_recv_surfaces_connection_closed() {
1928        let (tx, rx_inner) = channel_mailbox("vox_types.channel.test.rx_connection_closed", 1);
1929        let mut rx = Rx::<u32>::unbound();
1930        rx.bind(rx_inner);
1931
1932        tx.send(IncomingChannelMessage::ConnectionClosed(
1933            ConnectionCloseReason::Protocol,
1934        ))
1935        .await
1936        .expect("send connection close");
1937
1938        let err = match rx.recv().await {
1939            Ok(_) => panic!("connection closure should be surfaced as error"),
1940            Err(err) => err,
1941        };
1942        assert!(matches!(
1943            err,
1944            RxError::ConnectionClosed(ConnectionCloseReason::Protocol)
1945        ));
1946    }
1947
1948    #[tokio::test]
1949    async fn rx_recv_rejects_outgoing_payload_variant_as_protocol_error() {
1950        static VALUE: u32 = 42;
1951
1952        let (tx, rx_inner) = channel_mailbox("vox_types.channel.test.rx3", 1);
1953        let mut rx = Rx::<u32>::unbound();
1954        rx.bind(rx_inner);
1955
1956        let item = SelfRef::owning(
1957            Backing::Boxed(Box::<[u8]>::default()),
1958            ChannelItem {
1959                item: Payload::outgoing(&VALUE),
1960            },
1961        );
1962        tx.send(IncomingChannelMessage::Item(item))
1963            .await
1964            .expect("send item");
1965
1966        let err = match rx.recv().await {
1967            Ok(_) => panic!("outgoing payload should be protocol error"),
1968            Err(err) => err,
1969        };
1970        assert!(matches!(err, RxError::Protocol(_)));
1971    }
1972
1973    #[tokio::test]
1974    async fn rx_recv_notifies_replenisher_after_consuming_an_item() {
1975        let (tx, rx_inner) = channel_mailbox("vox_types.channel.test.rx4", 1);
1976        let replenisher = Arc::new(CountingReplenisher::new());
1977        let mut rx = Rx::<u32>::unbound();
1978        rx.bind(rx_inner);
1979        rx.replenisher.inner = Some(replenisher.clone());
1980
1981        let encoded = vox_postcard::to_vec(&123_u32).expect("serialize test item");
1982        let item = SelfRef::owning(
1983            Backing::Boxed(Box::<[u8]>::default()),
1984            ChannelItem {
1985                item: Payload::PostcardBytes(Box::leak(encoded.into_boxed_slice())),
1986            },
1987        );
1988        tx.send(IncomingChannelMessage::Item(item))
1989            .await
1990            .expect("send item");
1991
1992        let value = rx
1993            .recv()
1994            .await
1995            .expect("recv should succeed")
1996            .expect("expected item");
1997        assert_eq!(*value.get(), 123_u32);
1998        assert_eq!(replenisher.calls.load(Ordering::Acquire), 1);
1999    }
2000
2001    #[test]
2002    fn rx_drop_notifies_replenisher() {
2003        let (_tx, rx_inner) = channel_mailbox("vox_types.channel.test.rx_drop", 1);
2004        let replenisher = Arc::new(CountingReplenisher::new());
2005        let mut rx = Rx::<u32>::unbound();
2006        rx.bind(rx_inner);
2007        rx.replenisher.inner = Some(replenisher.clone());
2008
2009        drop(rx);
2010
2011        assert_eq!(replenisher.dropped.load(Ordering::Acquire), 1);
2012    }
2013
2014    #[tokio::test]
2015    async fn rx_drop_after_close_does_not_notify_replenisher() {
2016        let (tx, rx_inner) = channel_mailbox("vox_types.channel.test.rx_drop_closed", 1);
2017        let replenisher = Arc::new(CountingReplenisher::new());
2018        let mut rx = Rx::<u32>::unbound();
2019        rx.bind(rx_inner);
2020        rx.replenisher.inner = Some(replenisher.clone());
2021
2022        let close = SelfRef::owning(
2023            Backing::Boxed(Box::<[u8]>::default()),
2024            ChannelClose {
2025                metadata: Metadata::default(),
2026            },
2027        );
2028        tx.send(IncomingChannelMessage::Close(close))
2029            .await
2030            .expect("send close");
2031
2032        assert!(rx.recv().await.expect("recv should succeed").is_none());
2033        drop(rx);
2034
2035        assert_eq!(replenisher.dropped.load(Ordering::Acquire), 0);
2036    }
2037
2038    #[test]
2039    fn logical_rx_drop_notifies_replenisher() {
2040        let (_tx, rx_inner) = channel_mailbox("vox_types.channel.test.logical_rx_drop", 1);
2041        let replenisher = Arc::new(CountingReplenisher::new());
2042        let core = Arc::new(ChannelCore::new(ChannelDebugContext::default()));
2043        core.bind_retryable_receiver(BoundChannelReceiver {
2044            receiver: rx_inner,
2045            liveness: None,
2046            replenisher: Some(replenisher.clone()),
2047        });
2048
2049        let rx = Rx::<u32>::paired(core);
2050        drop(rx);
2051
2052        assert_eq!(replenisher.dropped.load(Ordering::Acquire), 1);
2053    }
2054
2055    #[tokio::test]
2056    async fn rx_recv_logical_receiver_decodes_items_and_notifies_replenisher() {
2057        let (tx, rx_inner) = channel_mailbox("vox_types.channel.test.rx5", 1);
2058        let replenisher = Arc::new(CountingReplenisher::new());
2059        let core = Arc::new(ChannelCore::new(ChannelDebugContext::default()));
2060        core.bind_retryable_receiver(BoundChannelReceiver {
2061            receiver: rx_inner,
2062            liveness: None,
2063            replenisher: Some(replenisher.clone()),
2064        });
2065
2066        let mut rx = Rx::<u32>::paired(core);
2067
2068        let encoded = vox_postcard::to_vec(&321_u32).expect("serialize test item");
2069        let item = SelfRef::owning(
2070            Backing::Boxed(Box::<[u8]>::default()),
2071            ChannelItem {
2072                item: Payload::PostcardBytes(Box::leak(encoded.into_boxed_slice())),
2073            },
2074        );
2075        tx.send(IncomingChannelMessage::Item(item))
2076            .await
2077            .expect("send item");
2078
2079        let value = rx
2080            .recv()
2081            .await
2082            .expect("recv should succeed")
2083            .expect("expected item");
2084        assert_eq!(*value.get(), 321_u32);
2085        assert_eq!(replenisher.calls.load(Ordering::Acquire), 1);
2086    }
2087
2088    // ========================================================================
2089    // Channel binding through ser/deser
2090    // ========================================================================
2091
2092    /// A test binder that tracks allocations and bindings.
2093    struct TestBinder {
2094        next_id: std::sync::Mutex<u64>,
2095    }
2096
2097    impl TestBinder {
2098        fn new() -> Self {
2099            Self {
2100                next_id: std::sync::Mutex::new(100),
2101            }
2102        }
2103
2104        fn alloc_id(&self) -> ChannelId {
2105            let mut guard = self.next_id.lock().unwrap();
2106            let id = *guard;
2107            *guard += 2;
2108            ChannelId(id)
2109        }
2110    }
2111
2112    impl ChannelBinder for TestBinder {
2113        fn create_tx(&self) -> (ChannelId, Arc<dyn ChannelSink>) {
2114            (self.alloc_id(), Arc::new(CountingSink::new()))
2115        }
2116
2117        fn create_rx(&self) -> (ChannelId, BoundChannelReceiver) {
2118            let (tx, rx) = channel_mailbox("vox_types.channel.test.bind_retryable1", 8);
2119            // Keep the sender alive by leaking it — test only.
2120            std::mem::forget(tx);
2121            (
2122                self.alloc_id(),
2123                BoundChannelReceiver {
2124                    receiver: rx,
2125                    liveness: None,
2126                    replenisher: None,
2127                },
2128            )
2129        }
2130
2131        fn bind_tx(&self, _channel_id: ChannelId) -> Arc<dyn ChannelSink> {
2132            Arc::new(CountingSink::new())
2133        }
2134
2135        fn register_rx(&self, _channel_id: ChannelId) -> BoundChannelReceiver {
2136            let (tx, rx) = channel_mailbox("vox_types.channel.test.bind_retryable2", 8);
2137            std::mem::forget(tx);
2138            BoundChannelReceiver {
2139                receiver: rx,
2140                liveness: None,
2141                replenisher: None,
2142            }
2143        }
2144    }
2145
2146    // Case 1: Caller passes Tx in args, keeps paired Rx.
2147    // Serializing the Tx allocates a channel ID via create_rx() and stores
2148    // the receiver in the shared logical core so the kept Rx can survive retries.
2149    #[tokio::test]
2150    async fn case1_serialize_tx_allocates_and_binds_paired_rx() {
2151        use facet::Facet;
2152
2153        #[derive(Facet)]
2154        struct Args {
2155            data: u32,
2156            tx: Tx<u32>,
2157        }
2158
2159        let (tx, rx) = channel::<u32>();
2160        let args = Args { data: 42, tx };
2161
2162        let binder = TestBinder::new();
2163        let bytes =
2164            with_channel_binder(&binder, || vox_postcard::to_vec(&args).expect("serialize"));
2165
2166        // The channel ID should be in the serialized bytes (after the u32 data field).
2167        assert!(!bytes.is_empty());
2168
2169        // The kept Rx should now have a receiver binding in the shared core.
2170        assert!(
2171            rx.core.inner.is_some(),
2172            "paired Rx should have a shared core"
2173        );
2174        let core = rx.core.inner.as_ref().unwrap();
2175        assert!(
2176            core.take_logical_receiver().is_some(),
2177            "core should have a logical receiver binding from create_rx()"
2178        );
2179    }
2180
2181    // Case 2: Caller passes Rx in args, keeps paired Tx.
2182    // Serializing the Rx allocates a channel ID via create_tx() and stores
2183    // the sink in the shared core so the kept Tx can use it.
2184    #[test]
2185    fn case2_serialize_rx_allocates_and_binds_paired_tx() {
2186        use facet::Facet;
2187
2188        #[derive(Facet)]
2189        struct Args {
2190            data: u32,
2191            rx: Rx<u32>,
2192        }
2193
2194        let (tx, rx) = channel::<u32>();
2195        let args = Args { data: 42, rx };
2196
2197        let binder = TestBinder::new();
2198        let bytes =
2199            with_channel_binder(&binder, || vox_postcard::to_vec(&args).expect("serialize"));
2200
2201        assert!(!bytes.is_empty());
2202
2203        // The kept Tx should now have a sink binding in the shared core.
2204        assert!(tx.core.inner.is_some());
2205        let core = tx.core.inner.as_ref().unwrap();
2206        assert!(
2207            core.get_sink().is_some(),
2208            "core should have a Sink binding from create_tx()"
2209        );
2210    }
2211
2212    // Case 3: Callee deserializes Tx from args.
2213    // The Tx is bound directly via bind_tx() during deserialization.
2214    #[test]
2215    fn case3_deserialize_tx_binds_via_binder() {
2216        use facet::Facet;
2217
2218        #[derive(Facet)]
2219        struct Args {
2220            data: u32,
2221            tx: Tx<u32>,
2222        }
2223
2224        // Simulate wire bytes: a u32 (42) followed by a channel ID (varint 7).
2225        let mut bytes = vox_postcard::to_vec(&42_u32).unwrap();
2226        bytes.extend_from_slice(&vox_postcard::to_vec(&ChannelId(7)).unwrap());
2227
2228        let binder = TestBinder::new();
2229        let args: Args = with_channel_binder(&binder, || {
2230            vox_postcard::from_slice(&bytes).expect("deserialize")
2231        });
2232
2233        assert_eq!(args.data, 42);
2234        assert_eq!(args.tx.channel_id, ChannelId(7));
2235        assert!(
2236            args.tx.is_bound(),
2237            "deserialized Tx should be bound via bind_tx()"
2238        );
2239    }
2240
2241    // Case 4: Callee deserializes Rx from args.
2242    // The Rx is bound directly via register_rx() during deserialization.
2243    #[test]
2244    fn case4_deserialize_rx_binds_via_binder() {
2245        use facet::Facet;
2246
2247        #[derive(Facet)]
2248        struct Args {
2249            data: u32,
2250            rx: Rx<u32>,
2251        }
2252
2253        // Simulate wire bytes: a u32 (42) followed by a channel ID (varint 7).
2254        let mut bytes = vox_postcard::to_vec(&42_u32).unwrap();
2255        bytes.extend_from_slice(&vox_postcard::to_vec(&ChannelId(7)).unwrap());
2256
2257        let binder = TestBinder::new();
2258        let args: Args = with_channel_binder(&binder, || {
2259            vox_postcard::from_slice(&bytes).expect("deserialize")
2260        });
2261
2262        assert_eq!(args.data, 42);
2263        assert_eq!(args.rx.channel_id, ChannelId(7));
2264        assert!(
2265            args.rx.is_bound(),
2266            "deserialized Rx should be bound via register_rx()"
2267        );
2268    }
2269
2270    // Round-trip: serialize with caller binder, deserialize with callee binder.
2271    // Verifies the channel ID allocated during serialization appears in the
2272    // deserialized handle.
2273    #[test]
2274    fn channel_id_round_trips_through_ser_deser() {
2275        use facet::Facet;
2276
2277        #[derive(Facet)]
2278        struct Args {
2279            tx: Tx<u32>,
2280        }
2281
2282        let (tx, _rx) = channel::<u32>();
2283        let args = Args { tx };
2284
2285        let caller_binder = TestBinder::new();
2286        let bytes = with_channel_binder(&caller_binder, || {
2287            vox_postcard::to_vec(&args).expect("serialize")
2288        });
2289
2290        let callee_binder = TestBinder::new();
2291        let deserialized: Args = with_channel_binder(&callee_binder, || {
2292            vox_postcard::from_slice(&bytes).expect("deserialize")
2293        });
2294
2295        // The caller binder starts at ID 100, so the deserialized Tx should have that ID.
2296        assert_eq!(deserialized.tx.channel_id, ChannelId(100));
2297        assert!(deserialized.tx.is_bound());
2298    }
2299}