1use std::marker::PhantomData;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::sync::{Arc, Mutex, MutexGuard};
4
5use facet::Facet;
6use facet_core::PtrConst;
7use facet_reflect::Peek;
8use vox_types::{
9 Conduit, ConduitRx, ConduitTx, ConduitTxPermit, Link, LinkRx, LinkTx, LinkTxPermit, MsgFamily,
10 Payload, SelfRef, WriteSlot,
11};
12
13use crate::MessagePlan;
14use zerocopy::little_endian::U32 as LeU32;
15
16mod replay_buffer;
17use replay_buffer::{PacketAck, PacketSeq, ReplayBuffer};
18
/// Opaque 16-byte session token.
///
/// The accepting side generates one (see `fresh_key`) and sends it in its
/// `ServerHello`. The initiating side stores it and echoes it back in the
/// `ClientHello` of its next reconnect. `#[repr(C)]` plus the zerocopy derives let
/// the value be copied directly to and from the wire.
#[derive(
    Clone,
    Copy,
    zerocopy::FromBytes,
    zerocopy::IntoBytes,
    zerocopy::KnownLayout,
    zerocopy::Immutable,
)]
#[repr(C)]
struct ResumeKey([u8; 16]);
34
/// Expected `ClientHello::magic`: the ASCII bytes `VOCH` read as a little-endian u32.
const CLIENT_HELLO_MAGIC: u32 = u32::from_le_bytes(*b"VOCH");
/// Expected `ServerHello::magic`: the ASCII bytes `VOSH` read as a little-endian u32.
const SERVER_HELLO_MAGIC: u32 = u32::from_le_bytes(*b"VOSH");

/// `ClientHello::flags` bit: `resume_key` carries the key from a previous handshake.
const CH_HAS_RESUME_KEY: u8 = 0b0000_0001;
/// `ClientHello::flags` bit: `last_received` holds the last sequence number the
/// client accepted.
const CH_HAS_LAST_RECEIVED: u8 = 0b0000_0010;

