// vox_core/stable_conduit/mod.rs

1use std::marker::PhantomData;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::sync::{Arc, Mutex, MutexGuard};
4
5use facet::Facet;
6use facet_core::PtrConst;
7use facet_reflect::Peek;
8use vox_types::{
9    Conduit, ConduitRx, ConduitTx, ConduitTxPermit, Link, LinkRx, LinkTx, LinkTxPermit, MsgFamily,
10    Payload, SelfRef, WriteSlot,
11};
12
13use crate::MessagePlan;
14use zerocopy::little_endian::U32 as LeU32;
15
16mod replay_buffer;
17use replay_buffer::{PacketAck, PacketSeq, ReplayBuffer};
18
19// ---------------------------------------------------------------------------
20// Handshake wire types
21// ---------------------------------------------------------------------------
22
23/// 16-byte CSPRNG-generated session resumption key.
24#[derive(
25    Clone,
26    Copy,
27    zerocopy::FromBytes,
28    zerocopy::IntoBytes,
29    zerocopy::KnownLayout,
30    zerocopy::Immutable,
31)]
32#[repr(C)]
33struct ResumeKey([u8; 16]);
34
/// Magic tag of a `ClientHello`: the ASCII bytes `VOCH` read as a
/// little-endian `u32`.
const CLIENT_HELLO_MAGIC: u32 = u32::from_le_bytes([b'V', b'O', b'C', b'H']);
/// Magic tag of a `ServerHello`: the ASCII bytes `VOSH` read as a
/// little-endian `u32`.
const SERVER_HELLO_MAGIC: u32 = u32::from_le_bytes([b'V', b'O', b'S', b'H']);

// ClientHello flags
/// The hello's `resume_key` field carries a real key.
const CH_HAS_RESUME_KEY: u8 = 1 << 0;
/// The hello's `last_received` field carries a real sequence number.
const CH_HAS_LAST_RECEIVED: u8 = 1 << 1;

