1use std::marker::PhantomData;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::sync::{Arc, Mutex, MutexGuard};
4
5use facet::Facet;
6use facet_core::PtrConst;
7use facet_reflect::Peek;
8use vox_types::{
9 Conduit, ConduitRx, ConduitTx, Link, LinkRx, LinkTx, MaybeSend, MsgFamily, Payload, SelfRef,
10};
11
12use crate::MessagePlan;
13use zerocopy::little_endian::U32 as LeU32;
14
15mod replay_buffer;
16use replay_buffer::{PacketAck, PacketSeq, ReplayBuffer};
17
18#[derive(
24 Clone,
25 Copy,
26 zerocopy::FromBytes,
27 zerocopy::IntoBytes,
28 zerocopy::KnownLayout,
29 zerocopy::Immutable,
30)]
31#[repr(C)]
32struct ResumeKey([u8; 16]);
33
/// Magic tag opening every `ClientHello`: ASCII `VOCH` read as a little-endian u32.
const CLIENT_HELLO_MAGIC: u32 = u32::from_le_bytes(*b"VOCH");
/// Magic tag opening every `ServerHello`: ASCII `VOSH` read as a little-endian u32.
const SERVER_HELLO_MAGIC: u32 = u32::from_le_bytes(*b"VOSH");

/// `ClientHello::flags`: the `resume_key` field holds a key from an earlier handshake.
const CH_HAS_RESUME_KEY: u8 = 0x01;
/// `ClientHello::flags`: the `last_received` field is meaningful.
const CH_HAS_LAST_RECEIVED: u8 = 0x02;