/// `ServerHello::flags` bit: the session was refused. The initiator reports it as
/// `StableConduitError::SessionLost`.
const SH_REJECTED: u8 = 0b0000_0001;
/// `ServerHello::flags` bit: `last_received` holds the last sequence number the
/// server accepted.
const SH_HAS_LAST_RECEIVED: u8 = 0b0000_0010;
45
/// First handshake message, sent by the initiating side on every new link.
///
/// Raw wire layout (`#[repr(C)]`, all fields byte-aligned, 25 bytes in total). Field
/// order and types are part of the protocol.
#[derive(
    Clone,
    Copy,
    zerocopy::FromBytes,
    zerocopy::IntoBytes,
    zerocopy::KnownLayout,
    zerocopy::Immutable,
)]
#[repr(C)]
pub struct ClientHello {
    /// Must equal `CLIENT_HELLO_MAGIC`.
    magic: LeU32,
    /// Bitset of `CH_*` flags saying which of the fields below are meaningful.
    flags: u8,
    /// Previous session key; all zeros unless `CH_HAS_RESUME_KEY` is set.
    resume_key: ResumeKey,
    /// Last sequence number accepted by the client; 0 unless `CH_HAS_LAST_RECEIVED` is set.
    last_received: LeU32,
}
63
/// Handshake reply sent by the accepting side after it has read a `ClientHello`.
///
/// Same raw 25-byte layout as `ClientHello` (`#[repr(C)]`, byte-aligned fields).
#[derive(
    Clone,
    Copy,
    zerocopy::FromBytes,
    zerocopy::IntoBytes,
    zerocopy::KnownLayout,
    zerocopy::Immutable,
)]
#[repr(C)]
struct ServerHello {
    /// Must equal `SERVER_HELLO_MAGIC`.
    magic: LeU32,
    /// Bitset of `SH_*` flags.
    flags: u8,
    /// Key the initiator should present on its next reconnect.
    resume_key: ResumeKey,
    /// Last sequence number accepted by the server; 0 unless `SH_HAS_LAST_RECEIVED` is set.
    last_received: LeU32,
}
82
/// Envelope around every message sent through a `StableConduit`.
///
/// Encoded with postcard through facet reflection, so the field order is the wire
/// order.
#[derive(Facet, Debug)]
struct Frame<'a> {
    /// Sender-assigned sequence number. The receiver drops frames whose `seq` is not
    /// greater than the last one it accepted.
    seq: PacketSeq,
    /// Highest sequence number the sender has accepted from its peer, if any. The
    /// receiver uses it to trim its replay buffer.
    ack: Option<PacketAck>,
    /// The user message. On send it points at the caller's value; after decoding it
    /// holds the message's postcard bytes.
    item: Payload<'a>,
}
97
/// A link plus, on the accepting side, the `ClientHello` already read from it.
///
/// `client_hello` is `None` when this side will send the `ClientHello` itself (see
/// `Attachment::initiator`). It is `Some` when the peer's hello has already been
/// consumed (see `prepare_acceptor_attachment`).
pub struct Attachment<L> {
    link: L,
    client_hello: Option<ClientHello>,
}
110
111impl<L> Attachment<L> {
112 pub fn initiator(link: L) -> Self {
114 Self {
115 link,
116 client_hello: None,
117 }
118 }
119
120 pub(crate) fn into_link(self) -> L {
121 self.link
122 }
123}
124
/// A `Link` rebuilt from halves that were already split apart, for example after
/// the receive half was used to read a `ClientHello`.
pub struct SplitLink<Tx, Rx> {
    tx: Tx,
    rx: Rx,
}
133
134impl<Tx, Rx> Link for SplitLink<Tx, Rx>
135where
136 Tx: LinkTx,
137 Rx: LinkRx,
138{
139 type Tx = Tx;
140 type Rx = Rx;
141
142 fn split(self) -> (Self::Tx, Self::Rx) {
143 (self.tx, self.rx)
144 }
145}
146
147pub async fn prepare_acceptor_attachment<L: Link>(
152 link: L,
153) -> Result<Attachment<SplitLink<L::Tx, L::Rx>>, StableConduitError> {
154 let (tx, mut rx) = link.split();
155 let client_hello = recv_handshake::<_, ClientHello>(&mut rx).await?;
156 Ok(Attachment {
157 link: SplitLink { tx, rx },
158 client_hello: Some(client_hello),
159 })
160}
161
/// Supplies links to a `StableConduit`.
///
/// The conduit asks for one link at construction and another each time it
/// reconnects.
pub trait LinkSource: Send + 'static {
    /// Link type produced by this source.
    type Link: Link + Send;

    /// Produces the next link to attach, or an I/O error if none can be obtained.
    fn next_link(
        &mut self,
    ) -> impl Future<Output = std::io::Result<Attachment<Self::Link>>> + Send + '_;
}
170
/// A `LinkSource` that yields one pre-built attachment.
///
/// Every later request fails with `ConnectionRefused`.
pub struct SingleAttachmentSource<L> {
    /// The attachment still to be handed out; `None` once it has been taken.
    attachment: Option<Attachment<L>>,
}
175
176pub fn single_attachment_source<L: Link + Send + 'static>(
178 attachment: Attachment<L>,
179) -> SingleAttachmentSource<L> {
180 SingleAttachmentSource {
181 attachment: Some(attachment),
182 }
183}
184
185pub fn single_link_source<L: Link + Send + 'static>(link: L) -> SingleAttachmentSource<L> {
187 single_attachment_source(Attachment::initiator(link))
188}
189
190pub fn exhausted_source<L: Link + Send + 'static>() -> SingleAttachmentSource<L> {
194 SingleAttachmentSource { attachment: None }
195}
196
197impl<L: Link + Send + 'static> LinkSource for SingleAttachmentSource<L> {
198 type Link = L;
199
200 async fn next_link(&mut self) -> std::io::Result<Attachment<Self::Link>> {
201 self.attachment.take().ok_or_else(|| {
202 std::io::Error::new(
203 std::io::ErrorKind::ConnectionRefused,
204 "single-use LinkSource exhausted",
205 )
206 })
207 }
208}
209
/// A `Conduit` that survives link failures.
///
/// On failure it pulls a new link from a `LinkSource`, redoes the handshake, and
/// replays buffered frames the peer has not reported as received.
///
/// `F` is the message family; `LS` supplies the links.
pub struct StableConduit<F: MsgFamily, LS: LinkSource> {
    /// State shared with the Tx/Rx halves created by `split`.
    shared: Arc<Shared<LS>>,
    /// Optional decoding plan, handed to the Rx half.
    message_plan: Option<MessagePlan>,
    _phantom: PhantomData<fn(F) -> F>,
}
221
/// State shared by a `StableConduit`'s send and receive halves.
struct Shared<LS: LinkSource> {
    /// Link halves, sequence counters and replay buffer.
    ///
    /// This is a std mutex; the code in this file releases it before every `.await`.
    inner: Mutex<Inner<LS>>,
    /// True while one task is running a reconnect attempt. Other tasks wait on
    /// `reconnected` instead of starting their own.
    reconnecting: AtomicBool,
    /// Notified after each reconnect attempt ends, whether it succeeded or failed.
    reconnected: moire::sync::Notify,
    /// Notified whenever the transmit half may have become available again.
    tx_ready: moire::sync::Notify,
}
228
/// Mutable conduit state guarded by `Shared::inner`.
struct Inner<LS: LinkSource> {
    /// Source of replacement links; taken out while a reconnect attempt is running.
    source: Option<LS>,
    /// Incremented (wrapping) on each successful reconnect, so holders of old link
    /// halves or permits can detect that the link was replaced.
    link_generation: u64,
    /// Transmit half of the current link.
    ///
    /// `None` while `reserve` has it checked out, after a failed reserve, or after
    /// `close`.
    tx: Option<<LS::Link as Link>::Tx>,
    /// Receive half of the current link; `None` while `recv` is reading from it.
    rx: Option<<LS::Link as Link>::Rx>,
    /// True while `reserve` holds `tx`, so other reservers wait instead of reconnecting.
    tx_checked_out: bool,
    /// Key from the most recent successful handshake.
    ///
    /// Either received in a `ServerHello` or generated when acting as acceptor.
    resume_key: Option<ResumeKey>,
    /// Sequence number for the next outgoing frame.
    next_send_seq: PacketSeq,
    /// Highest sequence number accepted from the peer; sent back as the ack.
    last_received: Option<PacketSeq>,
    /// Encoded outgoing frames, kept for replay until the peer acks them.
    replay: ReplayBuffer,
}
245
246impl<F: MsgFamily, LS: LinkSource> StableConduit<F, LS> {
247 pub async fn new(mut source: LS) -> Result<Self, StableConduitError> {
248 let attachment = source.next_link().await.map_err(StableConduitError::Io)?;
249 let (link_tx, link_rx) = attachment.link.split();
250 Self::with_first_link(link_tx, link_rx, attachment.client_hello, source).await
251 }
252
253 pub async fn with_first_link(
259 link_tx: <LS::Link as Link>::Tx,
260 mut link_rx: <LS::Link as Link>::Rx,
261 client_hello: Option<ClientHello>,
262 source: LS,
263 ) -> Result<Self, StableConduitError> {
264 let (resume_key, _peer_last_received) =
265 handshake::<LS::Link>(&link_tx, &mut link_rx, client_hello, None, None).await?;
266
267 let inner = Inner {
268 source: Some(source),
269 link_generation: 0,
270 tx: Some(link_tx),
271 rx: Some(link_rx),
272 tx_checked_out: false,
273 resume_key: Some(resume_key),
274 next_send_seq: PacketSeq(0),
275 last_received: None,
276 replay: ReplayBuffer::new(),
277 };
278
279 Ok(Self {
280 shared: Arc::new(Shared {
281 inner: Mutex::new(inner),
282 reconnecting: AtomicBool::new(false),
283 reconnected: moire::sync::Notify::new("stable_conduit.reconnected"),
284 tx_ready: moire::sync::Notify::new("stable_conduit.tx_ready"),
285 }),
286 message_plan: None,
287 _phantom: PhantomData,
288 })
289 }
290
291 pub fn with_message_plan(mut self, plan: MessagePlan) -> Self {
294 self.message_plan = Some(plan);
295 self
296 }
297}
298
299impl<LS: LinkSource> Shared<LS> {
304 fn lock_inner(&self) -> Result<MutexGuard<'_, Inner<LS>>, StableConduitError> {
305 self.inner
306 .lock()
307 .map_err(|_| StableConduitError::Setup("stable conduit mutex poisoned".into()))
308 }
309
310 async fn ensure_reconnected(&self, generation: u64) -> Result<(), StableConduitError> {
311 loop {
312 {
313 let inner = self.lock_inner()?;
314 if inner.link_generation != generation {
315 return Ok(());
316 }
317 }
318
319 if self
320 .reconnecting
321 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
322 .is_ok()
323 {
324 let result = self.reconnect_once(generation).await;
325 self.reconnecting.store(false, Ordering::Release);
326 self.reconnected.notify_waiters();
327 return result;
328 }
329
330 self.reconnected.notified().await;
331 }
332 }
333
334 async fn reconnect_once(&self, generation: u64) -> Result<(), StableConduitError> {
341 let (mut source, resume_key, last_received, replay_frames) = {
342 let mut inner = self.lock_inner()?;
343 if inner.link_generation != generation {
344 return Ok(());
345 }
346 let source = inner
347 .source
348 .take()
349 .ok_or_else(|| StableConduitError::Setup("link source unavailable".into()))?;
350 let replay_frames = inner
351 .replay
352 .iter()
353 .map(|(seq, bytes)| (*seq, bytes.clone()))
354 .collect::<Vec<_>>();
355 (source, inner.resume_key, inner.last_received, replay_frames)
356 };
357
358 let reconnect_result = async {
359 let attachment = source.next_link().await.map_err(StableConduitError::Io)?;
360 let (new_tx, mut new_rx) = attachment.link.split();
361
362 let (new_resume_key, peer_last_received) = handshake::<LS::Link>(
363 &new_tx,
364 &mut new_rx,
365 attachment.client_hello,
366 resume_key,
367 last_received,
368 )
369 .await?;
370
371 for (seq, frame_bytes) in replay_frames {
375 if peer_last_received.is_some_and(|last| seq <= last) {
376 continue;
377 }
378 let permit = new_tx.reserve().await.map_err(StableConduitError::Io)?;
379 let mut slot = permit
380 .alloc(frame_bytes.len())
381 .map_err(StableConduitError::Io)?;
382 slot.as_mut_slice().copy_from_slice(&frame_bytes);
383 slot.commit();
384 }
385
386 Ok::<_, StableConduitError>((new_tx, new_rx, new_resume_key))
387 }
388 .await;
389
390 let mut inner = self.lock_inner()?;
391 inner.source = Some(source);
392
393 if inner.link_generation != generation {
394 return Ok(());
395 }
396
397 let (new_tx, new_rx, new_resume_key) = reconnect_result?;
398
399 inner.link_generation = inner.link_generation.wrapping_add(1);
400 inner.tx = Some(new_tx);
401 inner.rx = Some(new_rx);
402 inner.tx_checked_out = false;
403 inner.resume_key = Some(new_resume_key);
404 self.tx_ready.notify_waiters();
405
406 Ok(())
407 }
408}
409
410async fn handshake<L: Link>(
418 tx: &L::Tx,
419 rx: &mut L::Rx,
420 client_hello: Option<ClientHello>,
421 resume_key: Option<ResumeKey>,
422 last_received: Option<PacketSeq>,
423) -> Result<(ResumeKey, Option<PacketSeq>), StableConduitError> {
424 match client_hello {
425 None => {
426 let mut flags = 0u8;
428 if resume_key.is_some() {
429 flags |= CH_HAS_RESUME_KEY;
430 }
431 if last_received.is_some() {
432 flags |= CH_HAS_LAST_RECEIVED;
433 }
434 let hello = ClientHello {
435 magic: LeU32::new(CLIENT_HELLO_MAGIC),
436 flags,
437 resume_key: resume_key.unwrap_or(ResumeKey([0u8; 16])),
438 last_received: LeU32::new(last_received.map_or(0, |s| s.0)),
439 };
440 send_handshake(tx, &hello).await?;
441
442 let sh = recv_handshake::<_, ServerHello>(rx).await?;
443 if sh.magic.get() != SERVER_HELLO_MAGIC {
444 return Err(StableConduitError::Setup(
445 "ServerHello magic mismatch".into(),
446 ));
447 }
448 if sh.flags & SH_REJECTED != 0 {
450 return Err(StableConduitError::SessionLost);
451 }
452 let peer_last_received =
453 (sh.flags & SH_HAS_LAST_RECEIVED != 0).then(|| PacketSeq(sh.last_received.get()));
454 Ok((sh.resume_key, peer_last_received))
455 }
456 Some(ch) => {
457 let key = fresh_key()?;
459 let mut flags = 0u8;
460 if last_received.is_some() {
461 flags |= SH_HAS_LAST_RECEIVED;
462 }
463 let hello = ServerHello {
464 magic: LeU32::new(SERVER_HELLO_MAGIC),
465 flags,
466 resume_key: key,
467 last_received: LeU32::new(last_received.map_or(0, |s| s.0)),
468 };
469 send_handshake(tx, &hello).await?;
470
471 let peer_last_received =
472 (ch.flags & CH_HAS_LAST_RECEIVED != 0).then(|| PacketSeq(ch.last_received.get()));
473 Ok((key, peer_last_received))
474 }
475 }
476}
477
478async fn send_handshake<LTx: LinkTx, M: zerocopy::IntoBytes + zerocopy::Immutable>(
479 tx: <x,
480 msg: &M,
481) -> Result<(), StableConduitError> {
482 let bytes = msg.as_bytes();
483 let permit = tx.reserve().await.map_err(StableConduitError::Io)?;
484 let mut slot = permit.alloc(bytes.len()).map_err(StableConduitError::Io)?;
485 slot.as_mut_slice().copy_from_slice(bytes);
486 slot.commit();
487 Ok(())
488}
489
490async fn recv_handshake<
491 LRx: LinkRx,
492 M: zerocopy::FromBytes + zerocopy::KnownLayout + zerocopy::Immutable,
493>(
494 rx: &mut LRx,
495) -> Result<M, StableConduitError> {
496 let backing = rx
497 .recv()
498 .await
499 .map_err(|_| StableConduitError::LinkDead)?
500 .ok_or(StableConduitError::LinkDead)?;
501 M::read_from_bytes(backing.as_bytes())
502 .map_err(|_| StableConduitError::Setup("handshake message size mismatch".into()))
503}
504
505pub async fn recv_client_hello<Rx: LinkRx>(rx: &mut Rx) -> Result<ClientHello, StableConduitError> {
511 recv_handshake::<_, ClientHello>(rx).await
512}
513
514fn fresh_key() -> Result<ResumeKey, StableConduitError> {
515 let mut key = ResumeKey([0u8; 16]);
516 getrandom::fill(&mut key.0)
517 .map_err(|e| StableConduitError::Setup(format!("failed to generate resume key: {e}")))?;
518 Ok(key)
519}
520
521impl<F: MsgFamily, LS: LinkSource> Conduit for StableConduit<F, LS>
526where
527 <LS::Link as Link>::Tx: Send + 'static,
528 <LS::Link as Link>::Rx: Send + 'static,
529 LS: Send + 'static,
530{
531 type Msg = F;
532 type Tx = StableConduitTx<F, LS>;
533 type Rx = StableConduitRx<F, LS>;
534
535 fn split(self) -> (Self::Tx, Self::Rx) {
536 (
537 StableConduitTx {
538 shared: Arc::clone(&self.shared),
539 _phantom: PhantomData,
540 },
541 StableConduitRx {
542 shared: Arc::clone(&self.shared),
543 message_plan: self.message_plan,
544 _phantom: PhantomData,
545 },
546 )
547 }
548}
549
/// Sending half of a `StableConduit`.
///
/// Reservations wait for, or trigger, reconnects as needed.
pub struct StableConduitTx<F: MsgFamily, LS: LinkSource> {
    shared: Arc<Shared<LS>>,
    _phantom: PhantomData<fn(F)>,
}
558
559impl<F: MsgFamily, LS: LinkSource> ConduitTx for StableConduitTx<F, LS>
560where
561 <LS::Link as Link>::Tx: Send + 'static,
562 <LS::Link as Link>::Rx: Send + 'static,
563 LS: Send + 'static,
564{
565 type Msg = F;
566 type Permit<'a>
567 = StableConduitPermit<F, LS>
568 where
569 Self: 'a;
570
571 async fn reserve(&self) -> std::io::Result<Self::Permit<'_>> {
572 enum TxReservation<Tx> {
573 CheckedOut { tx: Tx, generation: u64 },
574 Wait,
575 Reconnect { generation: u64 },
576 }
577
578 loop {
579 let reservation = {
580 let mut inner = self
581 .shared
582 .lock_inner()
583 .map_err(|e| std::io::Error::other(e.to_string()))?;
584 match inner.tx.take() {
585 Some(tx) => {
586 inner.tx_checked_out = true;
587 TxReservation::CheckedOut {
588 tx,
589 generation: inner.link_generation,
590 }
591 }
592 None if inner.tx_checked_out => TxReservation::Wait,
593 None => TxReservation::Reconnect {
594 generation: inner.link_generation,
595 },
596 }
597 };
598
599 let (tx, generation) = match reservation {
600 TxReservation::CheckedOut { tx, generation } => (tx, generation),
601 TxReservation::Wait => {
602 self.shared.tx_ready.notified().await;
603 continue;
604 }
605 TxReservation::Reconnect { generation } => {
606 self.shared
607 .ensure_reconnected(generation)
608 .await
609 .map_err(|e| std::io::Error::other(e.to_string()))?;
610 continue;
611 }
612 };
613
614 match tx.reserve().await {
615 Ok(link_permit) => {
616 let restore_ok = {
617 let mut inner = self
618 .shared
619 .lock_inner()
620 .map_err(|e| std::io::Error::other(e.to_string()))?;
621 let restore_ok = inner.link_generation == generation && inner.tx.is_none();
622 if restore_ok {
623 inner.tx = Some(tx);
624 }
625 inner.tx_checked_out = false;
626 self.shared.tx_ready.notify_waiters();
627 restore_ok
628 };
629
630 if !restore_ok {
631 drop(link_permit);
632 continue;
633 }
634
635 return Ok(StableConduitPermit {
636 shared: Arc::clone(&self.shared),
637 link_permit,
638 generation,
639 _phantom: PhantomData,
640 });
641 }
642 Err(_) => {
643 {
644 let mut inner = self
645 .shared
646 .lock_inner()
647 .map_err(|e| std::io::Error::other(e.to_string()))?;
648 if inner.link_generation == generation {
649 inner.tx_checked_out = false;
650 }
651 self.shared.tx_ready.notify_waiters();
652 }
653 self.shared
654 .ensure_reconnected(generation)
655 .await
656 .map_err(|e| std::io::Error::other(e.to_string()))?;
657 }
658 }
659 }
660 }
661
662 async fn close(self) -> std::io::Result<()> {
663 let tx = {
664 let mut inner = self
665 .shared
666 .lock_inner()
667 .map_err(|e| std::io::Error::other(e.to_string()))?;
668 inner.tx.take()
669 };
670 if let Some(tx) = tx {
671 tx.close().await?;
672 }
673 Ok(())
674 }
675}
676
/// Capacity reserved on one specific link generation for exactly one outgoing frame.
pub struct StableConduitPermit<F: MsgFamily, LS: LinkSource> {
    shared: Arc<Shared<LS>>,
    /// Permit obtained from the underlying link's transmit half.
    link_permit: <<LS::Link as Link>::Tx as LinkTx>::Permit,
    /// Generation the permit was reserved on. `send` fails with `LinkDead` if the
    /// link has since been replaced.
    generation: u64,
    _phantom: PhantomData<fn(F)>,
}
687
impl<F: MsgFamily, LS: LinkSource> ConduitTxPermit for StableConduitPermit<F, LS> {
    type Msg = F;
    type Error = StableConduitError;

