1#[cfg(all(feature = "jit", not(target_arch = "wasm32")))]
2use std::collections::HashMap;
3use std::collections::VecDeque;
4use std::marker::PhantomData;
5use std::panic::Location;
6use std::pin::Pin;
7use std::sync::Arc;
8use std::sync::Mutex;
9#[cfg(all(feature = "jit", not(target_arch = "wasm32")))]
10use std::sync::OnceLock;
11use std::sync::atomic::{AtomicBool, Ordering};
12
13use facet::Facet;
14#[cfg(all(feature = "jit", not(target_arch = "wasm32")))]
15use facet_core::ConstTypeId;
16use facet_core::PtrConst;
17#[cfg(target_arch = "wasm32")]
18use moire::sync::TryAcquireError;
19use moire::sync::{Notify, Semaphore};
20#[cfg(not(target_arch = "wasm32"))]
21use tokio::sync::TryAcquireError;
22
23use crate::{Backing, ChannelClose, ChannelItem, ChannelReset, Metadata, Payload, SelfRef};
24use crate::{
25 ChannelCloseReason, ChannelDebugContext, ChannelEvent, ChannelEventContext, ChannelResetReason,
26 ChannelSendOutcome, ChannelTrySendOutcome, ConnectionCloseReason, SourceLocation,
27 VoxObserverHandle,
28};
29use crate::{ChannelId, ConnectionId};
30
31#[cfg(all(feature = "jit", not(target_arch = "wasm32")))]
32struct ChannelDecodePlan {
33 plan: vox_postcard::TranslationPlan,
34 registry: vox_schema::SchemaRegistry,
35}
36
37std::thread_local! {
43 static CHANNEL_BINDER: std::cell::RefCell<Option<&'static dyn ChannelBinder>> =
44 const { std::cell::RefCell::new(None) };
45}
46
47pub fn with_channel_binder<R>(binder: &dyn ChannelBinder, f: impl FnOnce() -> R) -> R {
52 let _guard = set_channel_binder(binder);
53 f()
54}
55
56pub fn set_channel_binder(binder: &dyn ChannelBinder) -> ChannelBinderGuard<'_> {
63 #[allow(unsafe_code)]
66 let static_ref: &'static dyn ChannelBinder = unsafe { std::mem::transmute(binder) };
67 let prev = CHANNEL_BINDER.with(|cell| cell.borrow_mut().replace(static_ref));
68 ChannelBinderGuard {
69 prev,
70 _lifetime: std::marker::PhantomData,
71 }
72}
73
74pub struct ChannelBinderGuard<'a> {
76 prev: Option<&'static dyn ChannelBinder>,
77 _lifetime: std::marker::PhantomData<&'a dyn ChannelBinder>,
78}
79
80impl Drop for ChannelBinderGuard<'_> {
81 fn drop(&mut self) {
82 CHANNEL_BINDER.with(|cell| {
83 *cell.borrow_mut() = self.prev.take();
84 });
85 }
86}
87
88pub enum ChannelBinding {
91 Sink(BoundChannelSink),
92 Receiver(BoundChannelReceiver),
93}
94
95pub trait ChannelLiveness: crate::MaybeSend + crate::MaybeSync + 'static {}
96
97impl<T: crate::MaybeSend + crate::MaybeSync + 'static> ChannelLiveness for T {}
98
99pub type ChannelLivenessHandle = Arc<dyn ChannelLiveness>;
100
101pub trait ChannelCreditReplenisher: crate::MaybeSend + crate::MaybeSync + 'static {
102 fn on_item_consumed(&self);
103
104 fn on_receiver_dropped(&self) {}
105
106 fn channel_id(&self) -> Option<ChannelId> {
107 None
108 }
109
110 fn connection_id(&self) -> Option<ConnectionId> {
111 None
112 }
113
114 fn debug_context(&self) -> Option<ChannelDebugContext> {
115 None
116 }
117
118 fn observer(&self) -> Option<VoxObserverHandle> {
119 None
120 }
121}
122
123pub type ChannelCreditReplenisherHandle = Arc<dyn ChannelCreditReplenisher>;
124
125#[derive(Clone)]
126pub struct BoundChannelSink {
127 pub sink: Arc<dyn ChannelSink>,
128 pub liveness: Option<ChannelLivenessHandle>,
129}
130
131#[derive(Debug, Clone, Copy, PartialEq, Eq)]
132pub struct ChannelMailboxStats {
133 pub len: usize,
134 pub capacity: usize,
135 pub receiver_closed: bool,
136 pub sender_count: usize,
137}
138
139pub struct ChannelMailboxSendError<T> {
140 item: T,
141}
142
143impl<T> std::fmt::Debug for ChannelMailboxSendError<T> {
144 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
145 f.debug_struct("ChannelMailboxSendError")
146 .finish_non_exhaustive()
147 }
148}
149
150impl<T> ChannelMailboxSendError<T> {
151 pub fn into_inner(self) -> T {
152 self.item
153 }
154}
155
156struct ChannelMailboxState<T> {
157 inner: Mutex<ChannelMailboxInner<T>>,
158 not_empty: Notify,
159 not_full: Notify,
160}
161
162struct ChannelMailboxInner<T> {
163 queue: VecDeque<T>,
164 capacity: usize,
165 receiver_closed: bool,
166 sender_count: usize,
167}
168
169pub struct ChannelMailboxSender<T> {
171 state: Arc<ChannelMailboxState<T>>,
172}
173
174pub struct ChannelMailboxReceiver<T> {
175 state: Arc<ChannelMailboxState<T>>,
176}
177
178pub fn channel_mailbox<T>(
179 name: &'static str,
180 capacity: usize,
181) -> (ChannelMailboxSender<T>, ChannelMailboxReceiver<T>) {
182 assert!(capacity > 0, "channel mailbox capacity must be non-zero");
183 let state = Arc::new(ChannelMailboxState {
184 inner: Mutex::new(ChannelMailboxInner {
185 queue: VecDeque::with_capacity(capacity),
186 capacity,
187 receiver_closed: false,
188 sender_count: 1,
189 }),
190 not_empty: Notify::new(name),
191 not_full: Notify::new(name),
192 });
193 (
194 ChannelMailboxSender {
195 state: Arc::clone(&state),
196 },
197 ChannelMailboxReceiver { state },
198 )
199}
200
201impl<T> Clone for ChannelMailboxSender<T> {
202 fn clone(&self) -> Self {
203 let mut guard = self
204 .state
205 .inner
206 .lock()
207 .expect("channel mailbox mutex poisoned");
208 guard.sender_count = guard.sender_count.saturating_add(1);
209 drop(guard);
210 Self {
211 state: Arc::clone(&self.state),
212 }
213 }
214}
215
216impl<T> Drop for ChannelMailboxSender<T> {
217 fn drop(&mut self) {
218 let mut guard = self
219 .state
220 .inner
221 .lock()
222 .expect("channel mailbox mutex poisoned");
223 guard.sender_count = guard.sender_count.saturating_sub(1);
224 let closed = guard.sender_count == 0;
225 drop(guard);
226 if closed {
227 self.state.not_empty.notify_waiters();
228 self.state.not_full.notify_waiters();
229 }
230 }
231}
232
233impl<T> ChannelMailboxSender<T> {
234 pub async fn send(&self, item: T) -> Result<(), ChannelMailboxSendError<T>> {
235 let mut item = Some(item);
236 loop {
237 let notified = {
238 let mut guard = self
239 .state
240 .inner
241 .lock()
242 .expect("channel mailbox mutex poisoned");
243 if guard.receiver_closed {
244 return Err(ChannelMailboxSendError {
245 item: item.take().expect("mailbox item already sent"),
246 });
247 }
248 if guard.queue.len() < guard.capacity {
249 guard
250 .queue
251 .push_back(item.take().expect("mailbox item already sent"));
252 drop(guard);
253 self.state.not_empty.notify_waiters();
254 return Ok(());
255 }
256 self.state.not_full.notified()
257 };
258 notified.await;
259 }
260 }
261
262 pub fn force_send(&self, item: T) -> Result<(), ChannelMailboxSendError<T>> {
263 let mut guard = self
264 .state
265 .inner
266 .lock()
267 .expect("channel mailbox mutex poisoned");
268 if guard.receiver_closed {
269 return Err(ChannelMailboxSendError { item });
270 }
271 guard.queue.push_back(item);
272 drop(guard);
273 self.state.not_empty.notify_waiters();
274 Ok(())
275 }
276
277 pub fn stats(&self) -> ChannelMailboxStats {
278 self.state.stats()
279 }
280}
281
282impl<T> Drop for ChannelMailboxReceiver<T> {
283 fn drop(&mut self) {
284 let mut guard = self
285 .state
286 .inner
287 .lock()
288 .expect("channel mailbox mutex poisoned");
289 guard.receiver_closed = true;
290 guard.queue.clear();
291 drop(guard);
292 self.state.not_full.notify_waiters();
293 self.state.not_empty.notify_waiters();
294 }
295}
296
297impl<T> ChannelMailboxReceiver<T> {
298 pub async fn recv(&mut self) -> Option<T> {
299 loop {
300 let notified = {
301 let mut guard = self
302 .state
303 .inner
304 .lock()
305 .expect("channel mailbox mutex poisoned");
306 if let Some(item) = guard.queue.pop_front() {
307 drop(guard);
308 self.state.not_full.notify_waiters();
309 return Some(item);
310 }
311 if guard.sender_count == 0 {
312 return None;
313 }
314 self.state.not_empty.notified()
315 };
316 notified.await;
317 }
318 }
319
320 pub fn stats(&self) -> ChannelMailboxStats {
321 self.state.stats()
322 }
323}
324
325impl<T> ChannelMailboxState<T> {
326 fn stats(&self) -> ChannelMailboxStats {
327 let guard = self.inner.lock().expect("channel mailbox mutex poisoned");
328 ChannelMailboxStats {
329 len: guard.queue.len(),
330 capacity: guard.capacity,
331 receiver_closed: guard.receiver_closed,
332 sender_count: guard.sender_count,
333 }
334 }
335}
336
337pub struct BoundChannelReceiver {
338 pub receiver: ChannelMailboxReceiver<IncomingChannelMessage>,
339 pub liveness: Option<ChannelLivenessHandle>,
340 pub replenisher: Option<ChannelCreditReplenisherHandle>,
341}
342
343struct LogicalReceiverState {
344 generation: u64,
345 liveness: Option<ChannelLivenessHandle>,
346 replenisher: Option<ChannelCreditReplenisherHandle>,
347 sender: Option<ChannelMailboxSender<LogicalIncomingChannelMessage>>,
348 receiver: Option<ChannelMailboxReceiver<LogicalIncomingChannelMessage>>,
349}
350
351pub struct ChannelCore {
358 binding: Mutex<Option<ChannelBinding>>,
359 logical_receiver: Mutex<Option<LogicalReceiverState>>,
360 binding_changed: Notify,
361 debug_context: ChannelDebugContext,
362}
363
364impl ChannelCore {
365 fn new(debug_context: ChannelDebugContext) -> Self {
366 Self {
367 binding: Mutex::new(None),
368 logical_receiver: Mutex::new(None),
369 binding_changed: Notify::new("vox_types.channel.binding_changed"),
370 debug_context,
371 }
372 }
373
374 pub fn set_binding(&self, binding: ChannelBinding) {
376 let mut guard = self.binding.lock().expect("channel core mutex poisoned");
377 *guard = Some(binding);
378 self.binding_changed.notify_waiters();
379 }
380
381 pub fn get_sink(&self) -> Option<Arc<dyn ChannelSink>> {
384 let guard = self.binding.lock().expect("channel core mutex poisoned");
385 match guard.as_ref() {
386 Some(ChannelBinding::Sink(bound)) => Some(bound.sink.clone()),
387 _ => None,
388 }
389 }
390
391 pub fn take_receiver(&self) -> Option<BoundChannelReceiver> {
394 let mut guard = self.binding.lock().expect("channel core mutex poisoned");
395 match guard.take() {
396 Some(ChannelBinding::Receiver(bound)) => Some(bound),
397 other => {
398 *guard = other;
400 None
401 }
402 }
403 }
404
405 pub fn bind_retryable_receiver(self: &Arc<Self>, bound: BoundChannelReceiver) {
406 #[cfg(not(target_arch = "wasm32"))]
407 if tokio::runtime::Handle::try_current().is_err() {
408 self.set_binding(ChannelBinding::Receiver(bound));
409 return;
410 }
411
412 let mut guard = self
413 .logical_receiver
414 .lock()
415 .expect("channel core logical receiver mutex poisoned");
416 let state = guard.get_or_insert_with(|| {
417 let (tx, rx) = channel_mailbox("vox_types.channel.logical_receiver", 64);
418 LogicalReceiverState {
419 generation: 0,
420 liveness: None,
421 replenisher: None,
422 sender: Some(tx),
423 receiver: Some(rx),
424 }
425 });
426 state.generation = state.generation.wrapping_add(1);
427 state.liveness = bound.liveness.clone();
428 state.replenisher = bound.replenisher.clone();
429 let generation = state.generation;
430
431 let Some(sender) = state.sender.clone() else {
432 return;
433 };
434
435 self.binding_changed.notify_waiters();
436
437 drop(guard);
438 let core = Arc::clone(self);
439
440 moire::task::spawn(async move {
441 let mut receiver = bound.receiver;
442 let replenisher = bound.replenisher.clone();
443 while let Some(msg) = receiver.recv().await {
444 let is_current_generation = {
445 let guard = core
446 .logical_receiver
447 .lock()
448 .expect("channel core logical receiver mutex poisoned");
449 guard
450 .as_ref()
451 .map(|state| state.generation == generation)
452 .unwrap_or(false)
453 };
454 if !is_current_generation {
455 return;
456 }
457 let forwarded = LogicalIncomingChannelMessage {
458 msg,
459 replenisher: replenisher.clone(),
460 };
461 if sender.send(forwarded).await.is_err() {
462 return;
463 }
464 }
465 });
466 }
467
468 pub fn take_logical_receiver(
469 &self,
470 ) -> Option<(
471 ChannelMailboxReceiver<LogicalIncomingChannelMessage>,
472 Option<ChannelLivenessHandle>,
473 Option<ChannelCreditReplenisherHandle>,
474 )> {
475 self.logical_receiver
476 .lock()
477 .expect("channel core logical receiver mutex poisoned")
478 .as_mut()
479 .and_then(|state| {
480 state
481 .receiver
482 .take()
483 .map(|receiver| (receiver, state.liveness.clone(), state.replenisher.clone()))
484 })
485 }
486
487 pub fn finish_retry_binding(&self) {
488 let mut guard = self
489 .logical_receiver
490 .lock()
491 .expect("channel core logical receiver mutex poisoned");
492 if let Some(state) = guard.as_mut() {
493 if let Some(sender) = state.sender.as_ref() {
494 let close = SelfRef::owning(
495 Backing::Boxed(Box::<[u8]>::default()),
496 ChannelClose {
497 metadata: Metadata::default(),
498 },
499 );
500 let _ = sender.force_send(LogicalIncomingChannelMessage {
501 msg: IncomingChannelMessage::Close(close),
502 replenisher: None,
503 });
504 }
505 state.sender.take();
506 }
507 *guard = None;
508 let mut guard = self.binding.lock().expect("channel core mutex poisoned");
509 *guard = None;
510 self.binding_changed.notify_waiters();
511 }
512
513 pub fn debug_context(&self) -> ChannelDebugContext {
514 self.debug_context
515 }
516}
517
518#[derive(Facet)]
520#[facet(opaque)]
521pub(crate) struct CoreSlot {
522 pub(crate) inner: Option<Arc<ChannelCore>>,
523}
524
525impl CoreSlot {
526 pub(crate) fn empty() -> Self {
527 Self { inner: None }
528 }
529}
530
531#[track_caller]
539pub fn channel<T>() -> (Tx<T>, Rx<T>) {
540 let caller = Location::caller();
541 let debug_context = ChannelDebugContext {
542 type_name: Some(std::any::type_name::<T>()),
543 source_location: Some(SourceLocation {
544 file: caller.file(),
545 line: caller.line(),
546 column: caller.column(),
547 }),
548 ..ChannelDebugContext::default()
549 };
550 let core = Arc::new(ChannelCore::new(debug_context));
551 (Tx::paired(core.clone()), Rx::paired(core))
552}
553
554fn merge_debug_context(
555 primary: Option<ChannelDebugContext>,
556 fallback: ChannelDebugContext,
557) -> Option<ChannelDebugContext> {
558 match (
559 primary.and_then(ChannelDebugContext::into_option),
560 fallback.into_option(),
561 ) {
562 (Some(primary), Some(fallback)) => ChannelDebugContext {
563 label: primary.label.or(fallback.label),
564 type_name: primary.type_name.or(fallback.type_name),
565 source_location: primary.source_location.or(fallback.source_location),
566 service: primary.service.or(fallback.service),
567 method: primary.method.or(fallback.method),
568 }
569 .into_option(),
570 (Some(primary), None) => Some(primary),
571 (None, fallback) => fallback,
572 }
573}
574
575fn sink_event_context(
576 sink: &dyn ChannelSink,
577 channel_id: ChannelId,
578 fallback: ChannelDebugContext,
579) -> ChannelEventContext {
580 ChannelEventContext {
581 connection_id: sink.connection_id(),
582 channel_id,
583 debug: merge_debug_context(sink.debug_context(), fallback),
584 }
585}
586
587fn replenisher_event_context(
588 replenisher: &dyn ChannelCreditReplenisher,
589 channel_id: ChannelId,
590 fallback: ChannelDebugContext,
591) -> ChannelEventContext {
592 ChannelEventContext {
593 connection_id: replenisher.connection_id(),
594 channel_id,
595 debug: merge_debug_context(replenisher.debug_context(), fallback),
596 }
597}
598
599fn observe_sink_channel(
600 sink: &dyn ChannelSink,
601 channel_id: Option<ChannelId>,
602 fallback: ChannelDebugContext,
603 event: impl FnOnce(ChannelEventContext) -> ChannelEvent,
604) {
605 if let (Some(observer), Some(channel_id)) = (sink.observer(), channel_id) {
606 observer.channel_event(event(sink_event_context(sink, channel_id, fallback)));
607 }
608}
609
610fn observe_replenisher_channel(
611 replenisher: &dyn ChannelCreditReplenisher,
612 fallback: ChannelDebugContext,
613 event: impl FnOnce(ChannelEventContext) -> ChannelEvent,
614) {
615 if let (Some(observer), Some(channel_id)) = (replenisher.observer(), replenisher.channel_id()) {
616 observer.channel_event(event(replenisher_event_context(
617 replenisher,
618 channel_id,
619 fallback,
620 )));
621 }
622}
623
624fn observe_optional_replenisher_channel(
625 replenisher: Option<&ChannelCreditReplenisherHandle>,
626 fallback: ChannelDebugContext,
627 event: impl FnOnce(ChannelEventContext) -> ChannelEvent,
628) {
629 if let Some(replenisher) = replenisher {
630 observe_replenisher_channel(replenisher.as_ref(), fallback, event);
631 }
632}
633
634#[cfg(all(feature = "jit", not(target_arch = "wasm32")))]
635fn channel_decode_plan<T: Facet<'static>>() -> Arc<ChannelDecodePlan> {
636 static PLANS: OnceLock<Mutex<HashMap<ConstTypeId, Arc<ChannelDecodePlan>>>> = OnceLock::new();
637
638 let plans = PLANS.get_or_init(|| Mutex::new(HashMap::new()));
639 let key = T::SHAPE.id;
640 let mut plans = plans.lock().expect("channel decode plan mutex poisoned");
641 plans
642 .entry(key)
643 .or_insert_with(|| {
644 Arc::new(ChannelDecodePlan {
645 plan: vox_postcard::build_identity_plan(T::SHAPE),
646 registry: vox_schema::SchemaRegistry::new(),
647 })
648 })
649 .clone()
650}
651
652#[cfg(all(feature = "jit", not(target_arch = "wasm32")))]
653fn decode_channel_payload<T: Facet<'static>>(bytes: &'static [u8]) -> Result<T, RxError> {
654 if vox_jit::require_pure_jit() && vox_jit::force_fallback() {
655 panic!(
656 "VOX_JIT_REQUIRE_PURE=1 but channel payload decode for '{}' was forced off by VOX_CODEC",
657 T::SHAPE
658 );
659 }
660
661 let resolved = channel_decode_plan::<T>();
662 match vox_jit::global_runtime().try_decode_borrowed::<T>(
663 bytes,
664 0,
665 &resolved.plan,
666 &resolved.registry,
667 ) {
668 Some(result) => result.map_err(RxError::Deserialize),
669 None if vox_jit::require_pure_jit() => {
670 panic!(
671 "VOX_JIT_REQUIRE_PURE=1 but channel payload decode for '{}' did not use the JIT",
672 T::SHAPE
673 )
674 }
675 None => vox_postcard::from_slice_borrowed(bytes).map_err(RxError::Deserialize),
676 }
677}
678
679#[cfg(not(all(feature = "jit", not(target_arch = "wasm32"))))]
680fn decode_channel_payload<T: Facet<'static>>(bytes: &'static [u8]) -> Result<T, RxError> {
681 vox_postcard::from_slice_borrowed(bytes).map_err(RxError::Deserialize)
682}
683
684fn decode_channel_item<T>(msg: SelfRef<ChannelItem<'static>>) -> Result<Option<SelfRef<T>>, RxError>
685where
686 T: Facet<'static>,
687{
688 msg.try_repack(|item, _backing_bytes| {
689 let Payload::PostcardBytes(bytes) = item.item else {
690 return Err(RxError::Protocol(
691 "incoming channel item payload was not Incoming".into(),
692 ));
693 };
694 decode_channel_payload(bytes)
695 })
696 .map(Some)
697}
698
699fn handle_incoming_channel_message<T>(
700 msg: Option<IncomingChannelMessage>,
701 replenisher: Option<&ChannelCreditReplenisherHandle>,
702 debug_context: ChannelDebugContext,
703 closed: &AtomicBool,
704) -> Result<Option<SelfRef<T>>, RxError>
705where
706 T: Facet<'static>,
707{
708 match msg {
709 Some(IncomingChannelMessage::Close(_)) => {
710 observe_optional_replenisher_channel(replenisher, debug_context, |channel| {
711 ChannelEvent::Closed {
712 channel,
713 reason: ChannelCloseReason::Remote,
714 }
715 });
716 closed.store(true, Ordering::Release);
717 Ok(None)
718 }
719 Some(IncomingChannelMessage::ConnectionClosed(reason)) => {
720 observe_optional_replenisher_channel(replenisher, debug_context, |channel| {
721 ChannelEvent::Closed {
722 channel,
723 reason: ChannelCloseReason::ConnectionClosed,
724 }
725 });
726 closed.store(true, Ordering::Release);
727 Err(RxError::ConnectionClosed(reason))
728 }
729 None => {
730 observe_optional_replenisher_channel(replenisher, debug_context, |channel| {
731 ChannelEvent::Closed {
732 channel,
733 reason: ChannelCloseReason::Unknown,
734 }
735 });
736 closed.store(true, Ordering::Release);
737 Ok(None)
738 }
739 Some(IncomingChannelMessage::Reset(_)) => {
740 observe_optional_replenisher_channel(replenisher, debug_context, |channel| {
741 ChannelEvent::Reset {
742 channel,
743 reason: ChannelResetReason::Remote,
744 }
745 });
746 closed.store(true, Ordering::Release);
747 Err(RxError::Reset)
748 }
749 Some(IncomingChannelMessage::Item(msg)) => {
750 let value = decode_channel_item(msg);
751 if value.is_ok() {
752 observe_optional_replenisher_channel(replenisher, debug_context, |channel| {
753 ChannelEvent::ItemConsumed { channel }
754 });
755 if let Some(replenisher) = replenisher {
756 replenisher.on_item_consumed();
757 }
758 }
759 value
760 }
761 }
762}
763
764pub trait ChannelSink: crate::MaybeSend + crate::MaybeSync + 'static {
769 fn send_payload<'payload>(
770 &self,
771 payload: Payload<'payload>,
772 ) -> Pin<Box<dyn crate::MaybeSendFuture<Output = Result<(), TxError>> + 'payload>>;
773
774 fn channel_id(&self) -> Option<ChannelId> {
775 None
776 }
777
778 fn connection_id(&self) -> Option<ConnectionId> {
779 None
780 }
781
782 fn debug_context(&self) -> Option<ChannelDebugContext> {
783 None
784 }
785
786 fn observer(&self) -> Option<VoxObserverHandle> {
787 None
788 }
789
790 #[doc(hidden)]
791 fn note_send_started(&self) {}
792
793 #[doc(hidden)]
794 fn note_send_waiting_for_credit(&self) {}
795
796 #[doc(hidden)]
797 fn note_send_finished(&self, _outcome: ChannelSendOutcome) {}
798
799 #[doc(hidden)]
800 fn note_try_send_outcome(&self, _outcome: ChannelTrySendOutcome) {}
801
802 #[doc(hidden)]
803 fn try_send_payload_with_outcome<'payload>(
804 &self,
805 payload: Payload<'payload>,
806 ) -> Result<(), ChannelTrySendOutcome> {
807 self.try_send_payload(payload).map_err(|err| match err {
808 TrySendError::Full(()) => ChannelTrySendOutcome::FullRuntimeQueue,
809 TrySendError::Closed(()) => ChannelTrySendOutcome::Closed,
810 })
811 }
812
813 fn try_send_payload<'payload>(
815 &self,
816 _payload: Payload<'payload>,
817 ) -> Result<(), TrySendError<()>> {
818 Err(TrySendError::Full(()))
819 }
820
821 fn close_channel(
822 &self,
823 metadata: Metadata,
824 ) -> Pin<Box<dyn crate::MaybeSendFuture<Output = Result<(), TxError>> + 'static>>;
825
826 fn close_channel_on_drop(&self) {}
832}
833
834pub struct CreditSink<S: ChannelSink> {
842 inner: S,
843 credit: Arc<Semaphore>,
844}
845
846impl<S: ChannelSink> CreditSink<S> {
847 pub fn new(inner: S, initial_credit: u32) -> Self {
851 Self {
852 inner,
853 credit: Arc::new(Semaphore::new(
854 "vox_types.channel.credit",
855 initial_credit as usize,
856 )),
857 }
858 }
859
860 pub fn credit(&self) -> &Arc<Semaphore> {
863 &self.credit
864 }
865}
866
867impl<S: ChannelSink> ChannelSink for CreditSink<S> {
868 fn send_payload<'payload>(
869 &self,
870 payload: Payload<'payload>,
871 ) -> Pin<Box<dyn crate::MaybeSendFuture<Output = Result<(), TxError>> + 'payload>> {
872 let credit = self.credit.clone();
873 let channel_id = self.channel_id();
874 if credit.available_permits() == 0 {
875 self.inner.note_send_waiting_for_credit();
876 observe_sink_channel(
877 self,
878 channel_id,
879 ChannelDebugContext::default(),
880 |channel| ChannelEvent::SendWaitingForCredit { channel },
881 );
882 }
883 let fut = self.inner.send_payload(payload);
884 Box::pin(async move {
885 let permit = credit
886 .acquire_owned()
887 .await
888 .map_err(|_| TxError::Transport("channel credit semaphore closed".into()))?;
889 std::mem::forget(permit);
890 fut.await
891 })
892 }
893
894 fn channel_id(&self) -> Option<ChannelId> {
895 self.inner.channel_id()
896 }
897
898 fn connection_id(&self) -> Option<ConnectionId> {
899 self.inner.connection_id()
900 }
901
902 fn debug_context(&self) -> Option<ChannelDebugContext> {
903 self.inner.debug_context()
904 }
905
906 fn observer(&self) -> Option<VoxObserverHandle> {
907 self.inner.observer()
908 }
909
910 fn note_send_started(&self) {
911 self.inner.note_send_started();
912 }
913
914 fn note_send_waiting_for_credit(&self) {
915 self.inner.note_send_waiting_for_credit();
916 }
917
918 fn note_send_finished(&self, outcome: ChannelSendOutcome) {
919 self.inner.note_send_finished(outcome);
920 }
921
922 fn note_try_send_outcome(&self, outcome: ChannelTrySendOutcome) {
923 self.inner.note_try_send_outcome(outcome);
924 }
925
926 fn try_send_payload_with_outcome<'payload>(
928 &self,
929 payload: Payload<'payload>,
930 ) -> Result<(), ChannelTrySendOutcome> {
931 let permit = self.credit.try_acquire_owned().map_err(|err| match err {
932 TryAcquireError::NoPermits => ChannelTrySendOutcome::FullCredit,
933 TryAcquireError::Closed => ChannelTrySendOutcome::Closed,
934 })?;
935
936 match self.inner.try_send_payload_with_outcome(payload) {
937 Ok(()) => {
938 std::mem::forget(permit);
939 Ok(())
940 }
941 Err(err) => Err(err),
942 }
943 }
944
945 fn close_channel(
946 &self,
947 metadata: Metadata,
948 ) -> Pin<Box<dyn crate::MaybeSendFuture<Output = Result<(), TxError>> + 'static>> {
949 self.inner.close_channel(metadata)
951 }
952
953 fn close_channel_on_drop(&self) {
954 self.inner.close_channel_on_drop();
955 }
956}
957
958pub enum IncomingChannelMessage {
960 Item(SelfRef<ChannelItem<'static>>),
961 Close(SelfRef<ChannelClose<'static>>),
962 Reset(SelfRef<ChannelReset<'static>>),
963 ConnectionClosed(ConnectionCloseReason),
965}
966
967pub struct LogicalIncomingChannelMessage {
968 pub msg: IncomingChannelMessage,
969 pub replenisher: Option<ChannelCreditReplenisherHandle>,
970}
971
972#[derive(Facet)]
974#[facet(opaque)]
975pub(crate) struct SinkSlot {
976 pub(crate) inner: Option<Arc<dyn ChannelSink>>,
977}
978
979impl SinkSlot {
980 pub(crate) fn empty() -> Self {
981 Self { inner: None }
982 }
983}
984
985#[derive(Facet)]
987#[facet(opaque)]
988pub(crate) struct LivenessSlot {
989 pub(crate) inner: Option<ChannelLivenessHandle>,
990}
991
992impl LivenessSlot {
993 pub(crate) fn empty() -> Self {
994 Self { inner: None }
995 }
996}
997
998#[derive(Facet)]
1000#[facet(opaque)]
1001pub(crate) struct ReceiverSlot {
1002 pub(crate) inner: Option<ChannelMailboxReceiver<IncomingChannelMessage>>,
1003}
1004
1005impl ReceiverSlot {
1006 pub(crate) fn empty() -> Self {
1007 Self { inner: None }
1008 }
1009}
1010
1011#[derive(Facet)]
1012#[facet(opaque)]
1013pub(crate) struct LogicalReceiverSlot {
1014 pub(crate) inner: Option<ChannelMailboxReceiver<LogicalIncomingChannelMessage>>,
1015}
1016
1017impl LogicalReceiverSlot {
1018 pub(crate) fn empty() -> Self {
1019 Self { inner: None }
1020 }
1021}
1022
1023#[derive(Facet)]
1025#[facet(opaque)]
1026pub(crate) struct ReplenisherSlot {
1027 pub(crate) inner: Option<ChannelCreditReplenisherHandle>,
1028}
1029
1030impl ReplenisherSlot {
1031 pub(crate) fn empty() -> Self {
1032 Self { inner: None }
1033 }
1034}
1035
1036#[derive(Facet)]
1046#[facet(proxy = crate::ChannelId)]
1047pub struct Tx<T> {
1048 pub(crate) channel_id: ChannelId,
1049 pub(crate) sink: SinkSlot,
1050 pub(crate) core: CoreSlot,
1051 pub(crate) liveness: LivenessSlot,
1052 #[facet(opaque)]
1053 debug_context: ChannelDebugContext,
1054 #[facet(opaque)]
1055 closed: AtomicBool,
1056 #[facet(opaque)]
1057 _marker: PhantomData<T>,
1058}
1059
1060impl<T> Tx<T> {
1061 #[track_caller]
1063 pub fn unbound() -> Self {
1064 let caller = Location::caller();
1065 Self::unbound_with_context(ChannelDebugContext {
1066 type_name: Some(std::any::type_name::<T>()),
1067 source_location: Some(SourceLocation {
1068 file: caller.file(),
1069 line: caller.line(),
1070 column: caller.column(),
1071 }),
1072 ..ChannelDebugContext::default()
1073 })
1074 }
1075
1076 fn unbound_with_context(debug_context: ChannelDebugContext) -> Self {
1077 Self {
1078 channel_id: ChannelId::RESERVED,
1079 sink: SinkSlot::empty(),
1080 core: CoreSlot::empty(),
1081 liveness: LivenessSlot::empty(),
1082 debug_context,
1083 closed: AtomicBool::new(false),
1084 _marker: PhantomData,
1085 }
1086 }
1087
1088 fn paired(core: Arc<ChannelCore>) -> Self {
1090 let debug_context = core.debug_context();
1091 Self {
1092 channel_id: ChannelId::RESERVED,
1093 sink: SinkSlot::empty(),
1094 core: CoreSlot { inner: Some(core) },
1095 liveness: LivenessSlot::empty(),
1096 debug_context,
1097 closed: AtomicBool::new(false),
1098 _marker: PhantomData,
1099 }
1100 }
1101
1102 pub fn debug_context(&self) -> ChannelDebugContext {
1103 self.debug_context
1104 }
1105
1106 pub fn is_bound(&self) -> bool {
1107 if self.sink.inner.is_some() {
1108 return true;
1109 }
1110 if let Some(core) = &self.core.inner {
1111 return core.get_sink().is_some();
1112 }
1113 false
1114 }
1115
1116 pub fn has_core(&self) -> bool {
1118 self.core.inner.is_some()
1119 }
1120
1121 fn resolve_sink_now(&self) -> Option<Arc<dyn ChannelSink>> {
1123 if let Some(sink) = &self.sink.inner {
1125 return Some(sink.clone());
1126 }
1127 if let Some(core) = &self.core.inner
1129 && let Some(sink) = core.get_sink()
1130 {
1131 return Some(sink);
1132 }
1133 None
1134 }
1135
1136 fn channel_id_for_sink(&self, sink: &dyn ChannelSink) -> Option<ChannelId> {
1137 if self.channel_id == ChannelId::RESERVED {
1138 sink.channel_id()
1139 } else {
1140 Some(self.channel_id)
1141 }
1142 }
1143
1144 fn observe_sink_event(
1145 &self,
1146 sink: &dyn ChannelSink,
1147 channel_id: Option<ChannelId>,
1148 event: impl FnOnce(ChannelEventContext) -> ChannelEvent,
1149 ) {
1150 observe_sink_channel(sink, channel_id, self.debug_context, event);
1151 }
1152
1153 fn observe_try_send(
1154 &self,
1155 sink: &dyn ChannelSink,
1156 channel_id: Option<ChannelId>,
1157 outcome: ChannelTrySendOutcome,
1158 ) {
1159 self.observe_sink_event(sink, channel_id, |channel| ChannelEvent::TrySend {
1160 channel,
1161 outcome,
1162 });
1163 }
1164
1165 pub async fn send<'value>(&self, value: T) -> Result<(), TxError>
1166 where
1167 T: Facet<'value>,
1168 {
1169 let sink = if let Some(sink) = self.resolve_sink_now() {
1170 sink
1171 } else if let Some(core) = &self.core.inner {
1172 loop {
1173 let notified = core.binding_changed.notified();
1174 if let Some(sink) = self.resolve_sink_now() {
1175 break sink;
1176 }
1177 notified.await;
1178 }
1179 } else {
1180 return Err(TxError::Unbound);
1181 };
1182 let channel_id = self.channel_id_for_sink(sink.as_ref());
1183 sink.note_send_started();
1184 self.observe_sink_event(sink.as_ref(), channel_id, |channel| {
1185 ChannelEvent::SendStarted { channel }
1186 });
1187 let started_at = crate::time::Instant::now();
1188 let ptr = PtrConst::new((&value as *const T).cast::<u8>());
1189 let payload = unsafe { Payload::outgoing_unchecked(ptr, T::SHAPE) };
1192 let result = sink.send_payload(payload).await;
1193 let outcome = match &result {
1194 Ok(()) => ChannelSendOutcome::Sent,
1195 Err(TxError::Transport(message)) if message == "channel closed" => {
1196 ChannelSendOutcome::Closed
1197 }
1198 Err(_) => ChannelSendOutcome::TransportError,
1199 };
1200 self.observe_sink_event(sink.as_ref(), channel_id, |channel| {
1201 ChannelEvent::SendFinished {
1202 channel,
1203 outcome,
1204 elapsed: started_at.elapsed(),
1205 }
1206 });
1207 sink.note_send_finished(outcome);
1208 drop(value);
1209 result
1210 }
1211
1212 pub fn try_send<'value>(&self, value: T) -> Result<(), TrySendError<T>>
1214 where
1215 T: Facet<'value>,
1216 {
1217 if self.closed.load(Ordering::Acquire) {
1218 return Err(TrySendError::Closed(value));
1219 }
1220
1221 let Some(sink) = self.resolve_sink_now() else {
1222 return Err(TrySendError::Full(value));
1223 };
1224 let channel_id = self.channel_id_for_sink(sink.as_ref());
1225
1226 let ptr = PtrConst::new((&value as *const T).cast::<u8>());
1227 let payload = unsafe { Payload::outgoing_unchecked(ptr, T::SHAPE) };
1230 match sink.try_send_payload_with_outcome(payload) {
1231 Ok(()) => {
1232 sink.note_try_send_outcome(ChannelTrySendOutcome::Sent);
1233 self.observe_try_send(sink.as_ref(), channel_id, ChannelTrySendOutcome::Sent);
1234 drop(value);
1235 Ok(())
1236 }
1237 Err(ChannelTrySendOutcome::Closed) => {
1238 sink.note_try_send_outcome(ChannelTrySendOutcome::Closed);
1239 self.observe_try_send(sink.as_ref(), channel_id, ChannelTrySendOutcome::Closed);
1240 self.closed.store(true, Ordering::Release);
1241 Err(TrySendError::Closed(value))
1242 }
1243 Err(outcome) => {
1244 sink.note_try_send_outcome(outcome);
1245 self.observe_try_send(sink.as_ref(), channel_id, outcome);
1246 Err(TrySendError::Full(value))
1247 }
1248 }
1249 }
1250
1251 pub async fn close<'value>(&self, metadata: Metadata<'value>) -> Result<(), TxError> {
1253 self.closed.store(true, Ordering::Release);
1254 let sink = if let Some(sink) = self.resolve_sink_now() {
1255 sink
1256 } else if let Some(core) = &self.core.inner {
1257 loop {
1258 let notified = core.binding_changed.notified();
1259 if let Some(sink) = self.resolve_sink_now() {
1260 break sink;
1261 }
1262 notified.await;
1263 }
1264 } else {
1265 return Err(TxError::Unbound);
1266 };
1267 sink.close_channel(metadata).await
1268 }
1269
1270 #[doc(hidden)]
1271 pub fn bind(&mut self, sink: Arc<dyn ChannelSink>) {
1272 self.bind_with_liveness(sink, None);
1273 }
1274
1275 #[doc(hidden)]
1276 pub fn bind_with_liveness(
1277 &mut self,
1278 sink: Arc<dyn ChannelSink>,
1279 liveness: Option<ChannelLivenessHandle>,
1280 ) {
1281 self.sink.inner = Some(sink);
1282 self.liveness.inner = liveness;
1283 }
1284
1285 #[doc(hidden)]
1286 pub fn finish_retry_binding(&self) {
1287 if let Some(core) = &self.core.inner {
1288 core.finish_retry_binding();
1289 }
1290 }
1291}
1292
1293impl<T> Drop for Tx<T> {
1294 fn drop(&mut self) {
1296 if self.closed.swap(true, Ordering::AcqRel) {
1297 return;
1298 }
1299
1300 let sink = if let Some(sink) = &self.sink.inner {
1301 Some(sink.clone())
1302 } else if let Some(core) = &self.core.inner {
1303 core.get_sink()
1304 } else {
1305 None
1306 };
1307
1308 let Some(sink) = sink else {
1309 return;
1310 };
1311
1312 sink.close_channel_on_drop();
1314 }
1315}
1316
1317impl<T> TryFrom<&Tx<T>> for ChannelId {
1318 type Error = String;
1319
1320 fn try_from(value: &Tx<T>) -> Result<Self, Self::Error> {
1321 CHANNEL_BINDER.with(|cell| {
1325 let borrow = cell.borrow();
1326 let Some(binder) = *borrow else {
1327 return Err("serializing Tx requires an active ChannelBinder".to_string());
1328 };
1329 let (channel_id, bound) = binder.create_rx_with_context(Some(value.debug_context));
1330 if let Some(core) = &value.core.inner {
1331 core.bind_retryable_receiver(bound);
1332 }
1333 Ok(channel_id)
1334 })
1335 }
1336}
1337
1338impl<T> TryFrom<ChannelId> for Tx<T> {
1339 type Error = String;
1340
1341 fn try_from(channel_id: ChannelId) -> Result<Self, Self::Error> {
1342 let debug_context = ChannelDebugContext {
1343 type_name: Some(std::any::type_name::<T>()),
1344 ..ChannelDebugContext::default()
1345 };
1346 let mut tx = Self::unbound_with_context(debug_context);
1347 tx.channel_id = channel_id;
1348
1349 CHANNEL_BINDER.with(|cell| {
1350 let Some(binder) = *cell.borrow() else {
1351 return Err("deserializing Tx requires an active ChannelBinder".to_string());
1352 };
1353 let sink = binder.bind_tx_with_context(channel_id, Some(debug_context));
1354 let liveness = binder.channel_liveness();
1355 tx.bind_with_liveness(sink, liveness);
1356 Ok(())
1357 })?;
1358
1359 Ok(tx)
1360 }
1361}
1362
1363#[derive(Debug)]
1365pub enum TxError {
1366 Unbound,
1367 Transport(String),
1368}
1369
1370impl std::fmt::Display for TxError {
1371 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1372 match self {
1373 Self::Unbound => write!(f, "channel is not bound"),
1374 Self::Transport(msg) => write!(f, "transport error: {msg}"),
1375 }
1376 }
1377}
1378
1379impl std::error::Error for TxError {}
1380
1381#[derive(Debug, Clone, PartialEq, Eq)]
1383pub enum TrySendError<T> {
1384 Full(T),
1386 Closed(T),
1388}
1389
1390impl<T> TrySendError<T> {
1391 pub fn into_inner(self) -> T {
1392 match self {
1393 Self::Full(value) | Self::Closed(value) => value,
1394 }
1395 }
1396}
1397
1398impl<T> std::fmt::Display for TrySendError<T> {
1399 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1400 match self {
1401 Self::Full(_) => write!(f, "channel is full"),
1402 Self::Closed(_) => write!(f, "channel is closed"),
1403 }
1404 }
1405}
1406
1407impl<T: std::fmt::Debug> std::error::Error for TrySendError<T> {}
1408
1409#[derive(Facet)]
1415#[facet(proxy = crate::ChannelId)]
1416pub struct Rx<T> {
1417 pub(crate) channel_id: ChannelId,
1418 pub(crate) receiver: ReceiverSlot,
1419 pub(crate) logical_receiver: LogicalReceiverSlot,
1420 pub(crate) core: CoreSlot,
1421 pub(crate) liveness: LivenessSlot,
1422 pub(crate) replenisher: ReplenisherSlot,
1423 #[facet(opaque)]
1424 debug_context: ChannelDebugContext,
1425 #[facet(opaque)]
1426 closed: AtomicBool,
1427 #[facet(opaque)]
1428 _marker: PhantomData<T>,
1429}
1430
1431impl<T> Rx<T> {
1432 #[track_caller]
1434 pub fn unbound() -> Self {
1435 let caller = Location::caller();
1436 Self::unbound_with_context(ChannelDebugContext {
1437 type_name: Some(std::any::type_name::<T>()),
1438 source_location: Some(SourceLocation {
1439 file: caller.file(),
1440 line: caller.line(),
1441 column: caller.column(),
1442 }),
1443 ..ChannelDebugContext::default()
1444 })
1445 }
1446
1447 fn unbound_with_context(debug_context: ChannelDebugContext) -> Self {
1448 Self {
1449 channel_id: ChannelId::RESERVED,
1450 receiver: ReceiverSlot::empty(),
1451 logical_receiver: LogicalReceiverSlot::empty(),
1452 core: CoreSlot::empty(),
1453 liveness: LivenessSlot::empty(),
1454 replenisher: ReplenisherSlot::empty(),
1455 debug_context,
1456 closed: AtomicBool::new(false),
1457 _marker: PhantomData,
1458 }
1459 }
1460
1461 fn paired(core: Arc<ChannelCore>) -> Self {
1463 let debug_context = core.debug_context();
1464 Self {
1465 channel_id: ChannelId::RESERVED,
1466 receiver: ReceiverSlot::empty(),
1467 logical_receiver: LogicalReceiverSlot::empty(),
1468 core: CoreSlot { inner: Some(core) },
1469 liveness: LivenessSlot::empty(),
1470 replenisher: ReplenisherSlot::empty(),
1471 debug_context,
1472 closed: AtomicBool::new(false),
1473 _marker: PhantomData,
1474 }
1475 }
1476
1477 pub fn debug_context(&self) -> ChannelDebugContext {
1478 self.debug_context
1479 }
1480
1481 pub fn is_bound(&self) -> bool {
1482 self.receiver.inner.is_some()
1483 }
1484
1485 pub fn has_core(&self) -> bool {
1487 self.core.inner.is_some()
1488 }
1489
1490 pub async fn recv(&mut self) -> Result<Option<SelfRef<T>>, RxError>
1492 where
1493 T: Facet<'static>,
1494 {
1495 loop {
1496 if self.logical_receiver.inner.is_none()
1497 && let Some(core) = &self.core.inner
1498 && let Some((receiver, liveness, replenisher)) = core.take_logical_receiver()
1499 {
1500 self.logical_receiver.inner = Some(receiver);
1501 self.liveness.inner = liveness;
1502 self.replenisher.inner = replenisher;
1503 }
1504
1505 if let Some(receiver) = self.logical_receiver.inner.as_mut() {
1506 let received = receiver.recv().await;
1507 return match received {
1508 Some(LogicalIncomingChannelMessage { msg, replenisher }) => {
1509 handle_incoming_channel_message(
1510 Some(msg),
1511 replenisher.as_ref(),
1512 self.debug_context,
1513 &self.closed,
1514 )
1515 }
1516 None => handle_incoming_channel_message(
1517 None,
1518 None,
1519 self.debug_context,
1520 &self.closed,
1521 ),
1522 };
1523 }
1524
1525 if self.receiver.inner.is_none()
1526 && let Some(core) = &self.core.inner
1527 && let Some(bound) = core.take_receiver()
1528 {
1529 self.receiver.inner = Some(bound.receiver);
1530 self.liveness.inner = bound.liveness;
1531 self.replenisher.inner = bound.replenisher;
1532 }
1533
1534 if let Some(receiver) = self.receiver.inner.as_mut() {
1535 return handle_incoming_channel_message(
1536 receiver.recv().await,
1537 self.replenisher.inner.as_ref(),
1538 self.debug_context,
1539 &self.closed,
1540 );
1541 }
1542
1543 let Some(core) = &self.core.inner else {
1544 return Err(RxError::Unbound);
1545 };
1546 core.binding_changed.notified().await;
1547 }
1548 }
1549 #[doc(hidden)]
1550 pub fn bind(&mut self, receiver: ChannelMailboxReceiver<IncomingChannelMessage>) {
1551 self.bind_with_liveness(receiver, None);
1552 }
1553
1554 #[doc(hidden)]
1555 pub fn bind_with_liveness(
1556 &mut self,
1557 receiver: ChannelMailboxReceiver<IncomingChannelMessage>,
1558 liveness: Option<ChannelLivenessHandle>,
1559 ) {
1560 self.receiver.inner = Some(receiver);
1561 self.logical_receiver.inner = None;
1562 self.liveness.inner = liveness;
1563 self.replenisher.inner = None;
1564 self.closed.store(false, Ordering::Release);
1565 }
1566}
1567
1568impl<T> Drop for Rx<T> {
1569 fn drop(&mut self) {
1571 if self.closed.swap(true, Ordering::AcqRel) {
1572 return;
1573 }
1574
1575 if self.replenisher.inner.is_none()
1576 && let Some(core) = &self.core.inner
1577 {
1578 if let Some((_receiver, _liveness, replenisher)) = core.take_logical_receiver() {
1579 self.replenisher.inner = replenisher;
1580 } else if let Some(bound) = core.take_receiver() {
1581 self.replenisher.inner = bound.replenisher;
1582 }
1583 }
1584
1585 if let Some(replenisher) = &self.replenisher.inner {
1586 observe_replenisher_channel(replenisher.as_ref(), self.debug_context, |channel| {
1587 ChannelEvent::Reset {
1588 channel,
1589 reason: ChannelResetReason::ReceiverDropped,
1590 }
1591 });
1592 replenisher.on_receiver_dropped();
1593 }
1594 }
1595}
1596
1597impl<T> TryFrom<&Rx<T>> for ChannelId {
1598 type Error = String;
1599
1600 fn try_from(value: &Rx<T>) -> Result<Self, Self::Error> {
1601 CHANNEL_BINDER.with(|cell| {
1605 let borrow = cell.borrow();
1606 let Some(binder) = *borrow else {
1607 return Err("serializing Rx requires an active ChannelBinder".to_string());
1608 };
1609 let (channel_id, sink) = binder.create_tx_with_context(Some(value.debug_context));
1610 let liveness = binder.channel_liveness();
1611 if let Some(core) = &value.core.inner {
1612 core.set_binding(ChannelBinding::Sink(BoundChannelSink { sink, liveness }));
1613 }
1614 Ok(channel_id)
1615 })
1616 }
1617}
1618
1619impl<T> TryFrom<ChannelId> for Rx<T> {
1620 type Error = String;
1621
1622 fn try_from(channel_id: ChannelId) -> Result<Self, Self::Error> {
1623 let debug_context = ChannelDebugContext {
1624 type_name: Some(std::any::type_name::<T>()),
1625 ..ChannelDebugContext::default()
1626 };
1627 let mut rx = Self::unbound_with_context(debug_context);
1628 rx.channel_id = channel_id;
1629
1630 CHANNEL_BINDER.with(|cell| {
1631 let Some(binder) = *cell.borrow() else {
1632 return Err("deserializing Rx requires an active ChannelBinder".to_string());
1633 };
1634 let bound = binder.register_rx_with_context(channel_id, Some(debug_context));
1635 rx.receiver.inner = Some(bound.receiver);
1636 rx.liveness.inner = bound.liveness;
1637 rx.replenisher.inner = bound.replenisher;
1638 Ok(())
1639 })?;
1640
1641 Ok(rx)
1642 }
1643}
1644
1645#[derive(Debug)]
1647pub enum RxError {
1648 Unbound,
1649 Reset,
1650 ConnectionClosed(ConnectionCloseReason),
1651 Deserialize(vox_postcard::error::DeserializeError),
1652 Protocol(String),
1653}
1654
1655impl std::fmt::Display for RxError {
1656 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1657 match self {
1658 Self::Unbound => write!(f, "channel is not bound"),
1659 Self::Reset => write!(f, "channel reset by peer"),
1660 Self::ConnectionClosed(reason) => {
1661 write!(f, "connection closed while receiving channel: {reason:?}")
1662 }
1663 Self::Deserialize(e) => write!(f, "deserialize error: {e}"),
1664 Self::Protocol(msg) => write!(f, "protocol error: {msg}"),
1665 }
1666 }
1667}
1668
1669impl std::error::Error for RxError {}
1670
1671pub fn is_tx(shape: &facet_core::Shape) -> bool {
1673 shape.decl_id == Tx::<()>::SHAPE.decl_id
1674}
1675
1676pub fn is_rx(shape: &facet_core::Shape) -> bool {
1678 shape.decl_id == Rx::<()>::SHAPE.decl_id
1679}
1680
1681pub fn is_channel(shape: &facet_core::Shape) -> bool {
1683 is_tx(shape) || is_rx(shape)
1684}
1685
1686pub trait ChannelBinder: crate::MaybeSend + crate::MaybeSync {
1687 fn create_tx(&self) -> (ChannelId, Arc<dyn ChannelSink>);
1690
1691 fn create_tx_with_context(
1692 &self,
1693 debug_context: Option<ChannelDebugContext>,
1694 ) -> (ChannelId, Arc<dyn ChannelSink>) {
1695 let _ = debug_context;
1696 self.create_tx()
1697 }
1698
1699 fn create_rx(&self) -> (ChannelId, BoundChannelReceiver);
1701
1702 fn create_rx_with_context(
1703 &self,
1704 debug_context: Option<ChannelDebugContext>,
1705 ) -> (ChannelId, BoundChannelReceiver) {
1706 let _ = debug_context;
1707 self.create_rx()
1708 }
1709
1710 fn bind_tx(&self, channel_id: ChannelId) -> Arc<dyn ChannelSink>;
1714
1715 fn bind_tx_with_context(
1716 &self,
1717 channel_id: ChannelId,
1718 debug_context: Option<ChannelDebugContext>,
1719 ) -> Arc<dyn ChannelSink> {
1720 let _ = debug_context;
1721 self.bind_tx(channel_id)
1722 }
1723
1724 fn register_rx(&self, channel_id: ChannelId) -> BoundChannelReceiver;
1728
1729 fn register_rx_with_context(
1730 &self,
1731 channel_id: ChannelId,
1732 debug_context: Option<ChannelDebugContext>,
1733 ) -> BoundChannelReceiver {
1734 let _ = debug_context;
1735 self.register_rx(channel_id)
1736 }
1737
1738 fn channel_liveness(&self) -> Option<ChannelLivenessHandle> {
1741 None
1742 }
1743}
1744
1745#[cfg(test)]
1746mod tests {
1747 use super::*;
1748 use crate::{Backing, ChannelClose, ChannelItem, ChannelReset, Metadata, SelfRef};
1749 use std::sync::atomic::{AtomicUsize, Ordering};
1750
1751 struct CountingSink {
1752 send_calls: AtomicUsize,
1753 close_calls: AtomicUsize,
1754 close_on_drop_calls: AtomicUsize,
1755 }
1756
1757 impl CountingSink {
1758 fn new() -> Self {
1759 Self {
1760 send_calls: AtomicUsize::new(0),
1761 close_calls: AtomicUsize::new(0),
1762 close_on_drop_calls: AtomicUsize::new(0),
1763 }
1764 }
1765 }
1766
1767 impl ChannelSink for CountingSink {
1768 fn send_payload<'payload>(
1769 &self,
1770 _payload: Payload<'payload>,
1771 ) -> Pin<Box<dyn crate::MaybeSendFuture<Output = Result<(), TxError>> + 'payload>> {
1772 self.send_calls.fetch_add(1, Ordering::AcqRel);
1773 Box::pin(async { Ok(()) })
1774 }
1775
1776 fn close_channel(
1777 &self,
1778 _metadata: Metadata,
1779 ) -> Pin<Box<dyn crate::MaybeSendFuture<Output = Result<(), TxError>> + 'static>> {
1780 self.close_calls.fetch_add(1, Ordering::AcqRel);
1781 Box::pin(async { Ok(()) })
1782 }
1783
1784 fn close_channel_on_drop(&self) {
1785 self.close_on_drop_calls.fetch_add(1, Ordering::AcqRel);
1786 }
1787 }
1788
1789 struct CountingReplenisher {
1790 calls: AtomicUsize,
1791 dropped: AtomicUsize,
1792 }
1793
1794 impl CountingReplenisher {
1795 fn new() -> Self {
1796 Self {
1797 calls: AtomicUsize::new(0),
1798 dropped: AtomicUsize::new(0),
1799 }
1800 }
1801 }
1802
1803 impl ChannelCreditReplenisher for CountingReplenisher {
1804 fn on_item_consumed(&self) {
1805 self.calls.fetch_add(1, Ordering::AcqRel);
1806 }
1807
1808 fn on_receiver_dropped(&self) {
1809 self.dropped.fetch_add(1, Ordering::AcqRel);
1810 }
1811 }
1812
1813 #[tokio::test]
1814 async fn tx_close_does_not_emit_drop_close_after_explicit_close() {
1815 let sink_impl = Arc::new(CountingSink::new());
1816 let sink: Arc<dyn ChannelSink> = sink_impl.clone();
1817
1818 let mut tx = Tx::<u32>::unbound();
1819 tx.bind(sink);
1820 tx.close(Metadata::default())
1821 .await
1822 .expect("close should succeed");
1823 drop(tx);
1824
1825 assert_eq!(sink_impl.close_calls.load(Ordering::Acquire), 1);
1826 assert_eq!(sink_impl.close_on_drop_calls.load(Ordering::Acquire), 0);
1827 }
1828
1829 #[test]
1830 fn tx_drop_emits_close_on_drop_for_bound_sink() {
1831 let sink_impl = Arc::new(CountingSink::new());
1832 let sink: Arc<dyn ChannelSink> = sink_impl.clone();
1833
1834 let mut tx = Tx::<u32>::unbound();
1835 tx.bind(sink);
1836 drop(tx);
1837
1838 assert_eq!(sink_impl.close_on_drop_calls.load(Ordering::Acquire), 1);
1839 }
1840
1841 #[test]
1842 fn tx_drop_emits_close_on_drop_for_paired_core_binding() {
1843 let sink_impl = Arc::new(CountingSink::new());
1844 let sink: Arc<dyn ChannelSink> = sink_impl.clone();
1845
1846 let (tx, _rx) = channel::<u32>();
1847 let core = tx.core.inner.as_ref().expect("paired tx should have core");
1848 core.set_binding(ChannelBinding::Sink(BoundChannelSink {
1849 sink,
1850 liveness: None,
1851 }));
1852 drop(tx);
1853
1854 assert_eq!(sink_impl.close_on_drop_calls.load(Ordering::Acquire), 1);
1855 }
1856
1857 #[test]
1859 fn channel_pair_captures_source_location_and_type_context() {
1860 let expected_line = line!() + 1;
1861 let (tx, rx) = channel::<u32>();
1862
1863 for context in [tx.debug_context(), rx.debug_context()] {
1864 assert_eq!(context.type_name, Some(std::any::type_name::<u32>()));
1865 let location = context
1866 .source_location
1867 .expect("channel should capture source location");
1868 assert_eq!(location.file, file!());
1869 assert_eq!(location.line, expected_line);
1870 }
1871 }
1872
1873 #[tokio::test]
1874 async fn rx_recv_returns_unbound_when_not_bound() {
1875 let mut rx = Rx::<u32>::unbound();
1876 let err = match rx.recv().await {
1877 Ok(_) => panic!("unbound rx should fail"),
1878 Err(err) => err,
1879 };
1880 assert!(matches!(err, RxError::Unbound));
1881 }
1882
1883 #[tokio::test]
1884 async fn rx_recv_returns_none_on_close() {
1885 let (tx, rx_inner) = channel_mailbox("vox_types.channel.test.rx1", 1);
1886 let mut rx = Rx::<u32>::unbound();
1887 rx.bind(rx_inner);
1888
1889 let close = SelfRef::owning(
1890 Backing::Boxed(Box::<[u8]>::default()),
1891 ChannelClose {
1892 metadata: Metadata::default(),
1893 },
1894 );
1895 tx.send(IncomingChannelMessage::Close(close))
1896 .await
1897 .expect("send close");
1898
1899 assert!(rx.recv().await.expect("recv should succeed").is_none());
1900 }
1901
1902 #[tokio::test]
1903 async fn rx_recv_returns_reset_error() {
1904 let (tx, rx_inner) = channel_mailbox("vox_types.channel.test.rx2", 1);
1905 let mut rx = Rx::<u32>::unbound();
1906 rx.bind(rx_inner);
1907
1908 let reset = SelfRef::owning(
1909 Backing::Boxed(Box::<[u8]>::default()),
1910 ChannelReset {
1911 metadata: Metadata::default(),
1912 },
1913 );
1914 tx.send(IncomingChannelMessage::Reset(reset))
1915 .await
1916 .expect("send reset");
1917
1918 let err = match rx.recv().await {
1919 Ok(_) => panic!("reset should be surfaced as error"),
1920 Err(err) => err,
1921 };
1922 assert!(matches!(err, RxError::Reset));
1923 }
1924
1925 #[tokio::test]
1927 async fn rx_recv_surfaces_connection_closed() {
1928 let (tx, rx_inner) = channel_mailbox("vox_types.channel.test.rx_connection_closed", 1);
1929 let mut rx = Rx::<u32>::unbound();
1930 rx.bind(rx_inner);
1931
1932 tx.send(IncomingChannelMessage::ConnectionClosed(
1933 ConnectionCloseReason::Protocol,
1934 ))
1935 .await
1936 .expect("send connection close");
1937
1938 let err = match rx.recv().await {
1939 Ok(_) => panic!("connection closure should be surfaced as error"),
1940 Err(err) => err,
1941 };
1942 assert!(matches!(
1943 err,
1944 RxError::ConnectionClosed(ConnectionCloseReason::Protocol)
1945 ));
1946 }
1947
1948 #[tokio::test]
1949 async fn rx_recv_rejects_outgoing_payload_variant_as_protocol_error() {
1950 static VALUE: u32 = 42;
1951
1952 let (tx, rx_inner) = channel_mailbox("vox_types.channel.test.rx3", 1);
1953 let mut rx = Rx::<u32>::unbound();
1954 rx.bind(rx_inner);
1955
1956 let item = SelfRef::owning(
1957 Backing::Boxed(Box::<[u8]>::default()),
1958 ChannelItem {
1959 item: Payload::outgoing(&VALUE),
1960 },
1961 );
1962 tx.send(IncomingChannelMessage::Item(item))
1963 .await
1964 .expect("send item");
1965
1966 let err = match rx.recv().await {
1967 Ok(_) => panic!("outgoing payload should be protocol error"),
1968 Err(err) => err,
1969 };
1970 assert!(matches!(err, RxError::Protocol(_)));
1971 }
1972
1973 #[tokio::test]
1974 async fn rx_recv_notifies_replenisher_after_consuming_an_item() {
1975 let (tx, rx_inner) = channel_mailbox("vox_types.channel.test.rx4", 1);
1976 let replenisher = Arc::new(CountingReplenisher::new());
1977 let mut rx = Rx::<u32>::unbound();
1978 rx.bind(rx_inner);
1979 rx.replenisher.inner = Some(replenisher.clone());
1980
1981 let encoded = vox_postcard::to_vec(&123_u32).expect("serialize test item");
1982 let item = SelfRef::owning(
1983 Backing::Boxed(Box::<[u8]>::default()),
1984 ChannelItem {
1985 item: Payload::PostcardBytes(Box::leak(encoded.into_boxed_slice())),
1986 },
1987 );
1988 tx.send(IncomingChannelMessage::Item(item))
1989 .await
1990 .expect("send item");
1991
1992 let value = rx
1993 .recv()
1994 .await
1995 .expect("recv should succeed")
1996 .expect("expected item");
1997 assert_eq!(*value.get(), 123_u32);
1998 assert_eq!(replenisher.calls.load(Ordering::Acquire), 1);
1999 }
2000
2001 #[test]
2002 fn rx_drop_notifies_replenisher() {
2003 let (_tx, rx_inner) = channel_mailbox("vox_types.channel.test.rx_drop", 1);
2004 let replenisher = Arc::new(CountingReplenisher::new());
2005 let mut rx = Rx::<u32>::unbound();
2006 rx.bind(rx_inner);
2007 rx.replenisher.inner = Some(replenisher.clone());
2008
2009 drop(rx);
2010
2011 assert_eq!(replenisher.dropped.load(Ordering::Acquire), 1);
2012 }
2013
2014 #[tokio::test]
2015 async fn rx_drop_after_close_does_not_notify_replenisher() {
2016 let (tx, rx_inner) = channel_mailbox("vox_types.channel.test.rx_drop_closed", 1);
2017 let replenisher = Arc::new(CountingReplenisher::new());
2018 let mut rx = Rx::<u32>::unbound();
2019 rx.bind(rx_inner);
2020 rx.replenisher.inner = Some(replenisher.clone());
2021
2022 let close = SelfRef::owning(
2023 Backing::Boxed(Box::<[u8]>::default()),
2024 ChannelClose {
2025 metadata: Metadata::default(),
2026 },
2027 );
2028 tx.send(IncomingChannelMessage::Close(close))
2029 .await
2030 .expect("send close");
2031
2032 assert!(rx.recv().await.expect("recv should succeed").is_none());
2033 drop(rx);
2034
2035 assert_eq!(replenisher.dropped.load(Ordering::Acquire), 0);
2036 }
2037
2038 #[test]
2039 fn logical_rx_drop_notifies_replenisher() {
2040 let (_tx, rx_inner) = channel_mailbox("vox_types.channel.test.logical_rx_drop", 1);
2041 let replenisher = Arc::new(CountingReplenisher::new());
2042 let core = Arc::new(ChannelCore::new(ChannelDebugContext::default()));
2043 core.bind_retryable_receiver(BoundChannelReceiver {
2044 receiver: rx_inner,
2045 liveness: None,
2046 replenisher: Some(replenisher.clone()),
2047 });
2048
2049 let rx = Rx::<u32>::paired(core);
2050 drop(rx);
2051
2052 assert_eq!(replenisher.dropped.load(Ordering::Acquire), 1);
2053 }
2054
2055 #[tokio::test]
2056 async fn rx_recv_logical_receiver_decodes_items_and_notifies_replenisher() {
2057 let (tx, rx_inner) = channel_mailbox("vox_types.channel.test.rx5", 1);
2058 let replenisher = Arc::new(CountingReplenisher::new());
2059 let core = Arc::new(ChannelCore::new(ChannelDebugContext::default()));
2060 core.bind_retryable_receiver(BoundChannelReceiver {
2061 receiver: rx_inner,
2062 liveness: None,
2063 replenisher: Some(replenisher.clone()),
2064 });
2065
2066 let mut rx = Rx::<u32>::paired(core);
2067
2068 let encoded = vox_postcard::to_vec(&321_u32).expect("serialize test item");
2069 let item = SelfRef::owning(
2070 Backing::Boxed(Box::<[u8]>::default()),
2071 ChannelItem {
2072 item: Payload::PostcardBytes(Box::leak(encoded.into_boxed_slice())),
2073 },
2074 );
2075 tx.send(IncomingChannelMessage::Item(item))
2076 .await
2077 .expect("send item");
2078
2079 let value = rx
2080 .recv()
2081 .await
2082 .expect("recv should succeed")
2083 .expect("expected item");
2084 assert_eq!(*value.get(), 321_u32);
2085 assert_eq!(replenisher.calls.load(Ordering::Acquire), 1);
2086 }
2087
2088 struct TestBinder {
2094 next_id: std::sync::Mutex<u64>,
2095 }
2096
2097 impl TestBinder {
2098 fn new() -> Self {
2099 Self {
2100 next_id: std::sync::Mutex::new(100),
2101 }
2102 }
2103
2104 fn alloc_id(&self) -> ChannelId {
2105 let mut guard = self.next_id.lock().unwrap();
2106 let id = *guard;
2107 *guard += 2;
2108 ChannelId(id)
2109 }
2110 }
2111
2112 impl ChannelBinder for TestBinder {
2113 fn create_tx(&self) -> (ChannelId, Arc<dyn ChannelSink>) {
2114 (self.alloc_id(), Arc::new(CountingSink::new()))
2115 }
2116
2117 fn create_rx(&self) -> (ChannelId, BoundChannelReceiver) {
2118 let (tx, rx) = channel_mailbox("vox_types.channel.test.bind_retryable1", 8);
2119 std::mem::forget(tx);
2121 (
2122 self.alloc_id(),
2123 BoundChannelReceiver {
2124 receiver: rx,
2125 liveness: None,
2126 replenisher: None,
2127 },
2128 )
2129 }
2130
2131 fn bind_tx(&self, _channel_id: ChannelId) -> Arc<dyn ChannelSink> {
2132 Arc::new(CountingSink::new())
2133 }
2134
2135 fn register_rx(&self, _channel_id: ChannelId) -> BoundChannelReceiver {
2136 let (tx, rx) = channel_mailbox("vox_types.channel.test.bind_retryable2", 8);
2137 std::mem::forget(tx);
2138 BoundChannelReceiver {
2139 receiver: rx,
2140 liveness: None,
2141 replenisher: None,
2142 }
2143 }
2144 }
2145
2146 #[tokio::test]
2150 async fn case1_serialize_tx_allocates_and_binds_paired_rx() {
2151 use facet::Facet;
2152
2153 #[derive(Facet)]
2154 struct Args {
2155 data: u32,
2156 tx: Tx<u32>,
2157 }
2158
2159 let (tx, rx) = channel::<u32>();
2160 let args = Args { data: 42, tx };
2161
2162 let binder = TestBinder::new();
2163 let bytes =
2164 with_channel_binder(&binder, || vox_postcard::to_vec(&args).expect("serialize"));
2165
2166 assert!(!bytes.is_empty());
2168
2169 assert!(
2171 rx.core.inner.is_some(),
2172 "paired Rx should have a shared core"
2173 );
2174 let core = rx.core.inner.as_ref().unwrap();
2175 assert!(
2176 core.take_logical_receiver().is_some(),
2177 "core should have a logical receiver binding from create_rx()"
2178 );
2179 }
2180
2181 #[test]
2185 fn case2_serialize_rx_allocates_and_binds_paired_tx() {
2186 use facet::Facet;
2187
2188 #[derive(Facet)]
2189 struct Args {
2190 data: u32,
2191 rx: Rx<u32>,
2192 }
2193
2194 let (tx, rx) = channel::<u32>();
2195 let args = Args { data: 42, rx };
2196
2197 let binder = TestBinder::new();
2198 let bytes =
2199 with_channel_binder(&binder, || vox_postcard::to_vec(&args).expect("serialize"));
2200
2201 assert!(!bytes.is_empty());
2202
2203 assert!(tx.core.inner.is_some());
2205 let core = tx.core.inner.as_ref().unwrap();
2206 assert!(
2207 core.get_sink().is_some(),
2208 "core should have a Sink binding from create_tx()"
2209 );
2210 }
2211
2212 #[test]
2215 fn case3_deserialize_tx_binds_via_binder() {
2216 use facet::Facet;
2217
2218 #[derive(Facet)]
2219 struct Args {
2220 data: u32,
2221 tx: Tx<u32>,
2222 }
2223
2224 let mut bytes = vox_postcard::to_vec(&42_u32).unwrap();
2226 bytes.extend_from_slice(&vox_postcard::to_vec(&ChannelId(7)).unwrap());
2227
2228 let binder = TestBinder::new();
2229 let args: Args = with_channel_binder(&binder, || {
2230 vox_postcard::from_slice(&bytes).expect("deserialize")
2231 });
2232
2233 assert_eq!(args.data, 42);
2234 assert_eq!(args.tx.channel_id, ChannelId(7));
2235 assert!(
2236 args.tx.is_bound(),
2237 "deserialized Tx should be bound via bind_tx()"
2238 );
2239 }
2240
2241 #[test]
2244 fn case4_deserialize_rx_binds_via_binder() {
2245 use facet::Facet;
2246
2247 #[derive(Facet)]
2248 struct Args {
2249 data: u32,
2250 rx: Rx<u32>,
2251 }
2252
2253 let mut bytes = vox_postcard::to_vec(&42_u32).unwrap();
2255 bytes.extend_from_slice(&vox_postcard::to_vec(&ChannelId(7)).unwrap());
2256
2257 let binder = TestBinder::new();
2258 let args: Args = with_channel_binder(&binder, || {
2259 vox_postcard::from_slice(&bytes).expect("deserialize")
2260 });
2261
2262 assert_eq!(args.data, 42);
2263 assert_eq!(args.rx.channel_id, ChannelId(7));
2264 assert!(
2265 args.rx.is_bound(),
2266 "deserialized Rx should be bound via register_rx()"
2267 );
2268 }
2269
2270 #[test]
2274 fn channel_id_round_trips_through_ser_deser() {
2275 use facet::Facet;
2276
2277 #[derive(Facet)]
2278 struct Args {
2279 tx: Tx<u32>,
2280 }
2281
2282 let (tx, _rx) = channel::<u32>();
2283 let args = Args { tx };
2284
2285 let caller_binder = TestBinder::new();
2286 let bytes = with_channel_binder(&caller_binder, || {
2287 vox_postcard::to_vec(&args).expect("serialize")
2288 });
2289
2290 let callee_binder = TestBinder::new();
2291 let deserialized: Args = with_channel_binder(&callee_binder, || {
2292 vox_postcard::from_slice(&bytes).expect("deserialize")
2293 });
2294
2295 assert_eq!(deserialized.tx.channel_id, ChannelId(100));
2297 assert!(deserialized.tx.is_bound());
2298 }
2299}