Skip to main content

vox_core/stable_conduit/
mod.rs

1use std::marker::PhantomData;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::sync::{Arc, Mutex, MutexGuard};
4
5use facet::Facet;
6use facet_core::PtrConst;
7use facet_reflect::Peek;
8use vox_types::{
9    Conduit, ConduitRx, ConduitTx, Link, LinkRx, LinkTx, MaybeSend, MsgFamily, Payload, SelfRef,
10};
11
12use crate::MessagePlan;
13use zerocopy::little_endian::U32 as LeU32;
14
15mod replay_buffer;
16use replay_buffer::{PacketAck, PacketSeq, ReplayBuffer};
17
18// ---------------------------------------------------------------------------
19// Handshake wire types
20// ---------------------------------------------------------------------------
21
/// 16-byte CSPRNG-generated session resumption key.
///
/// Minted on the acceptor side by `fresh_key` and carried verbatim inside
/// both `ClientHello` and `ServerHello`. The zerocopy derives allow the key
/// to be embedded in those fixed-layout wire structs.
#[derive(
    Clone,
    Copy,
    zerocopy::FromBytes,
    zerocopy::IntoBytes,
    zerocopy::KnownLayout,
    zerocopy::Immutable,
)]
#[repr(C)]
struct ResumeKey([u8; 16]);
33
// Magic values that open each handshake message: the ASCII tags "VOCH" and
// "VOSH" interpreted as little-endian u32s.
const CLIENT_HELLO_MAGIC: u32 = u32::from_le_bytes(*b"VOCH");
const SERVER_HELLO_MAGIC: u32 = u32::from_le_bytes(*b"VOSH");

// ClientHello flags
// Each bit says that the corresponding hello field (`resume_key` /
// `last_received`) carries a real value rather than zero filler.
const CH_HAS_RESUME_KEY: u8 = 0b0000_0001;
const CH_HAS_LAST_RECEIVED: u8 = 0b0000_0010;

// ServerHello flags
// `SH_REJECTED` makes the initiator fail with `StableConduitError::SessionLost`;
// `SH_HAS_LAST_RECEIVED` marks the `last_received` field as meaningful.
const SH_REJECTED: u8 = 0b0000_0001;
const SH_HAS_LAST_RECEIVED: u8 = 0b0000_0010;
44
/// Client's opening handshake — fixed-size, cast directly from wire bytes.
///
/// Built by the initiator in `handshake` and parsed by the acceptor via
/// `recv_handshake`. Multi-byte integers use zerocopy's little-endian `U32`.
// r[impl stable.handshake.client-hello]
#[derive(
    Clone,
    Copy,
    zerocopy::FromBytes,
    zerocopy::IntoBytes,
    zerocopy::KnownLayout,
    zerocopy::Immutable,
)]
#[repr(C)]
pub struct ClientHello {
    /// Expected to equal `CLIENT_HELLO_MAGIC`.
    magic: LeU32,
    /// Bitwise OR of the `CH_*` flag constants.
    flags: u8,
    /// Key from the previous session; all zeroes when `CH_HAS_RESUME_KEY` is unset.
    resume_key: ResumeKey,
    /// Highest sequence number the client has received; zero when
    /// `CH_HAS_LAST_RECEIVED` is unset.
    last_received: LeU32,
}
62
/// Server's handshake response — fixed-size, cast directly from wire bytes.
///
/// Built by the acceptor in `handshake` in reply to a `ClientHello` and
/// parsed by the initiator via `recv_handshake`.
// r[impl stable.handshake.server-hello]
// r[impl stable.reconnect.failure]
#[derive(
    Clone,
    Copy,
    zerocopy::FromBytes,
    zerocopy::IntoBytes,
    zerocopy::KnownLayout,
    zerocopy::Immutable,
)]
#[repr(C)]
struct ServerHello {
    /// Checked by the initiator against `SERVER_HELLO_MAGIC`.
    magic: LeU32,
    /// Bitwise OR of the `SH_*` flag constants.
    flags: u8,
    /// Freshly generated key the initiator stores for its next reconnect.
    resume_key: ResumeKey,
    /// Highest sequence number the server has received; zero when
    /// `SH_HAS_LAST_RECEIVED` is unset.
    last_received: LeU32,
}
81
/// Sequenced data frame. All post-handshake traffic is a `Frame`.
///
/// The `item` field is an opaque [`Payload`] — the message bytes are
/// serialized/deserialized independently from the frame envelope.
/// This decouples the frame format from the message schema.
// r[impl stable.framing]
// r[impl stable.framing.encoding]
#[derive(Facet, Debug)]
struct Frame<'a> {
    /// Sequence number assigned by the sender when the frame is first encoded.
    seq: PacketSeq,
    /// Piggy-backed acknowledgement of the highest sequence number the sender
    /// had received when it encoded this frame; `None` if it had received nothing.
    // r[impl stable.ack]
    ack: Option<PacketAck>,
    /// Already-encoded message bytes, carried opaquely.
    item: Payload<'a>,
}