    /// Encodes `item` into a `Frame` and writes it into the reserved link slot.
    ///
    /// Takes the next send sequence number and attaches an ack for the highest
    /// sequence number received so far. A copy of the encoded frame goes into the
    /// replay buffer before the slot is committed. The sequence number is consumed
    /// even if encoding or allocation fails afterwards.
    ///
    /// # Errors
    /// * `LinkDead` if the link was replaced since the permit was reserved.
    /// * `Encode` if the frame cannot be serialized.
    /// * `Io` if the slot cannot be allocated.
    /// * `Setup` if the shared mutex is poisoned.
    ///
    /// NOTE(review): the generation check and the replay-buffer push use two separate
    /// lock acquisitions. If a reconnect snapshots the replay buffer in between, this
    /// frame is written only to the old link and is not replayed on the new one.
    /// Confirm whether that window matters.
    fn send(self, item: F::Msg<'_>) -> Result<(), StableConduitError> {
        let StableConduitPermit {
            shared,
            link_permit,
            generation,
            _phantom: _,
        } = self;

        // Under one lock: confirm the permit still belongs to the live link, then
        // allocate the sequence number and build the ack.
        let (seq, ack) = {
            let mut inner = shared.lock_inner()?;
            if inner.link_generation != generation {
                return Err(StableConduitError::LinkDead);
            }
            let seq = inner.next_send_seq;
            inner.next_send_seq = PacketSeq(seq.0.wrapping_add(1));
            let ack = inner
                .last_received
                .map(|max_delivered| PacketAck { max_delivered });
            (seq, ack)
        };

        let msg_shape = F::shape();
        // SAFETY (review): the pointer refers to `item`, which stays alive until the end
        // of this function. Assumes `F::shape()` describes `F::Msg<'_>`; confirm
        // against the `MsgFamily` contract.
        #[allow(unsafe_code)]
        let payload = unsafe {
            Payload::outgoing_unchecked(PtrConst::new((&raw const item).cast::<u8>()), msg_shape)
        };

        let frame = Frame {
            seq,
            ack,
            item: payload,
        };

        // SAFETY (review): the pointer refers to the local `frame`, which outlives
        // `peek`. The `'static` shape is used for a `Frame<'_>`, presumably because the
        // shape does not depend on the lifetime; confirm.
        #[allow(unsafe_code)]
        let peek = unsafe {
            Peek::unchecked_new(
                PtrConst::new((&raw const frame).cast::<u8>()),
                <Frame<'static> as Facet<'static>>::SHAPE,
            )
        };
        let plan = vox_postcard::peek_to_scatter_plan(peek).map_err(StableConduitError::Encode)?;

        let mut slot = link_permit
            .alloc(plan.total_size())
            .map_err(StableConduitError::Io)?;
        let slot_bytes = slot.as_mut_slice();
        plan.write_into(slot_bytes);

        // Keep the encoded bytes for replay until the peer acks this sequence number.
        shared.lock_inner()?.replay.push(seq, slot_bytes.to_vec());
        slot.commit();

        Ok(())
    }
}
760
/// Receiving half of a `StableConduit`.
///
/// Drops duplicate frames and reconnects on link failure.
pub struct StableConduitRx<F: MsgFamily, LS: LinkSource> {
    shared: Arc<Shared<LS>>,
    /// Optional plan used to decode incoming messages.
    message_plan: Option<MessagePlan>,
    _phantom: PhantomData<fn() -> F>,
}
770
771impl<F: MsgFamily, LS: LinkSource> ConduitRx for StableConduitRx<F, LS>
772where
773 <LS::Link as Link>::Tx: Send + 'static,
774 <LS::Link as Link>::Rx: Send + 'static,
775 LS: Send + 'static,
776{
777 type Msg = F;
778 type Error = StableConduitError;
779
780 #[moire::instrument]
781 async fn recv(&mut self) -> Result<Option<SelfRef<F::Msg<'static>>>, Self::Error> {
782 loop {
783 let (rx_opt, generation) = {
785 let mut inner = self.shared.lock_inner()?;
786 (inner.rx.take(), inner.link_generation)
787 }; let mut rx = match rx_opt {
789 Some(rx) => rx,
790 None => {
791 self.shared.ensure_reconnected(generation).await?;
792 continue;
793 }
794 };
795
796 let recv_result = rx.recv().await;
800
801 {
804 let mut inner = self.shared.lock_inner()?;
805 if inner.link_generation == generation && inner.rx.is_none() {
806 inner.rx = Some(rx);
807 }
808 }
809
810 let backing = match recv_result {
811 Ok(Some(b)) => b,
812 Ok(None) | Err(_) => {
813 self.shared.ensure_reconnected(generation).await?;
815 continue;
816 }
817 };
818
819 let frame: SelfRef<Frame<'static>> =
821 crate::deserialize_postcard(backing).map_err(StableConduitError::Decode)?;
822
823 let is_dup = {
827 let mut inner = self.shared.lock_inner()?;
828
829 if let Some(ack) = frame.ack {
830 inner.replay.trim(ack);
831 }
832
833 let dup = inner.last_received.is_some_and(|prev| frame.seq <= prev);
834 if !dup {
835 inner.last_received = Some(frame.seq);
836 }
837 dup
838 };
839
840 if is_dup {
841 continue;
842 }
843
844 let item_bytes = match &frame.item {
847 Payload::PostcardBytes(bytes) => bytes,
848 _ => unreachable!("deserialized Payload should always be Incoming"),
849 };
850 let item_backing = vox_types::Backing::Boxed(item_bytes.to_vec().into());
851 let msg = match &self.message_plan {
852 Some(plan) => crate::deserialize_postcard_with_plan::<F::Msg<'static>>(
853 item_backing,
854 &plan.plan,
855 &plan.registry,
856 ),
857 None => crate::deserialize_postcard::<F::Msg<'static>>(item_backing),
858 }
859 .map_err(StableConduitError::Decode)?;
860
861 return Ok(Some(msg));
862 }
863 }
864}
865
/// Errors produced by `StableConduit` and its halves.
#[derive(Debug)]
pub enum StableConduitError {
    /// Serializing an outgoing frame failed.
    Encode(vox_postcard::SerializeError),
    /// Deserializing an incoming frame or message failed.
    Decode(vox_postcard::DeserializeError),
    /// The underlying link or link source reported an I/O error.
    Io(std::io::Error),
    /// The link failed or closed during the handshake, or a permit was used after
    /// its link generation was replaced.
    LinkDead,
    /// Setup problem, for example:
    /// * bad hello magic or a wrongly sized hello;
    /// * resume-key generation failure;
    /// * missing link source;
    /// * poisoned mutex.
    Setup(String),
    /// The `ServerHello` carried the rejected flag.
    SessionLost,
}
881
882impl std::fmt::Display for StableConduitError {
883 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
884 match self {
885 Self::Encode(e) => write!(f, "encode error: {e}"),
886 Self::Decode(e) => write!(f, "decode error: {e}"),
887 Self::Io(e) => write!(f, "io error: {e}"),
888 Self::LinkDead => write!(f, "link dead"),
889 Self::Setup(s) => write!(f, "setup error: {s}"),
890 Self::SessionLost => write!(f, "session lost: server rejected resume key"),
891 }
892 }
893}
894
895impl std::error::Error for StableConduitError {}
896
// Unit tests for this module, declared out-of-line in a separate `tests` module file.
#[cfg(test)]
mod tests;