// ServerHello flags
/// The server refused the session; the initiator reports `SessionLost`.
const SH_REJECTED: u8 = 1 << 0;
/// The hello's `last_received` field carries a real sequence number.
const SH_HAS_LAST_RECEIVED: u8 = 1 << 1;
45
46/// Client's opening handshake — fixed-size, cast directly from wire bytes.
47// r[impl stable.handshake.client-hello]
48#[derive(
49    Clone,
50    Copy,
51    zerocopy::FromBytes,
52    zerocopy::IntoBytes,
53    zerocopy::KnownLayout,
54    zerocopy::Immutable,
55)]
56#[repr(C)]
57pub struct ClientHello {
58    magic: LeU32,
59    flags: u8,
60    resume_key: ResumeKey,
61    last_received: LeU32,
62}
63
64/// Server's handshake response — fixed-size, cast directly from wire bytes.
65// r[impl stable.handshake.server-hello]
66// r[impl stable.reconnect.failure]
67#[derive(
68    Clone,
69    Copy,
70    zerocopy::FromBytes,
71    zerocopy::IntoBytes,
72    zerocopy::KnownLayout,
73    zerocopy::Immutable,
74)]
75#[repr(C)]
76struct ServerHello {
77    magic: LeU32,
78    flags: u8,
79    resume_key: ResumeKey,
80    last_received: LeU32,
81}
82
83/// Sequenced data frame. All post-handshake traffic is a `Frame`.
84///
85/// The `item` field is an opaque [`Payload`] — the message bytes are
86/// serialized/deserialized independently from the frame envelope.
87/// This decouples the frame format from the message schema.
88// r[impl stable.framing]
89// r[impl stable.framing.encoding]
90#[derive(Facet, Debug)]
91struct Frame<'a> {
92    seq: PacketSeq,
93    // r[impl stable.ack]
94    ack: Option<PacketAck>,
95    item: Payload<'a>,
96}
97
98// ---------------------------------------------------------------------------
99// Attachment / LinkSource
100// ---------------------------------------------------------------------------
101
102/// One transport attachment consumed by [`LinkSource::next_link`].
103///
104/// Use [`Attachment::initiator`] for the initiator side, or
105/// [`prepare_acceptor_attachment`] on inbound links for the acceptor side.
106pub struct Attachment<L> {
107    link: L,
108    client_hello: Option<ClientHello>,
109}
110
111impl<L> Attachment<L> {
112    /// Build an initiator-side attachment.
113    pub fn initiator(link: L) -> Self {
114        Self {
115            link,
116            client_hello: None,
117        }
118    }
119
120    pub(crate) fn into_link(self) -> L {
121        self.link
122    }
123}
124
/// A [`Link`] assembled from Tx/Rx halves that were already split apart.
///
/// [`prepare_acceptor_attachment`] returns one of these: it has to split the
/// inbound link to read the `ClientHello`, then hands the halves back as a
/// link again.
pub struct SplitLink<Tx, Rx> {
    /// Sending half.
    tx: Tx,
    /// Receiving half.
    rx: Rx,
}
133
134impl<Tx, Rx> Link for SplitLink<Tx, Rx>
135where
136    Tx: LinkTx,
137    Rx: LinkRx,
138{
139    type Tx = Tx;
140    type Rx = Rx;
141
142    fn split(self) -> (Self::Tx, Self::Rx) {
143        (self.tx, self.rx)
144    }
145}
146
147/// Prepare an acceptor-side attachment from an inbound link.
148///
149/// This consumes the leading stable `ClientHello` from the inbound link and
150/// returns an attachment suitable for acceptor-side [`StableConduit::new`].
151pub async fn prepare_acceptor_attachment<L: Link>(
152    link: L,
153) -> Result<Attachment<SplitLink<L::Tx, L::Rx>>, StableConduitError> {
154    let (tx, mut rx) = link.split();
155    let client_hello = recv_handshake::<_, ClientHello>(&mut rx).await?;
156    Ok(Attachment {
157        link: SplitLink { tx, rx },
158        client_hello: Some(client_hello),
159    })
160}
161
162// r[impl stable.link-source]
163pub trait LinkSource: Send + 'static {
164    type Link: Link + Send;
165
166    fn next_link(
167        &mut self,
168    ) -> impl Future<Output = std::io::Result<Attachment<Self::Link>>> + Send + '_;
169}
170
171/// A one-shot [`LinkSource`] backed by a single attachment.
172pub struct SingleAttachmentSource<L> {
173    attachment: Option<Attachment<L>>,
174}
175
176/// Build a one-shot [`LinkSource`] from a prepared attachment.
177pub fn single_attachment_source<L: Link + Send + 'static>(
178    attachment: Attachment<L>,
179) -> SingleAttachmentSource<L> {
180    SingleAttachmentSource {
181        attachment: Some(attachment),
182    }
183}
184
185/// Build a one-shot initiator-side [`LinkSource`] from a raw link.
186pub fn single_link_source<L: Link + Send + 'static>(link: L) -> SingleAttachmentSource<L> {
187    single_attachment_source(Attachment::initiator(link))
188}
189
190/// Build an already-exhausted [`LinkSource`]. Any call to `next_link` will
191/// fail immediately. Used when the first link is passed directly to
192/// [`StableConduit::with_first_link`] and no reconnection source is available.
193pub fn exhausted_source<L: Link + Send + 'static>() -> SingleAttachmentSource<L> {
194    SingleAttachmentSource { attachment: None }
195}
196
197impl<L: Link + Send + 'static> LinkSource for SingleAttachmentSource<L> {
198    type Link = L;
199
200    async fn next_link(&mut self) -> std::io::Result<Attachment<Self::Link>> {
201        self.attachment.take().ok_or_else(|| {
202            std::io::Error::new(
203                std::io::ErrorKind::ConnectionRefused,
204                "single-use LinkSource exhausted",
205            )
206        })
207    }
208}
209
210// ---------------------------------------------------------------------------
211// StableConduit
212// ---------------------------------------------------------------------------
213
214// r[impl stable]
215// r[impl zerocopy.framing.conduit.stable]
216pub struct StableConduit<F: MsgFamily, LS: LinkSource> {
217    shared: Arc<Shared<LS>>,
218    message_plan: Option<MessagePlan>,
219    _phantom: PhantomData<fn(F) -> F>,
220}
221
222struct Shared<LS: LinkSource> {
223    inner: Mutex<Inner<LS>>,
224    reconnecting: AtomicBool,
225    reconnected: moire::sync::Notify,
226    tx_ready: moire::sync::Notify,
227}
228
229struct Inner<LS: LinkSource> {
230    source: Option<LS>,
231    /// Incremented every time the link is replaced. Used to detect whether
232    /// another task has already reconnected while we were waiting.
233    link_generation: u64,
234    tx: Option<<LS::Link as Link>::Tx>,
235    rx: Option<<LS::Link as Link>::Rx>,
236    tx_checked_out: bool,
237    resume_key: Option<ResumeKey>,
238    // r[impl stable.seq]
239    next_send_seq: PacketSeq,
240    last_received: Option<PacketSeq>,
241    // r[impl stable.replay-buffer]
242    /// Encoded item bytes buffered for replay on reconnect.
243    replay: ReplayBuffer,
244}
245
246impl<F: MsgFamily, LS: LinkSource> StableConduit<F, LS> {
247    pub async fn new(mut source: LS) -> Result<Self, StableConduitError> {
248        let attachment = source.next_link().await.map_err(StableConduitError::Io)?;
249        let (link_tx, link_rx) = attachment.link.split();
250        Self::with_first_link(link_tx, link_rx, attachment.client_hello, source).await
251    }
252
253    /// Create a stable conduit with a pre-split first link.
254    ///
255    /// Use this when the first link has already been obtained and processed
256    /// (e.g. after a CBOR session handshake) before the stable conduit's own
257    /// resume handshake runs.
258    pub async fn with_first_link(
259        link_tx: <LS::Link as Link>::Tx,
260        mut link_rx: <LS::Link as Link>::Rx,
261        client_hello: Option<ClientHello>,
262        source: LS,
263    ) -> Result<Self, StableConduitError> {
264        let (resume_key, _peer_last_received) =
265            handshake::<LS::Link>(&link_tx, &mut link_rx, client_hello, None, None).await?;
266
267        let inner = Inner {
268            source: Some(source),
269            link_generation: 0,
270            tx: Some(link_tx),
271            rx: Some(link_rx),
272            tx_checked_out: false,
273            resume_key: Some(resume_key),
274            next_send_seq: PacketSeq(0),
275            last_received: None,
276            replay: ReplayBuffer::new(),
277        };
278
279        Ok(Self {
280            shared: Arc::new(Shared {
281                inner: Mutex::new(inner),
282                reconnecting: AtomicBool::new(false),
283                reconnected: moire::sync::Notify::new("stable_conduit.reconnected"),
284                tx_ready: moire::sync::Notify::new("stable_conduit.tx_ready"),
285            }),
286            message_plan: None,
287            _phantom: PhantomData,
288        })
289    }
290
291    /// Set the message plan for schema-aware deserialization of the payload
292    /// inside each frame.
293    pub fn with_message_plan(mut self, plan: MessagePlan) -> Self {
294        self.message_plan = Some(plan);
295        self
296    }
297}
298
299// ---------------------------------------------------------------------------
300// Reconnect
301// ---------------------------------------------------------------------------
302
303impl<LS: LinkSource> Shared<LS> {
304    fn lock_inner(&self) -> Result<MutexGuard<'_, Inner<LS>>, StableConduitError> {
305        self.inner
306            .lock()
307            .map_err(|_| StableConduitError::Setup("stable conduit mutex poisoned".into()))
308    }
309
310    async fn ensure_reconnected(&self, generation: u64) -> Result<(), StableConduitError> {
311        loop {
312            {
313                let inner = self.lock_inner()?;
314                if inner.link_generation != generation {
315                    return Ok(());
316                }
317            }
318
319            if self
320                .reconnecting
321                .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
322                .is_ok()
323            {
324                let result = self.reconnect_once(generation).await;
325                self.reconnecting.store(false, Ordering::Release);
326                self.reconnected.notify_waiters();
327                return result;
328            }
329
330            self.reconnected.notified().await;
331        }
332    }
333
334    /// Obtain a new link from the source, re-handshake, and replay any
335    /// buffered items the peer missed.
336    // r[impl stable.reconnect]
337    // r[impl stable.reconnect.client-replay]
338    // r[impl stable.reconnect.server-replay]
339    // r[impl stable.replay-buffer.order]
340    async fn reconnect_once(&self, generation: u64) -> Result<(), StableConduitError> {
341        let (mut source, resume_key, last_received, replay_frames) = {
342            let mut inner = self.lock_inner()?;
343            if inner.link_generation != generation {
344                return Ok(());
345            }
346            let source = inner
347                .source
348                .take()
349                .ok_or_else(|| StableConduitError::Setup("link source unavailable".into()))?;
350            let replay_frames = inner
351                .replay
352                .iter()
353                .map(|(seq, bytes)| (*seq, bytes.clone()))
354                .collect::<Vec<_>>();
355            (source, inner.resume_key, inner.last_received, replay_frames)
356        };
357
358        let reconnect_result = async {
359            let attachment = source.next_link().await.map_err(StableConduitError::Io)?;
360            let (new_tx, mut new_rx) = attachment.link.split();
361
362            let (new_resume_key, peer_last_received) = handshake::<LS::Link>(
363                &new_tx,
364                &mut new_rx,
365                attachment.client_hello,
366                resume_key,
367                last_received,
368            )
369            .await?;
370
371            // Replay frames the peer hasn't received yet, in original order.
372            // Frame bytes include the original seq/ack — stale acks are
373            // harmless since the peer ignores acks older than what it has seen.
374            for (seq, frame_bytes) in replay_frames {
375                if peer_last_received.is_some_and(|last| seq <= last) {
376                    continue;
377                }
378                let permit = new_tx.reserve().await.map_err(StableConduitError::Io)?;
379                let mut slot = permit
380                    .alloc(frame_bytes.len())
381                    .map_err(StableConduitError::Io)?;
382                slot.as_mut_slice().copy_from_slice(&frame_bytes);
383                slot.commit();
384            }
385
386            Ok::<_, StableConduitError>((new_tx, new_rx, new_resume_key))
387        }
388        .await;
389
390        let mut inner = self.lock_inner()?;
391        inner.source = Some(source);
392
393        if inner.link_generation != generation {
394            return Ok(());
395        }
396
397        let (new_tx, new_rx, new_resume_key) = reconnect_result?;
398
399        inner.link_generation = inner.link_generation.wrapping_add(1);
400        inner.tx = Some(new_tx);
401        inner.rx = Some(new_rx);
402        inner.tx_checked_out = false;
403        inner.resume_key = Some(new_resume_key);
404        self.tx_ready.notify_waiters();
405
406        Ok(())
407    }
408}
409
410/// Perform the handshake on a fresh link.
411///
412/// Returns `(our_resume_key, peer_last_received)`:
413///   - `our_resume_key`: the key to use for the next reconnect attempt
414///   - `peer_last_received`: the highest seq the peer has already seen,
415///     used to decide which replay-buffer entries to re-send
416// r[impl stable.handshake]
417async fn handshake<L: Link>(
418    tx: &L::Tx,
419    rx: &mut L::Rx,
420    client_hello: Option<ClientHello>,
421    resume_key: Option<ResumeKey>,
422    last_received: Option<PacketSeq>,
423) -> Result<(ResumeKey, Option<PacketSeq>), StableConduitError> {
424    match client_hello {
425        None => {
426            // r[impl stable.reconnect]
427            let mut flags = 0u8;
428            if resume_key.is_some() {
429                flags |= CH_HAS_RESUME_KEY;
430            }
431            if last_received.is_some() {
432                flags |= CH_HAS_LAST_RECEIVED;
433            }
434            let hello = ClientHello {
435                magic: LeU32::new(CLIENT_HELLO_MAGIC),
436                flags,
437                resume_key: resume_key.unwrap_or(ResumeKey([0u8; 16])),
438                last_received: LeU32::new(last_received.map_or(0, |s| s.0)),
439            };
440            send_handshake(tx, &hello).await?;
441
442            let sh = recv_handshake::<_, ServerHello>(rx).await?;
443            if sh.magic.get() != SERVER_HELLO_MAGIC {
444                return Err(StableConduitError::Setup(
445                    "ServerHello magic mismatch".into(),
446                ));
447            }
448            // r[impl stable.reconnect.failure]
449            if sh.flags & SH_REJECTED != 0 {
450                return Err(StableConduitError::SessionLost);
451            }
452            let peer_last_received =
453                (sh.flags & SH_HAS_LAST_RECEIVED != 0).then(|| PacketSeq(sh.last_received.get()));
454            Ok((sh.resume_key, peer_last_received))
455        }
456        Some(ch) => {
457            // r[impl stable.resume-key]
458            let key = fresh_key()?;
459            let mut flags = 0u8;
460            if last_received.is_some() {
461                flags |= SH_HAS_LAST_RECEIVED;
462            }
463            let hello = ServerHello {
464                magic: LeU32::new(SERVER_HELLO_MAGIC),
465                flags,
466                resume_key: key,
467                last_received: LeU32::new(last_received.map_or(0, |s| s.0)),
468            };
469            send_handshake(tx, &hello).await?;
470
471            let peer_last_received =
472                (ch.flags & CH_HAS_LAST_RECEIVED != 0).then(|| PacketSeq(ch.last_received.get()));
473            Ok((key, peer_last_received))
474        }
475    }
476}
477
478async fn send_handshake<LTx: LinkTx, M: zerocopy::IntoBytes + zerocopy::Immutable>(
479    tx: &LTx,
480    msg: &M,
481) -> Result<(), StableConduitError> {
482    let bytes = msg.as_bytes();
483    let permit = tx.reserve().await.map_err(StableConduitError::Io)?;
484    let mut slot = permit.alloc(bytes.len()).map_err(StableConduitError::Io)?;
485    slot.as_mut_slice().copy_from_slice(bytes);
486    slot.commit();
487    Ok(())
488}
489
490async fn recv_handshake<
491    LRx: LinkRx,
492    M: zerocopy::FromBytes + zerocopy::KnownLayout + zerocopy::Immutable,
493>(
494    rx: &mut LRx,
495) -> Result<M, StableConduitError> {
496    let backing = rx
497        .recv()
498        .await
499        .map_err(|_| StableConduitError::LinkDead)?
500        .ok_or(StableConduitError::LinkDead)?;
501    M::read_from_bytes(backing.as_bytes())
502        .map_err(|_| StableConduitError::Setup("handshake message size mismatch".into()))
503}
504
505/// Receive a stable conduit `ClientHello` from a link.
506///
507/// Used by the acceptor when the CBOR session handshake has already been
508/// completed on the link — the next bytes are the stable conduit's
509/// binary `ClientHello`.
510pub async fn recv_client_hello<Rx: LinkRx>(rx: &mut Rx) -> Result<ClientHello, StableConduitError> {
511    recv_handshake::<_, ClientHello>(rx).await
512}
513
514fn fresh_key() -> Result<ResumeKey, StableConduitError> {
515    let mut key = ResumeKey([0u8; 16]);
516    getrandom::fill(&mut key.0)
517        .map_err(|e| StableConduitError::Setup(format!("failed to generate resume key: {e}")))?;
518    Ok(key)
519}
520
521// ---------------------------------------------------------------------------
522// Conduit impl
523// ---------------------------------------------------------------------------
524
525impl<F: MsgFamily, LS: LinkSource> Conduit for StableConduit<F, LS>
526where
527    <LS::Link as Link>::Tx: Send + 'static,
528    <LS::Link as Link>::Rx: Send + 'static,
529    LS: Send + 'static,
530{
531    type Msg = F;
532    type Tx = StableConduitTx<F, LS>;
533    type Rx = StableConduitRx<F, LS>;
534
535    fn split(self) -> (Self::Tx, Self::Rx) {
536        (
537            StableConduitTx {
538                shared: Arc::clone(&self.shared),
539                _phantom: PhantomData,
540            },
541            StableConduitRx {
542                shared: Arc::clone(&self.shared),
543                message_plan: self.message_plan,
544                _phantom: PhantomData,
545            },
546        )
547    }
548}
549
550// ---------------------------------------------------------------------------
551// Tx
552// ---------------------------------------------------------------------------
553
554pub struct StableConduitTx<F: MsgFamily, LS: LinkSource> {
555    shared: Arc<Shared<LS>>,
556    _phantom: PhantomData<fn(F)>,
557}
558
559impl<F: MsgFamily, LS: LinkSource> ConduitTx for StableConduitTx<F, LS>
560where
561    <LS::Link as Link>::Tx: Send + 'static,
562    <LS::Link as Link>::Rx: Send + 'static,
563    LS: Send + 'static,
564{
565    type Msg = F;
566    type Permit<'a>
567        = StableConduitPermit<F, LS>
568    where
569        Self: 'a;
570
571    async fn reserve(&self) -> std::io::Result<Self::Permit<'_>> {
572        enum TxReservation<Tx> {
573            CheckedOut { tx: Tx, generation: u64 },
574            Wait,
575            Reconnect { generation: u64 },
576        }
577
578        loop {
579            let reservation = {
580                let mut inner = self
581                    .shared
582                    .lock_inner()
583                    .map_err(|e| std::io::Error::other(e.to_string()))?;
584                match inner.tx.take() {
585                    Some(tx) => {
586                        inner.tx_checked_out = true;
587                        TxReservation::CheckedOut {
588                            tx,
589                            generation: inner.link_generation,
590                        }
591                    }
592                    None if inner.tx_checked_out => TxReservation::Wait,
593                    None => TxReservation::Reconnect {
594                        generation: inner.link_generation,
595                    },
596                }
597            };
598
599            let (tx, generation) = match reservation {
600                TxReservation::CheckedOut { tx, generation } => (tx, generation),
601                TxReservation::Wait => {
602                    self.shared.tx_ready.notified().await;
603                    continue;
604                }
605                TxReservation::Reconnect { generation } => {
606                    self.shared
607                        .ensure_reconnected(generation)
608                        .await
609                        .map_err(|e| std::io::Error::other(e.to_string()))?;
610                    continue;
611                }
612            };
613
614            match tx.reserve().await {
615                Ok(link_permit) => {
616                    let restore_ok = {
617                        let mut inner = self
618                            .shared
619                            .lock_inner()
620                            .map_err(|e| std::io::Error::other(e.to_string()))?;
621                        let restore_ok = inner.link_generation == generation && inner.tx.is_none();
622                        if restore_ok {
623                            inner.tx = Some(tx);
624                        }
625                        inner.tx_checked_out = false;
626                        self.shared.tx_ready.notify_waiters();
627                        restore_ok
628                    };
629
630                    if !restore_ok {
631                        drop(link_permit);
632                        continue;
633                    }
634
635                    return Ok(StableConduitPermit {
636                        shared: Arc::clone(&self.shared),
637                        link_permit,
638                        generation,
639                        _phantom: PhantomData,
640                    });
641                }
642                Err(_) => {
643                    {
644                        let mut inner = self
645                            .shared
646                            .lock_inner()
647                            .map_err(|e| std::io::Error::other(e.to_string()))?;
648                        if inner.link_generation == generation {
649                            inner.tx_checked_out = false;
650                        }
651                        self.shared.tx_ready.notify_waiters();
652                    }
653                    self.shared
654                        .ensure_reconnected(generation)
655                        .await
656                        .map_err(|e| std::io::Error::other(e.to_string()))?;
657                }
658            }
659        }
660    }
661
662    async fn close(self) -> std::io::Result<()> {
663        let tx = {
664            let mut inner = self
665                .shared
666                .lock_inner()
667                .map_err(|e| std::io::Error::other(e.to_string()))?;
668            inner.tx.take()
669        };
670        if let Some(tx) = tx {
671            tx.close().await?;
672        }
673        Ok(())
674    }
675}
676
677// ---------------------------------------------------------------------------
678// Permit
679// ---------------------------------------------------------------------------
680
681pub struct StableConduitPermit<F: MsgFamily, LS: LinkSource> {
682    shared: Arc<Shared<LS>>,
683    link_permit: <<LS::Link as Link>::Tx as LinkTx>::Permit,
684    generation: u64,
685    _phantom: PhantomData<fn(F)>,
686}
687
688impl<F: MsgFamily, LS: LinkSource> ConduitTxPermit for StableConduitPermit<F, LS> {
689    type Msg = F;
690    type Error = StableConduitError;
691
692    // r[impl zerocopy.framing.single-pass]
693    // r[impl zerocopy.framing.no-double-serialize]
694    // r[impl zerocopy.scatter]
695    // r[impl zerocopy.scatter.plan]
696    // r[impl zerocopy.scatter.plan.size]
697    // r[impl zerocopy.scatter.write]
698    // r[impl zerocopy.scatter.lifetime]
699    // r[impl zerocopy.scatter.replay]
700    fn send(self, item: F::Msg<'_>) -> Result<(), StableConduitError> {
701        let StableConduitPermit {
702            shared,
703            link_permit,
704            generation,
705            _phantom: _,
706        } = self;
707
708        let (seq, ack) = {
709            let mut inner = shared.lock_inner()?;
710            if inner.link_generation != generation {
711                return Err(StableConduitError::LinkDead);
712            }
713            let seq = inner.next_send_seq;
714            inner.next_send_seq = PacketSeq(seq.0.wrapping_add(1));
715            let ack = inner
716                .last_received
717                .map(|max_delivered| PacketAck { max_delivered });
718            (seq, ack)
719        };
720
721        // Wrap the message as an outgoing Payload — the opaque adapter
722        // serializes its bytes inline, giving us one scatter pass for the
723        // whole frame (header + message bytes).
724        let msg_shape = F::shape();
725        // SAFETY: item is a valid F::Msg<'_> and msg_shape matches it.
726        #[allow(unsafe_code)]
727        let payload = unsafe {
728            Payload::outgoing_unchecked(PtrConst::new((&raw const item).cast::<u8>()), msg_shape)
729        };
730
731        let frame = Frame {
732            seq,
733            ack,
734            item: payload,
735        };
736
737        // SAFETY: Frame<'_> shape is lifetime-independent.
738        #[allow(unsafe_code)]
739        let peek = unsafe {
740            Peek::unchecked_new(
741                PtrConst::new((&raw const frame).cast::<u8>()),
742                <Frame<'static> as Facet<'static>>::SHAPE,
743            )
744        };
745        let plan = vox_postcard::peek_to_scatter_plan(peek).map_err(StableConduitError::Encode)?;
746
747        let mut slot = link_permit
748            .alloc(plan.total_size())
749            .map_err(StableConduitError::Io)?;
750        let slot_bytes = slot.as_mut_slice();
751        plan.write_into(slot_bytes);
752
753        // Keep an owned copy for replay after reconnect.
754        shared.lock_inner()?.replay.push(seq, slot_bytes.to_vec());
755        slot.commit();
756
757        Ok(())
758    }
759}
760
761// ---------------------------------------------------------------------------
762// Rx
763// ---------------------------------------------------------------------------
764
765pub struct StableConduitRx<F: MsgFamily, LS: LinkSource> {
766    shared: Arc<Shared<LS>>,
767    message_plan: Option<MessagePlan>,
768    _phantom: PhantomData<fn() -> F>,
769}
770
771impl<F: MsgFamily, LS: LinkSource> ConduitRx for StableConduitRx<F, LS>
772where
773    <LS::Link as Link>::Tx: Send + 'static,
774    <LS::Link as Link>::Rx: Send + 'static,
775    LS: Send + 'static,
776{
777    type Msg = F;
778    type Error = StableConduitError;
779
780    #[moire::instrument]
781    async fn recv(&mut self) -> Result<Option<SelfRef<F::Msg<'static>>>, Self::Error> {
782        loop {
783            // Phase 1: take current Rx out of shared state, then await without locks held.
784            let (rx_opt, generation) = {
785                let mut inner = self.shared.lock_inner()?;
786                (inner.rx.take(), inner.link_generation)
787            }; // lock released here — no guard held across any await below
788            let mut rx = match rx_opt {
789                Some(rx) => rx,
790                None => {
791                    self.shared.ensure_reconnected(generation).await?;
792                    continue;
793                }
794            };
795
796            // Any link termination — graceful EOF or error — triggers reconnect.
797            // The session ends only when the LinkSource itself fails (no more
798            // links available), which surfaces as Err.
799            let recv_result = rx.recv().await;
800
801            // Put Rx back only if we're still on the same generation and no newer
802            // Rx has been installed by reconnect.
803            {
804                let mut inner = self.shared.lock_inner()?;
805                if inner.link_generation == generation && inner.rx.is_none() {
806                    inner.rx = Some(rx);
807                }
808            }
809
810            let backing = match recv_result {
811                Ok(Some(b)) => b,
812                Ok(None) | Err(_) => {
813                    // r[impl stable.reconnect]
814                    self.shared.ensure_reconnected(generation).await?;
815                    continue;
816                }
817            };
818
819            // Phase 2: deserialize the frame envelope (seq/ack + opaque payload bytes).
820            let frame: SelfRef<Frame<'static>> =
821                crate::deserialize_postcard(backing).map_err(StableConduitError::Decode)?;
822
823            // Phase 3: update shared state; skip duplicates.
824            // r[impl stable.seq.monotonic]
825            // r[impl stable.ack.trim]
826            let is_dup = {
827                let mut inner = self.shared.lock_inner()?;
828
829                if let Some(ack) = frame.ack {
830                    inner.replay.trim(ack);
831                }
832
833                let dup = inner.last_received.is_some_and(|prev| frame.seq <= prev);
834                if !dup {
835                    inner.last_received = Some(frame.seq);
836                }
837                dup
838            };
839
840            if is_dup {
841                continue;
842            }
843
844            // Phase 4: deserialize the message from the payload bytes
845            // using the message plan for schema-aware translation.
846            let item_bytes = match &frame.item {
847                Payload::PostcardBytes(bytes) => bytes,
848                _ => unreachable!("deserialized Payload should always be Incoming"),
849            };
850            let item_backing = vox_types::Backing::Boxed(item_bytes.to_vec().into());
851            let msg = match &self.message_plan {
852                Some(plan) => crate::deserialize_postcard_with_plan::<F::Msg<'static>>(
853                    item_backing,
854                    &plan.plan,
855                    &plan.registry,
856                ),
857                None => crate::deserialize_postcard::<F::Msg<'static>>(item_backing),
858            }
859            .map_err(StableConduitError::Decode)?;
860
861            return Ok(Some(msg));
862        }
863    }
864}
865
866// ---------------------------------------------------------------------------
867// Error
868// ---------------------------------------------------------------------------
869
870#[derive(Debug)]
871pub enum StableConduitError {
872    Encode(vox_postcard::SerializeError),
873    Decode(vox_postcard::DeserializeError),
874    Io(std::io::Error),
875    LinkDead,
876    Setup(String),
877    /// The server rejected our resume_key; the session is permanently lost.
878    // r[impl stable.reconnect.failure]
879    SessionLost,
880}
881
882impl std::fmt::Display for StableConduitError {
883    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
884        match self {
885            Self::Encode(e) => write!(f, "encode error: {e}"),
886            Self::Decode(e) => write!(f, "decode error: {e}"),
887            Self::Io(e) => write!(f, "io error: {e}"),
888            Self::LinkDead => write!(f, "link dead"),
889            Self::Setup(s) => write!(f, "setup error: {s}"),
890            Self::SessionLost => write!(f, "session lost: server rejected resume key"),
891        }
892    }
893}
894
895impl std::error::Error for StableConduitError {}
896
897// ---------------------------------------------------------------------------
898// Tests
899// ---------------------------------------------------------------------------
900
901#[cfg(test)]
902mod tests;