/// `ServerHello::flags`: the server refused the session (the client reports `SessionLost`).
const SH_REJECTED: u8 = 0x01;
/// `ServerHello::flags`: the `last_received` field is meaningful.
const SH_HAS_LAST_RECEIVED: u8 = 0x02;
44
45#[derive(
48 Clone,
49 Copy,
50 zerocopy::FromBytes,
51 zerocopy::IntoBytes,
52 zerocopy::KnownLayout,
53 zerocopy::Immutable,
54)]
55#[repr(C)]
56pub struct ClientHello {
57 magic: LeU32,
58 flags: u8,
59 resume_key: ResumeKey,
60 last_received: LeU32,
61}
62
63#[derive(
67 Clone,
68 Copy,
69 zerocopy::FromBytes,
70 zerocopy::IntoBytes,
71 zerocopy::KnownLayout,
72 zerocopy::Immutable,
73)]
74#[repr(C)]
75struct ServerHello {
76 magic: LeU32,
77 flags: u8,
78 resume_key: ResumeKey,
79 last_received: LeU32,
80}
81
82#[derive(Facet, Debug)]
90struct Frame<'a> {
91 seq: PacketSeq,
92 ack: Option<PacketAck>,
94 item: Payload<'a>,
95}
96
97vox_types::impl_reborrow!(Frame);
98
99pub struct Attachment<L> {
108 link: L,
109 client_hello: Option<ClientHello>,
110}
111
112impl<L> Attachment<L> {
113 pub fn initiator(link: L) -> Self {
115 Self {
116 link,
117 client_hello: None,
118 }
119 }
120
121 pub(crate) fn into_link(self) -> L {
122 self.link
123 }
124}
125
126pub struct SplitLink<Tx, Rx> {
131 tx: Tx,
132 rx: Rx,
133}
134
135impl<Tx, Rx> Link for SplitLink<Tx, Rx>
136where
137 Tx: LinkTx,
138 Rx: LinkRx,
139{
140 type Tx = Tx;
141 type Rx = Rx;
142
143 fn split(self) -> (Self::Tx, Self::Rx) {
144 (self.tx, self.rx)
145 }
146}
147
148pub async fn prepare_acceptor_attachment<L: Link>(
153 link: L,
154) -> Result<Attachment<SplitLink<L::Tx, L::Rx>>, StableConduitError> {
155 let (tx, mut rx) = link.split();
156 let client_hello = recv_handshake::<_, ClientHello>(&mut rx).await?;
157 Ok(Attachment {
158 link: SplitLink { tx, rx },
159 client_hello: Some(client_hello),
160 })
161}
162
163pub trait LinkSource: MaybeSend + 'static {
165 type Link: Link + MaybeSend;
166
167 fn next_link(
168 &mut self,
169 ) -> impl Future<Output = std::io::Result<Attachment<Self::Link>>> + MaybeSend + '_;
170}
171
172pub struct SingleAttachmentSource<L> {
174 attachment: Option<Attachment<L>>,
175}
176
177pub fn single_attachment_source<L: Link + MaybeSend + 'static>(
179 attachment: Attachment<L>,
180) -> SingleAttachmentSource<L> {
181 SingleAttachmentSource {
182 attachment: Some(attachment),
183 }
184}
185
186pub fn single_link_source<L: Link + MaybeSend + 'static>(link: L) -> SingleAttachmentSource<L> {
188 single_attachment_source(Attachment::initiator(link))
189}
190
191pub fn exhausted_source<L: Link + MaybeSend + 'static>() -> SingleAttachmentSource<L> {
195 SingleAttachmentSource { attachment: None }
196}
197
198impl<L: Link + MaybeSend + 'static> LinkSource for SingleAttachmentSource<L> {
199 type Link = L;
200
201 async fn next_link(&mut self) -> std::io::Result<Attachment<Self::Link>> {
202 self.attachment.take().ok_or_else(|| {
203 std::io::Error::new(
204 std::io::ErrorKind::ConnectionRefused,
205 "single-use LinkSource exhausted",
206 )
207 })
208 }
209}
210
211pub struct StableConduit<F: MsgFamily, LS: LinkSource> {
218 shared: Arc<Shared<LS>>,
219 message_plan: Option<MessagePlan>,
220 _phantom: PhantomData<fn(F) -> F>,
221}
222
223struct Shared<LS: LinkSource> {
224 inner: Mutex<Inner<LS>>,
225 reconnecting: AtomicBool,
226 reconnected: moire::sync::Notify,
227 tx_ready: moire::sync::Notify,
228}
229
230struct Inner<LS: LinkSource> {
231 source: Option<LS>,
232 link_generation: u64,
235 tx: Option<<LS::Link as Link>::Tx>,
236 rx: Option<<LS::Link as Link>::Rx>,
237 tx_checked_out: bool,
238 resume_key: Option<ResumeKey>,
239 next_send_seq: PacketSeq,
241 last_received: Option<PacketSeq>,
242 replay: ReplayBuffer,
245}
246
247impl<F: MsgFamily, LS: LinkSource> StableConduit<F, LS> {
248 pub async fn new(mut source: LS) -> Result<Self, StableConduitError> {
249 let attachment = source.next_link().await.map_err(StableConduitError::Io)?;
250 let (link_tx, link_rx) = attachment.link.split();
251 Self::with_first_link(link_tx, link_rx, attachment.client_hello, source).await
252 }
253
254 pub async fn with_first_link(
260 link_tx: <LS::Link as Link>::Tx,
261 mut link_rx: <LS::Link as Link>::Rx,
262 client_hello: Option<ClientHello>,
263 source: LS,
264 ) -> Result<Self, StableConduitError> {
265 let (resume_key, _peer_last_received) =
266 handshake::<LS::Link>(&link_tx, &mut link_rx, client_hello, None, None).await?;
267
268 let inner = Inner {
269 source: Some(source),
270 link_generation: 0,
271 tx: Some(link_tx),
272 rx: Some(link_rx),
273 tx_checked_out: false,
274 resume_key: Some(resume_key),
275 next_send_seq: PacketSeq(0),
276 last_received: None,
277 replay: ReplayBuffer::new(),
278 };
279
280 Ok(Self {
281 shared: Arc::new(Shared {
282 inner: Mutex::new(inner),
283 reconnecting: AtomicBool::new(false),
284 reconnected: moire::sync::Notify::new("stable_conduit.reconnected"),
285 tx_ready: moire::sync::Notify::new("stable_conduit.tx_ready"),
286 }),
287 message_plan: None,
288 _phantom: PhantomData,
289 })
290 }
291
292 pub fn with_message_plan(mut self, plan: MessagePlan) -> Self {
295 self.message_plan = Some(plan);
296 self
297 }
298}
299
300impl<LS: LinkSource> Shared<LS> {
305 fn lock_inner(&self) -> Result<MutexGuard<'_, Inner<LS>>, StableConduitError> {
306 self.inner
307 .lock()
308 .map_err(|_| StableConduitError::Setup("stable conduit mutex poisoned".into()))
309 }
310
311 async fn ensure_reconnected(&self, generation: u64) -> Result<(), StableConduitError> {
312 loop {
313 {
314 let inner = self.lock_inner()?;
315 if inner.link_generation != generation {
316 return Ok(());
317 }
318 }
319
320 if self
321 .reconnecting
322 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
323 .is_ok()
324 {
325 let result = self.reconnect_once(generation).await;
326 self.reconnecting.store(false, Ordering::Release);
327 self.reconnected.notify_waiters();
328 return result;
329 }
330
331 self.reconnected.notified().await;
332 }
333 }
334
335 async fn reconnect_once(&self, generation: u64) -> Result<(), StableConduitError> {
342 let (mut source, resume_key, last_received, replay_frames) = {
343 let mut inner = self.lock_inner()?;
344 if inner.link_generation != generation {
345 return Ok(());
346 }
347 let source = inner
348 .source
349 .take()
350 .ok_or_else(|| StableConduitError::Setup("link source unavailable".into()))?;
351 let replay_frames = inner
352 .replay
353 .iter()
354 .map(|(seq, bytes)| (*seq, bytes.clone()))
355 .collect::<Vec<_>>();
356 (source, inner.resume_key, inner.last_received, replay_frames)
357 };
358
359 let reconnect_result = async {
360 let attachment = source.next_link().await.map_err(StableConduitError::Io)?;
361 let (new_tx, mut new_rx) = attachment.link.split();
362
363 let (new_resume_key, peer_last_received) = handshake::<LS::Link>(
364 &new_tx,
365 &mut new_rx,
366 attachment.client_hello,
367 resume_key,
368 last_received,
369 )
370 .await?;
371
372 for (seq, frame_bytes) in replay_frames {
376 if peer_last_received.is_some_and(|last| seq <= last) {
377 continue;
378 }
379 new_tx
380 .send(frame_bytes.clone())
381 .await
382 .map_err(StableConduitError::Io)?;
383 }
384
385 Ok::<_, StableConduitError>((new_tx, new_rx, new_resume_key))
386 }
387 .await;
388
389 let mut inner = self.lock_inner()?;
390 inner.source = Some(source);
391
392 if inner.link_generation != generation {
393 return Ok(());
394 }
395
396 let (new_tx, new_rx, new_resume_key) = reconnect_result?;
397
398 inner.link_generation = inner.link_generation.wrapping_add(1);
399 inner.tx = Some(new_tx);
400 inner.rx = Some(new_rx);
401 inner.tx_checked_out = false;
402 inner.resume_key = Some(new_resume_key);
403 self.tx_ready.notify_waiters();
404
405 Ok(())
406 }
407}
408
409async fn handshake<L: Link>(
417 tx: &L::Tx,
418 rx: &mut L::Rx,
419 client_hello: Option<ClientHello>,
420 resume_key: Option<ResumeKey>,
421 last_received: Option<PacketSeq>,
422) -> Result<(ResumeKey, Option<PacketSeq>), StableConduitError> {
423 match client_hello {
424 None => {
425 let mut flags = 0u8;
427 if resume_key.is_some() {
428 flags |= CH_HAS_RESUME_KEY;
429 }
430 if last_received.is_some() {
431 flags |= CH_HAS_LAST_RECEIVED;
432 }
433 let hello = ClientHello {
434 magic: LeU32::new(CLIENT_HELLO_MAGIC),
435 flags,
436 resume_key: resume_key.unwrap_or(ResumeKey([0u8; 16])),
437 last_received: LeU32::new(last_received.map_or(0, |s| s.0)),
438 };
439 send_handshake(tx, &hello).await?;
440
441 let sh = recv_handshake::<_, ServerHello>(rx).await?;
442 if sh.magic.get() != SERVER_HELLO_MAGIC {
443 return Err(StableConduitError::Setup(
444 "ServerHello magic mismatch".into(),
445 ));
446 }
447 if sh.flags & SH_REJECTED != 0 {
449 return Err(StableConduitError::SessionLost);
450 }
451 let peer_last_received =
452 (sh.flags & SH_HAS_LAST_RECEIVED != 0).then(|| PacketSeq(sh.last_received.get()));
453 Ok((sh.resume_key, peer_last_received))
454 }
455 Some(ch) => {
456 let key = fresh_key()?;
458 let mut flags = 0u8;
459 if last_received.is_some() {
460 flags |= SH_HAS_LAST_RECEIVED;
461 }
462 let hello = ServerHello {
463 magic: LeU32::new(SERVER_HELLO_MAGIC),
464 flags,
465 resume_key: key,
466 last_received: LeU32::new(last_received.map_or(0, |s| s.0)),
467 };
468 send_handshake(tx, &hello).await?;
469
470 let peer_last_received =
471 (ch.flags & CH_HAS_LAST_RECEIVED != 0).then(|| PacketSeq(ch.last_received.get()));
472 Ok((key, peer_last_received))
473 }
474 }
475}
476
477async fn send_handshake<LTx: LinkTx, M: zerocopy::IntoBytes + zerocopy::Immutable>(
478 tx: <x,
479 msg: &M,
480) -> Result<(), StableConduitError> {
481 tx.send(msg.as_bytes().to_vec())
482 .await
483 .map_err(StableConduitError::Io)
484}
485
486async fn recv_handshake<
487 LRx: LinkRx,
488 M: zerocopy::FromBytes + zerocopy::KnownLayout + zerocopy::Immutable,
489>(
490 rx: &mut LRx,
491) -> Result<M, StableConduitError> {
492 let backing = rx
493 .recv()
494 .await
495 .map_err(|_| StableConduitError::LinkDead)?
496 .ok_or(StableConduitError::LinkDead)?;
497 M::read_from_bytes(backing.as_bytes())
498 .map_err(|_| StableConduitError::Setup("handshake message size mismatch".into()))
499}
500
501pub async fn recv_client_hello<Rx: LinkRx>(rx: &mut Rx) -> Result<ClientHello, StableConduitError> {
507 recv_handshake::<_, ClientHello>(rx).await
508}
509
510fn fresh_key() -> Result<ResumeKey, StableConduitError> {
511 let mut key = ResumeKey([0u8; 16]);
512 getrandom::fill(&mut key.0)
513 .map_err(|e| StableConduitError::Setup(format!("failed to generate resume key: {e}")))?;
514 Ok(key)
515}
516
517impl<F: MsgFamily, LS: LinkSource> Conduit for StableConduit<F, LS>
522where
523 <LS::Link as Link>::Tx: MaybeSend + 'static,
524 <LS::Link as Link>::Rx: MaybeSend + 'static,
525 LS: MaybeSend + 'static,
526{
527 type Msg = F;
528 type Tx = StableConduitTx<F, LS>;
529 type Rx = StableConduitRx<F, LS>;
530
531 fn split(self) -> (Self::Tx, Self::Rx) {
532 (
533 StableConduitTx {
534 shared: Arc::clone(&self.shared),
535 _phantom: PhantomData,
536 },
537 StableConduitRx {
538 shared: Arc::clone(&self.shared),
539 message_plan: self.message_plan,
540 _phantom: PhantomData,
541 },
542 )
543 }
544}
545
546pub struct StableConduitTx<F: MsgFamily, LS: LinkSource> {
551 shared: Arc<Shared<LS>>,
552 _phantom: PhantomData<fn(F)>,
553}
554
555impl<F: MsgFamily, LS: LinkSource> ConduitTx for StableConduitTx<F, LS>
556where
557 <LS::Link as Link>::Tx: MaybeSend + 'static,
558 <LS::Link as Link>::Rx: MaybeSend + 'static,
559 LS: MaybeSend + 'static,
560{
561 type Msg = F;
562 type Prepared = StablePreparedMessage;
563 type Error = StableConduitError;
564
565 fn prepare_send(&self, item: F::Msg<'_>) -> Result<Self::Prepared, Self::Error> {
566 prepare_frame::<F, LS>(&self.shared, item)
567 }
568
569 async fn send_prepared(&self, mut prepared: Self::Prepared) -> Result<(), Self::Error> {
570 enum TxReservation<Tx> {
571 CheckedOut { tx: Tx },
572 Wait,
573 Reconnect { generation: u64 },
574 }
575
576 loop {
577 let reservation = {
578 let mut inner = self.shared.lock_inner()?;
579 match inner.tx.take() {
580 Some(tx) => {
581 inner.tx_checked_out = true;
582 TxReservation::CheckedOut { tx }
583 }
584 None if inner.tx_checked_out => TxReservation::Wait,
585 None => TxReservation::Reconnect {
586 generation: inner.link_generation,
587 },
588 }
589 };
590
591 let tx = match reservation {
592 TxReservation::CheckedOut { tx } => tx,
593 TxReservation::Wait => {
594 self.shared.tx_ready.notified().await;
595 continue;
596 }
597 TxReservation::Reconnect { generation } => {
598 self.shared.ensure_reconnected(generation).await?;
599 continue;
600 }
601 };
602
603 if prepared.framed.is_none() {
604 let framed = {
605 let mut inner = self.shared.lock_inner()?;
606 let seq = inner.next_send_seq;
607 inner.next_send_seq = PacketSeq(seq.0.wrapping_add(1));
608 let ack = inner
609 .last_received
610 .map(|max_delivered| PacketAck { max_delivered });
611 let frame_bytes = encode_frame_bytes(seq, ack, &prepared.item_bytes)?;
612 (seq, frame_bytes)
613 };
614 prepared.framed = Some(framed);
615 }
616
617 let (seq, frame_bytes) = prepared
618 .framed
619 .as_ref()
620 .expect("stable prepared messages should be framed before send");
621
622 let send_result = tx.send(frame_bytes.clone()).await;
623 let generation = {
624 let mut inner = self.shared.lock_inner()?;
625 let generation = inner.link_generation;
626 if inner.tx.is_none() {
627 inner.tx = Some(tx);
628 }
629 inner.tx_checked_out = false;
630 self.shared.tx_ready.notify_waiters();
631 generation
632 };
633
634 match send_result {
635 Ok(()) => {
636 self.shared
637 .lock_inner()?
638 .replay
639 .push(*seq, frame_bytes.clone());
640 return Ok(());
641 }
642 Err(_) => {
643 self.shared.ensure_reconnected(generation).await?;
644 }
645 }
646 }
647 }
648
649 async fn close(self) -> std::io::Result<()> {
650 let tx = {
651 let mut inner = self
652 .shared
653 .lock_inner()
654 .map_err(|e| std::io::Error::other(e.to_string()))?;
655 inner.tx.take()
656 };
657 if let Some(tx) = tx {
658 tx.close().await?;
659 }
660 Ok(())
661 }
662}
663
664fn encode_frame_bytes(
665 seq: PacketSeq,
666 ack: Option<PacketAck>,
667 item_bytes: &[u8],
668) -> Result<Vec<u8>, StableConduitError> {
669 let frame = Frame {
670 seq,
671 ack,
672 item: Payload::PostcardBytes(item_bytes),
673 };
674 #[allow(unsafe_code)]
675 let peek = unsafe {
676 Peek::unchecked_new(
677 PtrConst::new((&raw const frame).cast::<u8>()),
678 <Frame<'static> as Facet<'static>>::SHAPE,
679 )
680 };
681 let plan = vox_postcard::peek_to_scatter_plan(peek).map_err(StableConduitError::Encode)?;
682 let mut frame_bytes = vec![0u8; plan.total_size()];
683 plan.write_into(&mut frame_bytes);
684 Ok(frame_bytes)
685}
686
687pub struct StablePreparedMessage {
688 item_bytes: Vec<u8>,
689 framed: Option<(PacketSeq, Vec<u8>)>,
690}
691
692fn prepare_frame<F: MsgFamily, LS: LinkSource>(
701 shared: &Arc<Shared<LS>>,
702 item: F::Msg<'_>,
703) -> Result<StablePreparedMessage, StableConduitError> {
704 let _ = shared;
705 let item_bytes = vox_postcard::to_vec(&item).map_err(StableConduitError::Encode)?;
706 Ok(StablePreparedMessage {
707 item_bytes,
708 framed: None,
709 })
710}
711pub struct StableConduitRx<F: MsgFamily, LS: LinkSource> {
716 shared: Arc<Shared<LS>>,
717 message_plan: Option<MessagePlan>,
718 _phantom: PhantomData<fn() -> F>,
719}
720
721impl<F: MsgFamily, LS: LinkSource> ConduitRx for StableConduitRx<F, LS>
722where
723 <LS::Link as Link>::Tx: MaybeSend + 'static,
724 <LS::Link as Link>::Rx: MaybeSend + 'static,
725 LS: MaybeSend + 'static,
726{
727 type Msg = F;
728 type Error = StableConduitError;
729
730 #[moire::instrument]
731 async fn recv(&mut self) -> Result<Option<SelfRef<F::Msg<'static>>>, Self::Error> {
732 loop {
733 let (rx_opt, generation) = {
735 let mut inner = self.shared.lock_inner()?;
736 (inner.rx.take(), inner.link_generation)
737 }; let mut rx = match rx_opt {
739 Some(rx) => rx,
740 None => {
741 self.shared.ensure_reconnected(generation).await?;
742 continue;
743 }
744 };
745
746 let recv_result = rx.recv().await;
750
751 {
754 let mut inner = self.shared.lock_inner()?;
755 if inner.link_generation == generation && inner.rx.is_none() {
756 inner.rx = Some(rx);
757 }
758 }
759
760 let backing = match recv_result {
761 Ok(Some(b)) => b,
762 Ok(None) | Err(_) => {
763 self.shared.ensure_reconnected(generation).await?;
765 continue;
766 }
767 };
768
769 let frame: SelfRef<Frame<'static>> =
771 crate::deserialize_postcard(backing).map_err(StableConduitError::Decode)?;
772
773 let is_dup = {
777 let frame = frame.get();
778 let mut inner = self.shared.lock_inner()?;
779
780 if let Some(ack) = frame.ack {
781 inner.replay.trim(ack);
782 }
783
784 let dup = inner.last_received.is_some_and(|prev| frame.seq <= prev);
785 if !dup {
786 inner.last_received = Some(frame.seq);
787 }
788 dup
789 };
790
791 if is_dup {
792 continue;
793 }
794
795 let frame = frame.get();
798 let item_bytes = match &frame.item {
799 Payload::PostcardBytes(bytes) => bytes,
800 _ => unreachable!("deserialized Payload should always be Incoming"),
801 };
802 let item_backing = vox_types::Backing::Boxed(item_bytes.to_vec().into());
803 let msg = match &self.message_plan {
804 Some(plan) => crate::deserialize_postcard_with_plan::<F::Msg<'static>>(
805 item_backing,
806 &plan.plan,
807 &plan.registry,
808 ),
809 None => crate::deserialize_postcard::<F::Msg<'static>>(item_backing),
810 }
811 .map_err(StableConduitError::Decode)?;
812
813 return Ok(Some(msg));
814 }
815 }
816}
817
818#[derive(Debug)]
823pub enum StableConduitError {
824 Encode(vox_postcard::SerializeError),
825 Decode(vox_postcard::DeserializeError),
826 Io(std::io::Error),
827 LinkDead,
828 Setup(String),
829 SessionLost,
832}
833
834impl std::fmt::Display for StableConduitError {
835 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
836 match self {
837 Self::Encode(e) => write!(f, "encode error: {e}"),
838 Self::Decode(e) => write!(f, "decode error: {e}"),
839 Self::Io(e) => write!(f, "io error: {e}"),
840 Self::LinkDead => write!(f, "link dead"),
841 Self::Setup(s) => write!(f, "setup error: {s}"),
842 Self::SessionLost => write!(f, "session lost: server rejected resume key"),
843 }
844 }
845}
846
847impl std::error::Error for StableConduitError {}
848
// Unit tests live in a separate `tests` module file next to this one.
#[cfg(test)]
mod tests;