// Project macro invoked for `Frame`; its expansion is defined in `vox_types`.
vox_types::impl_reborrow!(Frame);
98
99// ---------------------------------------------------------------------------
100// Attachment / LinkSource
101// ---------------------------------------------------------------------------
102
/// One transport attachment consumed by [`LinkSource::next_link`].
///
/// Use [`Attachment::initiator`] for the initiator side, or
/// [`prepare_acceptor_attachment`] on inbound links for the acceptor side.
pub struct Attachment<L> {
    /// The transport link the stable handshake will run on.
    link: L,
    /// `Some` when the peer's hello has already been read off the link
    /// (acceptor side); `None` on the initiator side, which sends its own hello.
    client_hello: Option<ClientHello>,
}
111
112impl<L> Attachment<L> {
113    /// Build an initiator-side attachment.
114    pub fn initiator(link: L) -> Self {
115        Self {
116            link,
117            client_hello: None,
118        }
119    }
120
121    pub(crate) fn into_link(self) -> L {
122        self.link
123    }
124}
125
/// Link wrapper that re-combines pre-split Tx/Rx halves into a [`Link`].
///
/// This is used by [`prepare_acceptor_attachment`] after consuming the inbound
/// `ClientHello` during stable-conduit setup.
pub struct SplitLink<Tx, Rx> {
    /// Sending half of the original link.
    tx: Tx,
    /// Receiving half of the original link.
    rx: Rx,
}
134
135impl<Tx, Rx> Link for SplitLink<Tx, Rx>
136where
137    Tx: LinkTx,
138    Rx: LinkRx,
139{
140    type Tx = Tx;
141    type Rx = Rx;
142
143    fn split(self) -> (Self::Tx, Self::Rx) {
144        (self.tx, self.rx)
145    }
146}
147
148/// Prepare an acceptor-side attachment from an inbound link.
149///
150/// This consumes the leading stable `ClientHello` from the inbound link and
151/// returns an attachment suitable for acceptor-side [`StableConduit::new`].
152pub async fn prepare_acceptor_attachment<L: Link>(
153    link: L,
154) -> Result<Attachment<SplitLink<L::Tx, L::Rx>>, StableConduitError> {
155    let (tx, mut rx) = link.split();
156    let client_hello = recv_handshake::<_, ClientHello>(&mut rx).await?;
157    Ok(Attachment {
158        link: SplitLink { tx, rx },
159        client_hello: Some(client_hello),
160    })
161}
162
/// Supplier of transport links for a [`StableConduit`].
///
/// `next_link` is called once for the initial link (by [`StableConduit::new`])
/// and again on every reconnect attempt. An `Err` from it is surfaced to the
/// caller as [`StableConduitError::Io`].
// r[impl stable.link-source]
pub trait LinkSource: MaybeSend + 'static {
    /// Concrete link type produced by this source.
    type Link: Link + MaybeSend;

    /// Produce the next attachment to run the stable handshake on.
    fn next_link(
        &mut self,
    ) -> impl Future<Output = std::io::Result<Attachment<Self::Link>>> + MaybeSend + '_;
}
171
/// A one-shot [`LinkSource`] backed by a single attachment.
pub struct SingleAttachmentSource<L> {
    /// The attachment to hand out; `None` once it has been taken (or when the
    /// source was created already exhausted).
    attachment: Option<Attachment<L>>,
}
176
177/// Build a one-shot [`LinkSource`] from a prepared attachment.
178pub fn single_attachment_source<L: Link + MaybeSend + 'static>(
179    attachment: Attachment<L>,
180) -> SingleAttachmentSource<L> {
181    SingleAttachmentSource {
182        attachment: Some(attachment),
183    }
184}
185
186/// Build a one-shot initiator-side [`LinkSource`] from a raw link.
187pub fn single_link_source<L: Link + MaybeSend + 'static>(link: L) -> SingleAttachmentSource<L> {
188    single_attachment_source(Attachment::initiator(link))
189}
190
191/// Build an already-exhausted [`LinkSource`]. Any call to `next_link` will
192/// fail immediately. Used when the first link is passed directly to
193/// [`StableConduit::with_first_link`] and no reconnection source is available.
194pub fn exhausted_source<L: Link + MaybeSend + 'static>() -> SingleAttachmentSource<L> {
195    SingleAttachmentSource { attachment: None }
196}
197
198impl<L: Link + MaybeSend + 'static> LinkSource for SingleAttachmentSource<L> {
199    type Link = L;
200
201    async fn next_link(&mut self) -> std::io::Result<Attachment<Self::Link>> {
202        self.attachment.take().ok_or_else(|| {
203            std::io::Error::new(
204                std::io::ErrorKind::ConnectionRefused,
205                "single-use LinkSource exhausted",
206            )
207        })
208    }
209}
210
211// ---------------------------------------------------------------------------
212// StableConduit
213// ---------------------------------------------------------------------------
214
/// Conduit that keeps a logical session alive across link replacements.
///
/// Holds the state shared by its Tx and Rx halves plus an optional
/// [`MessagePlan`] that is handed to the Rx half on `split`.
// r[impl stable]
// r[impl zerocopy.framing.conduit.stable]
pub struct StableConduit<F: MsgFamily, LS: LinkSource> {
    /// State shared between the Tx and Rx halves after `split`.
    shared: Arc<Shared<LS>>,
    /// Optional plan used when decoding received payloads.
    message_plan: Option<MessagePlan>,
    /// Ties the conduit to the message family `F` without storing a value of it.
    _phantom: PhantomData<fn(F) -> F>,
}
222
/// State shared by both conduit halves.
struct Shared<LS: LinkSource> {
    /// Link halves, sequence counters, and replay buffer, behind a std mutex.
    inner: Mutex<Inner<LS>>,
    /// Set while one task is running `reconnect_once`, so that only a single
    /// reconnect attempt is in flight at a time.
    reconnecting: AtomicBool,
    /// Signalled after every reconnect attempt, successful or not.
    reconnected: moire::sync::Notify,
    /// Signalled when a checked-out Tx is returned or a new Tx is installed.
    tx_ready: moire::sync::Notify,
}
229
/// Mutable conduit state guarded by `Shared::inner`.
struct Inner<LS: LinkSource> {
    /// Source of replacement links; temporarily `None` while a reconnect
    /// attempt has it checked out.
    source: Option<LS>,
    /// Incremented every time the link is replaced. Used to detect whether
    /// another task has already reconnected while we were waiting.
    link_generation: u64,
    /// Sending half of the current link; `None` while a sender has it checked
    /// out or after `close`.
    tx: Option<<LS::Link as Link>::Tx>,
    /// Receiving half of the current link; `None` while `recv` is awaiting on it.
    rx: Option<<LS::Link as Link>::Rx>,
    /// `true` while a sender holds `tx`; lets other senders distinguish
    /// "busy, wait" from "no link, reconnect".
    tx_checked_out: bool,
    /// Key received or issued in the most recent handshake.
    resume_key: Option<ResumeKey>,
    // r[impl stable.seq]
    /// Sequence number to assign to the next outgoing frame.
    next_send_seq: PacketSeq,
    /// Highest sequence number accepted from the peer so far.
    last_received: Option<PacketSeq>,
    // r[impl stable.replay-buffer]
    /// Encoded item bytes buffered for replay on reconnect.
    replay: ReplayBuffer,
}
246
247impl<F: MsgFamily, LS: LinkSource> StableConduit<F, LS> {
248    pub async fn new(mut source: LS) -> Result<Self, StableConduitError> {
249        let attachment = source.next_link().await.map_err(StableConduitError::Io)?;
250        let (link_tx, link_rx) = attachment.link.split();
251        Self::with_first_link(link_tx, link_rx, attachment.client_hello, source).await
252    }
253
254    /// Create a stable conduit with a pre-split first link.
255    ///
256    /// Use this when the first link has already been obtained and processed
257    /// (e.g. after a CBOR session handshake) before the stable conduit's own
258    /// resume handshake runs.
259    pub async fn with_first_link(
260        link_tx: <LS::Link as Link>::Tx,
261        mut link_rx: <LS::Link as Link>::Rx,
262        client_hello: Option<ClientHello>,
263        source: LS,
264    ) -> Result<Self, StableConduitError> {
265        let (resume_key, _peer_last_received) =
266            handshake::<LS::Link>(&link_tx, &mut link_rx, client_hello, None, None).await?;
267
268        let inner = Inner {
269            source: Some(source),
270            link_generation: 0,
271            tx: Some(link_tx),
272            rx: Some(link_rx),
273            tx_checked_out: false,
274            resume_key: Some(resume_key),
275            next_send_seq: PacketSeq(0),
276            last_received: None,
277            replay: ReplayBuffer::new(),
278        };
279
280        Ok(Self {
281            shared: Arc::new(Shared {
282                inner: Mutex::new(inner),
283                reconnecting: AtomicBool::new(false),
284                reconnected: moire::sync::Notify::new("stable_conduit.reconnected"),
285                tx_ready: moire::sync::Notify::new("stable_conduit.tx_ready"),
286            }),
287            message_plan: None,
288            _phantom: PhantomData,
289        })
290    }
291
292    /// Set the message plan for schema-aware deserialization of the payload
293    /// inside each frame.
294    pub fn with_message_plan(mut self, plan: MessagePlan) -> Self {
295        self.message_plan = Some(plan);
296        self
297    }
298}
299
300// ---------------------------------------------------------------------------
301// Reconnect
302// ---------------------------------------------------------------------------
303
304impl<LS: LinkSource> Shared<LS> {
305    fn lock_inner(&self) -> Result<MutexGuard<'_, Inner<LS>>, StableConduitError> {
306        self.inner
307            .lock()
308            .map_err(|_| StableConduitError::Setup("stable conduit mutex poisoned".into()))
309    }
310
311    async fn ensure_reconnected(&self, generation: u64) -> Result<(), StableConduitError> {
312        loop {
313            {
314                let inner = self.lock_inner()?;
315                if inner.link_generation != generation {
316                    return Ok(());
317                }
318            }
319
320            if self
321                .reconnecting
322                .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
323                .is_ok()
324            {
325                let result = self.reconnect_once(generation).await;
326                self.reconnecting.store(false, Ordering::Release);
327                self.reconnected.notify_waiters();
328                return result;
329            }
330
331            self.reconnected.notified().await;
332        }
333    }
334
335    /// Obtain a new link from the source, re-handshake, and replay any
336    /// buffered items the peer missed.
337    // r[impl stable.reconnect]
338    // r[impl stable.reconnect.client-replay]
339    // r[impl stable.reconnect.server-replay]
340    // r[impl stable.replay-buffer.order]
341    async fn reconnect_once(&self, generation: u64) -> Result<(), StableConduitError> {
342        let (mut source, resume_key, last_received, replay_frames) = {
343            let mut inner = self.lock_inner()?;
344            if inner.link_generation != generation {
345                return Ok(());
346            }
347            let source = inner
348                .source
349                .take()
350                .ok_or_else(|| StableConduitError::Setup("link source unavailable".into()))?;
351            let replay_frames = inner
352                .replay
353                .iter()
354                .map(|(seq, bytes)| (*seq, bytes.clone()))
355                .collect::<Vec<_>>();
356            (source, inner.resume_key, inner.last_received, replay_frames)
357        };
358
359        let reconnect_result = async {
360            let attachment = source.next_link().await.map_err(StableConduitError::Io)?;
361            let (new_tx, mut new_rx) = attachment.link.split();
362
363            let (new_resume_key, peer_last_received) = handshake::<LS::Link>(
364                &new_tx,
365                &mut new_rx,
366                attachment.client_hello,
367                resume_key,
368                last_received,
369            )
370            .await?;
371
372            // Replay frames the peer hasn't received yet, in original order.
373            // Frame bytes include the original seq/ack — stale acks are
374            // harmless since the peer ignores acks older than what it has seen.
375            for (seq, frame_bytes) in replay_frames {
376                if peer_last_received.is_some_and(|last| seq <= last) {
377                    continue;
378                }
379                new_tx
380                    .send(frame_bytes.clone())
381                    .await
382                    .map_err(StableConduitError::Io)?;
383            }
384
385            Ok::<_, StableConduitError>((new_tx, new_rx, new_resume_key))
386        }
387        .await;
388
389        let mut inner = self.lock_inner()?;
390        inner.source = Some(source);
391
392        if inner.link_generation != generation {
393            return Ok(());
394        }
395
396        let (new_tx, new_rx, new_resume_key) = reconnect_result?;
397
398        inner.link_generation = inner.link_generation.wrapping_add(1);
399        inner.tx = Some(new_tx);
400        inner.rx = Some(new_rx);
401        inner.tx_checked_out = false;
402        inner.resume_key = Some(new_resume_key);
403        self.tx_ready.notify_waiters();
404
405        Ok(())
406    }
407}
408
409/// Perform the handshake on a fresh link.
410///
411/// Returns `(our_resume_key, peer_last_received)`:
412///   - `our_resume_key`: the key to use for the next reconnect attempt
413///   - `peer_last_received`: the highest seq the peer has already seen,
414///     used to decide which replay-buffer entries to re-send
415// r[impl stable.handshake]
416async fn handshake<L: Link>(
417    tx: &L::Tx,
418    rx: &mut L::Rx,
419    client_hello: Option<ClientHello>,
420    resume_key: Option<ResumeKey>,
421    last_received: Option<PacketSeq>,
422) -> Result<(ResumeKey, Option<PacketSeq>), StableConduitError> {
423    match client_hello {
424        None => {
425            // r[impl stable.reconnect]
426            let mut flags = 0u8;
427            if resume_key.is_some() {
428                flags |= CH_HAS_RESUME_KEY;
429            }
430            if last_received.is_some() {
431                flags |= CH_HAS_LAST_RECEIVED;
432            }
433            let hello = ClientHello {
434                magic: LeU32::new(CLIENT_HELLO_MAGIC),
435                flags,
436                resume_key: resume_key.unwrap_or(ResumeKey([0u8; 16])),
437                last_received: LeU32::new(last_received.map_or(0, |s| s.0)),
438            };
439            send_handshake(tx, &hello).await?;
440
441            let sh = recv_handshake::<_, ServerHello>(rx).await?;
442            if sh.magic.get() != SERVER_HELLO_MAGIC {
443                return Err(StableConduitError::Setup(
444                    "ServerHello magic mismatch".into(),
445                ));
446            }
447            // r[impl stable.reconnect.failure]
448            if sh.flags & SH_REJECTED != 0 {
449                return Err(StableConduitError::SessionLost);
450            }
451            let peer_last_received =
452                (sh.flags & SH_HAS_LAST_RECEIVED != 0).then(|| PacketSeq(sh.last_received.get()));
453            Ok((sh.resume_key, peer_last_received))
454        }
455        Some(ch) => {
456            // r[impl stable.resume-key]
457            let key = fresh_key()?;
458            let mut flags = 0u8;
459            if last_received.is_some() {
460                flags |= SH_HAS_LAST_RECEIVED;
461            }
462            let hello = ServerHello {
463                magic: LeU32::new(SERVER_HELLO_MAGIC),
464                flags,
465                resume_key: key,
466                last_received: LeU32::new(last_received.map_or(0, |s| s.0)),
467            };
468            send_handshake(tx, &hello).await?;
469
470            let peer_last_received =
471                (ch.flags & CH_HAS_LAST_RECEIVED != 0).then(|| PacketSeq(ch.last_received.get()));
472            Ok((key, peer_last_received))
473        }
474    }
475}
476
477async fn send_handshake<LTx: LinkTx, M: zerocopy::IntoBytes + zerocopy::Immutable>(
478    tx: &LTx,
479    msg: &M,
480) -> Result<(), StableConduitError> {
481    tx.send(msg.as_bytes().to_vec())
482        .await
483        .map_err(StableConduitError::Io)
484}
485
486async fn recv_handshake<
487    LRx: LinkRx,
488    M: zerocopy::FromBytes + zerocopy::KnownLayout + zerocopy::Immutable,
489>(
490    rx: &mut LRx,
491) -> Result<M, StableConduitError> {
492    let backing = rx
493        .recv()
494        .await
495        .map_err(|_| StableConduitError::LinkDead)?
496        .ok_or(StableConduitError::LinkDead)?;
497    M::read_from_bytes(backing.as_bytes())
498        .map_err(|_| StableConduitError::Setup("handshake message size mismatch".into()))
499}
500
501/// Receive a stable conduit `ClientHello` from a link.
502///
503/// Used by the acceptor when the CBOR session handshake has already been
504/// completed on the link — the next bytes are the stable conduit's
505/// binary `ClientHello`.
506pub async fn recv_client_hello<Rx: LinkRx>(rx: &mut Rx) -> Result<ClientHello, StableConduitError> {
507    recv_handshake::<_, ClientHello>(rx).await
508}
509
510fn fresh_key() -> Result<ResumeKey, StableConduitError> {
511    let mut key = ResumeKey([0u8; 16]);
512    getrandom::fill(&mut key.0)
513        .map_err(|e| StableConduitError::Setup(format!("failed to generate resume key: {e}")))?;
514    Ok(key)
515}
516
517// ---------------------------------------------------------------------------
518// Conduit impl
519// ---------------------------------------------------------------------------
520
521impl<F: MsgFamily, LS: LinkSource> Conduit for StableConduit<F, LS>
522where
523    <LS::Link as Link>::Tx: MaybeSend + 'static,
524    <LS::Link as Link>::Rx: MaybeSend + 'static,
525    LS: MaybeSend + 'static,
526{
527    type Msg = F;
528    type Tx = StableConduitTx<F, LS>;
529    type Rx = StableConduitRx<F, LS>;
530
531    fn split(self) -> (Self::Tx, Self::Rx) {
532        (
533            StableConduitTx {
534                shared: Arc::clone(&self.shared),
535                _phantom: PhantomData,
536            },
537            StableConduitRx {
538                shared: Arc::clone(&self.shared),
539                message_plan: self.message_plan,
540                _phantom: PhantomData,
541            },
542        )
543    }
544}
545
546// ---------------------------------------------------------------------------
547// Tx
548// ---------------------------------------------------------------------------
549
/// Sending half of a [`StableConduit`], produced by `Conduit::split`.
pub struct StableConduitTx<F: MsgFamily, LS: LinkSource> {
    /// State shared with the receiving half.
    shared: Arc<Shared<LS>>,
    /// Ties this half to the message family `F` without storing a value of it.
    _phantom: PhantomData<fn(F)>,
}
554
555impl<F: MsgFamily, LS: LinkSource> ConduitTx for StableConduitTx<F, LS>
556where
557    <LS::Link as Link>::Tx: MaybeSend + 'static,
558    <LS::Link as Link>::Rx: MaybeSend + 'static,
559    LS: MaybeSend + 'static,
560{
561    type Msg = F;
562    type Prepared = StablePreparedMessage;
563    type Error = StableConduitError;
564
565    fn prepare_send(&self, item: F::Msg<'_>) -> Result<Self::Prepared, Self::Error> {
566        prepare_frame::<F, LS>(&self.shared, item)
567    }
568
569    async fn send_prepared(&self, mut prepared: Self::Prepared) -> Result<(), Self::Error> {
570        enum TxReservation<Tx> {
571            CheckedOut { tx: Tx },
572            Wait,
573            Reconnect { generation: u64 },
574        }
575
576        loop {
577            let reservation = {
578                let mut inner = self.shared.lock_inner()?;
579                match inner.tx.take() {
580                    Some(tx) => {
581                        inner.tx_checked_out = true;
582                        TxReservation::CheckedOut { tx }
583                    }
584                    None if inner.tx_checked_out => TxReservation::Wait,
585                    None => TxReservation::Reconnect {
586                        generation: inner.link_generation,
587                    },
588                }
589            };
590
591            let tx = match reservation {
592                TxReservation::CheckedOut { tx } => tx,
593                TxReservation::Wait => {
594                    self.shared.tx_ready.notified().await;
595                    continue;
596                }
597                TxReservation::Reconnect { generation } => {
598                    self.shared.ensure_reconnected(generation).await?;
599                    continue;
600                }
601            };
602
603            if prepared.framed.is_none() {
604                let framed = {
605                    let mut inner = self.shared.lock_inner()?;
606                    let seq = inner.next_send_seq;
607                    inner.next_send_seq = PacketSeq(seq.0.wrapping_add(1));
608                    let ack = inner
609                        .last_received
610                        .map(|max_delivered| PacketAck { max_delivered });
611                    let frame_bytes = encode_frame_bytes(seq, ack, &prepared.item_bytes)?;
612                    (seq, frame_bytes)
613                };
614                prepared.framed = Some(framed);
615            }
616
617            let (seq, frame_bytes) = prepared
618                .framed
619                .as_ref()
620                .expect("stable prepared messages should be framed before send");
621
622            let send_result = tx.send(frame_bytes.clone()).await;
623            let generation = {
624                let mut inner = self.shared.lock_inner()?;
625                let generation = inner.link_generation;
626                if inner.tx.is_none() {
627                    inner.tx = Some(tx);
628                }
629                inner.tx_checked_out = false;
630                self.shared.tx_ready.notify_waiters();
631                generation
632            };
633
634            match send_result {
635                Ok(()) => {
636                    self.shared
637                        .lock_inner()?
638                        .replay
639                        .push(*seq, frame_bytes.clone());
640                    return Ok(());
641                }
642                Err(_) => {
643                    self.shared.ensure_reconnected(generation).await?;
644                }
645            }
646        }
647    }
648
649    async fn close(self) -> std::io::Result<()> {
650        let tx = {
651            let mut inner = self
652                .shared
653                .lock_inner()
654                .map_err(|e| std::io::Error::other(e.to_string()))?;
655            inner.tx.take()
656        };
657        if let Some(tx) = tx {
658            tx.close().await?;
659        }
660        Ok(())
661    }
662}
663
/// Encode a [`Frame`] with the given `seq`, `ack`, and already-encoded
/// `item_bytes` into a freshly allocated byte vector.
///
/// The frame is described through a scatter plan from `vox_postcard`, whose
/// `total_size()` sizes the output buffer before `write_into` fills it.
///
/// # Errors
///
/// `StableConduitError::Encode` if building the scatter plan fails.
fn encode_frame_bytes(
    seq: PacketSeq,
    ack: Option<PacketAck>,
    item_bytes: &[u8],
) -> Result<Vec<u8>, StableConduitError> {
    let frame = Frame {
        seq,
        ack,
        item: Payload::PostcardBytes(item_bytes),
    };
    // SAFETY: the pointer is derived from the local `frame`, which is declared
    // before `peek` and `plan` and therefore outlives both, and the shape
    // passed alongside it is `Frame`'s own.
    // NOTE(review): the shape is taken for `Frame<'static>` while `frame`
    // borrows `item_bytes`; presumably the shape does not depend on the
    // lifetime parameter — confirm.
    #[allow(unsafe_code)]
    let peek = unsafe {
        Peek::unchecked_new(
            PtrConst::new((&raw const frame).cast::<u8>()),
            <Frame<'static> as Facet<'static>>::SHAPE,
        )
    };
    let plan = vox_postcard::peek_to_scatter_plan(peek).map_err(StableConduitError::Encode)?;
    let mut frame_bytes = vec![0u8; plan.total_size()];
    plan.write_into(&mut frame_bytes);
    Ok(frame_bytes)
}
686
/// A message that has been encoded and is ready to be sent.
pub struct StablePreparedMessage {
    /// The message encoded with `vox_postcard::to_vec`.
    item_bytes: Vec<u8>,
    /// Sequence number and full frame bytes; `None` until the first send
    /// attempt assigns them, then reused unchanged on retries.
    framed: Option<(PacketSeq, Vec<u8>)>,
}
691
692// r[impl zerocopy.framing.single-pass]
693// r[impl zerocopy.framing.no-double-serialize]
694// r[impl zerocopy.scatter]
695// r[impl zerocopy.scatter.plan]
696// r[impl zerocopy.scatter.plan.size]
697// r[impl zerocopy.scatter.write]
698// r[impl zerocopy.scatter.lifetime]
699// r[impl zerocopy.scatter.replay]
700fn prepare_frame<F: MsgFamily, LS: LinkSource>(
701    shared: &Arc<Shared<LS>>,
702    item: F::Msg<'_>,
703) -> Result<StablePreparedMessage, StableConduitError> {
704    let _ = shared;
705    let item_bytes = vox_postcard::to_vec(&item).map_err(StableConduitError::Encode)?;
706    Ok(StablePreparedMessage {
707        item_bytes,
708        framed: None,
709    })
710}
711// ---------------------------------------------------------------------------
712// Rx
713// ---------------------------------------------------------------------------
714
/// Receiving half of a [`StableConduit`], produced by `Conduit::split`.
pub struct StableConduitRx<F: MsgFamily, LS: LinkSource> {
    /// State shared with the sending half.
    shared: Arc<Shared<LS>>,
    /// Optional plan used when decoding each received payload.
    message_plan: Option<MessagePlan>,
    /// Ties this half to the message family `F` without storing a value of it.
    _phantom: PhantomData<fn() -> F>,
}
720
impl<F: MsgFamily, LS: LinkSource> ConduitRx for StableConduitRx<F, LS>
where
    <LS::Link as Link>::Tx: MaybeSend + 'static,
    <LS::Link as Link>::Rx: MaybeSend + 'static,
    LS: MaybeSend + 'static,
{
    type Msg = F;
    type Error = StableConduitError;

    /// Receive the next new message, reconnecting as needed.
    ///
    /// Loops until a frame with a sequence number higher than any seen
    /// before arrives; duplicates are dropped silently. Link EOF or link
    /// errors trigger a reconnect rather than ending the stream, so within
    /// this function the `Ok(None)` outcome is never produced.
    ///
    /// # Errors
    ///
    /// Returns an error when the mutex is poisoned, a reconnect attempt
    /// fails, or a frame or payload cannot be decoded.
    #[moire::instrument]
    async fn recv(&mut self) -> Result<Option<SelfRef<F::Msg<'static>>>, Self::Error> {
        loop {
            // Phase 1: take current Rx out of shared state, then await without locks held.
            let (rx_opt, generation) = {
                let mut inner = self.shared.lock_inner()?;
                (inner.rx.take(), inner.link_generation)
            }; // lock released here — no guard held across any await below
            let mut rx = match rx_opt {
                Some(rx) => rx,
                None => {
                    // No Rx installed for this generation: replace the link
                    // (or pick up a replacement made by another task).
                    self.shared.ensure_reconnected(generation).await?;
                    continue;
                }
            };

            // Any link termination — graceful EOF or error — triggers reconnect.
            // The session ends only when the LinkSource itself fails (no more
            // links available), which surfaces as Err.
            let recv_result = rx.recv().await;

            // Put Rx back only if we're still on the same generation and no newer
            // Rx has been installed by reconnect.
            {
                let mut inner = self.shared.lock_inner()?;
                if inner.link_generation == generation && inner.rx.is_none() {
                    inner.rx = Some(rx);
                }
            }

            let backing = match recv_result {
                Ok(Some(b)) => b,
                Ok(None) | Err(_) => {
                    // r[impl stable.reconnect]
                    self.shared.ensure_reconnected(generation).await?;
                    continue;
                }
            };

            // Phase 2: deserialize the frame envelope (seq/ack + opaque payload bytes).
            let frame: SelfRef<Frame<'static>> =
                crate::deserialize_postcard(backing).map_err(StableConduitError::Decode)?;

            // Phase 3: update shared state; skip duplicates.
            // r[impl stable.seq.monotonic]
            // r[impl stable.ack.trim]
            let is_dup = {
                let frame = frame.get();
                let mut inner = self.shared.lock_inner()?;

                // The ack is applied even when the frame itself turns out to
                // be a duplicate.
                if let Some(ack) = frame.ack {
                    inner.replay.trim(ack);
                }

                // A frame is new only if its seq is strictly greater than the
                // highest one accepted so far.
                let dup = inner.last_received.is_some_and(|prev| frame.seq <= prev);
                if !dup {
                    inner.last_received = Some(frame.seq);
                }
                dup
            };

            if is_dup {
                continue;
            }

            // Phase 4: deserialize the message from the payload bytes
            // using the message plan for schema-aware translation.
            let frame = frame.get();
            let item_bytes = match &frame.item {
                Payload::PostcardBytes(bytes) => bytes,
                // NOTE(review): the panic text says "Incoming" while the arm
                // above matches `PostcardBytes` — presumably a leftover name;
                // confirm.
                _ => unreachable!("deserialized Payload should always be Incoming"),
            };
            // The payload is copied into its own backing so the decoded
            // message does not borrow from the frame.
            let item_backing = vox_types::Backing::Boxed(item_bytes.to_vec().into());
            let msg = match &self.message_plan {
                Some(plan) => crate::deserialize_postcard_with_plan::<F::Msg<'static>>(
                    item_backing,
                    &plan.plan,
                    &plan.registry,
                ),
                None => crate::deserialize_postcard::<F::Msg<'static>>(item_backing),
            }
            .map_err(StableConduitError::Decode)?;

            return Ok(Some(msg));
        }
    }
}
817
818// ---------------------------------------------------------------------------
819// Error
820// ---------------------------------------------------------------------------
821
/// Errors produced by the stable conduit.
#[derive(Debug)]
pub enum StableConduitError {
    /// Encoding a message or frame failed.
    Encode(vox_postcard::SerializeError),
    /// Decoding a received frame or its payload failed.
    Decode(vox_postcard::DeserializeError),
    /// The link source or a link send reported an I/O error.
    Io(std::io::Error),
    /// The link errored or ended while a handshake message was expected.
    LinkDead,
    /// Handshake or internal-state problem, described by the message.
    Setup(String),
    /// The server rejected our resume_key; the session is permanently lost.
    // r[impl stable.reconnect.failure]
    SessionLost,
}
833
834impl std::fmt::Display for StableConduitError {
835    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
836        match self {
837            Self::Encode(e) => write!(f, "encode error: {e}"),
838            Self::Decode(e) => write!(f, "decode error: {e}"),
839            Self::Io(e) => write!(f, "io error: {e}"),
840            Self::LinkDead => write!(f, "link dead"),
841            Self::Setup(s) => write!(f, "setup error: {s}"),
842            Self::SessionLost => write!(f, "session lost: server rejected resume key"),
843        }
844    }
845}
846
// Marker impl with default methods only, so wrapped errors are not exposed via `source()`.
impl std::error::Error for StableConduitError {}
848
849// ---------------------------------------------------------------------------
850// Tests
851// ---------------------------------------------------------------------------
852
853#[cfg(test)]
854mod tests;