Skip to main content

vox_core/
transport_prologue.rs

1use vox_types::{Link, LinkRx, LinkTx, LinkTxPermit, SplitLink, TransportMode, WriteSlot};
2use zerocopy::FromBytes;
3use zerocopy::little_endian::U32 as LeU32;
4
5const TRANSPORT_HELLO_MAGIC: u32 = u32::from_le_bytes(*b"VOTH");
6const TRANSPORT_ACCEPT_MAGIC: u32 = u32::from_le_bytes(*b"VOTA");
7const TRANSPORT_REJECT_MAGIC: u32 = u32::from_le_bytes(*b"VOTR");
8const TRANSPORT_VERSION: u8 = 9;
9const REJECT_UNSUPPORTED_MODE: u8 = 1;
10
11fn transport_mode_as_u8(mode: TransportMode) -> u8 {
12    match mode {
13        TransportMode::Bare => 0,
14        TransportMode::Stable => 1,
15    }
16}
17
18fn transport_mode_from_u8(value: u8) -> Result<TransportMode, TransportPrologueError> {
19    match value {
20        0 => Ok(TransportMode::Bare),
21        1 => Ok(TransportMode::Stable),
22        _ => Err(TransportPrologueError::Protocol(format!(
23            "unknown conduit mode {value}"
24        ))),
25    }
26}
27
28#[derive(Debug)]
29pub enum TransportPrologueError {
30    Io(std::io::Error),
31    LinkDead,
32    Protocol(String),
33    Rejected(TransportRejectReason),
34}
35
36impl std::fmt::Display for TransportPrologueError {
37    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38        match self {
39            Self::Io(error) => write!(f, "io error: {error}"),
40            Self::LinkDead => write!(f, "link closed during transport prologue"),
41            Self::Protocol(message) => write!(f, "protocol error: {message}"),
42            Self::Rejected(reason) => write!(f, "transport rejected: {reason}"),
43        }
44    }
45}
46
47impl std::error::Error for TransportPrologueError {}
48
49#[derive(Debug, Clone, Copy, PartialEq, Eq)]
50pub enum TransportRejectReason {
51    UnsupportedMode,
52}
53
54impl std::fmt::Display for TransportRejectReason {
55    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56        match self {
57            Self::UnsupportedMode => write!(f, "unsupported conduit mode"),
58        }
59    }
60}
61
62#[derive(
63    Clone,
64    Copy,
65    zerocopy::FromBytes,
66    zerocopy::IntoBytes,
67    zerocopy::KnownLayout,
68    zerocopy::Immutable,
69)]
70#[repr(C)]
71struct TransportHello {
72    magic: LeU32,
73    version: u8,
74    requested_mode: u8,
75    reserved: [u8; 2],
76}
77
78#[derive(
79    Clone,
80    Copy,
81    zerocopy::FromBytes,
82    zerocopy::IntoBytes,
83    zerocopy::KnownLayout,
84    zerocopy::Immutable,
85)]
86#[repr(C)]
87struct TransportAccept {
88    magic: LeU32,
89    version: u8,
90    selected_mode: u8,
91    reserved: [u8; 2],
92}
93
94#[derive(
95    Clone,
96    Copy,
97    zerocopy::FromBytes,
98    zerocopy::IntoBytes,
99    zerocopy::KnownLayout,
100    zerocopy::Immutable,
101)]
102#[repr(C)]
103struct TransportReject {
104    magic: LeU32,
105    version: u8,
106    reason: u8,
107    reserved: [u8; 2],
108}
109
110pub async fn initiate_transport<L: Link>(
111    link: L,
112    requested_mode: TransportMode,
113) -> Result<SplitLink<L::Tx, L::Rx>, TransportPrologueError> {
114    if !L::supports_transport_mode(requested_mode) {
115        return Err(TransportPrologueError::Rejected(
116            TransportRejectReason::UnsupportedMode,
117        ));
118    }
119
120    let (tx, mut rx) = link.split();
121    let hello = TransportHello {
122        magic: LeU32::new(TRANSPORT_HELLO_MAGIC),
123        version: TRANSPORT_VERSION,
124        requested_mode: transport_mode_as_u8(requested_mode),
125        reserved: [0; 2],
126    };
127    send_message(&tx, &hello).await?;
128
129    let raw = recv_bytes(&mut rx).await?;
130    let bytes = raw.as_bytes();
131    let magic = bytes
132        .get(..4)
133        .and_then(|prefix| prefix.try_into().ok())
134        .map(u32::from_le_bytes)
135        .ok_or_else(|| {
136            TransportPrologueError::Protocol("transport prologue message size mismatch".into())
137        })?;
138
139    if magic == TRANSPORT_ACCEPT_MAGIC {
140        let accept = TransportAccept::read_from_bytes(bytes).map_err(|_| {
141            TransportPrologueError::Protocol("transport prologue message size mismatch".into())
142        })?;
143        if accept.version != TRANSPORT_VERSION {
144            return Err(TransportPrologueError::Protocol(format!(
145                "unsupported transport version {}",
146                accept.version
147            )));
148        }
149        let selected_mode = transport_mode_from_u8(accept.selected_mode)?;
150        if selected_mode != requested_mode {
151            return Err(TransportPrologueError::Protocol(format!(
152                "transport selected {selected_mode:?}, requested {requested_mode:?}"
153            )));
154        }
155        return Ok(SplitLink { tx, rx });
156    }
157
158    if magic == TRANSPORT_REJECT_MAGIC {
159        let reject = TransportReject::read_from_bytes(bytes).map_err(|_| {
160            TransportPrologueError::Protocol("transport prologue message size mismatch".into())
161        })?;
162        if reject.version != TRANSPORT_VERSION {
163            return Err(TransportPrologueError::Protocol(format!(
164                "unsupported transport version {}",
165                reject.version
166            )));
167        }
168        let reason = match reject.reason {
169            REJECT_UNSUPPORTED_MODE => TransportRejectReason::UnsupportedMode,
170            other => {
171                return Err(TransportPrologueError::Protocol(format!(
172                    "unknown transport reject reason {other}"
173                )));
174            }
175        };
176        return Err(TransportPrologueError::Rejected(reason));
177    }
178
179    Err(TransportPrologueError::Protocol(
180        "expected TransportAccept or TransportReject".into(),
181    ))
182}
183
184pub async fn accept_transport<L: Link>(
185    link: L,
186) -> Result<(TransportMode, SplitLink<L::Tx, L::Rx>), TransportPrologueError> {
187    let (tx, mut rx) = link.split();
188    let hello = recv_message::<_, TransportHello>(&mut rx).await?;
189    if hello.magic.get() != TRANSPORT_HELLO_MAGIC {
190        return Err(TransportPrologueError::Protocol(
191            "transport hello magic mismatch".into(),
192        ));
193    }
194    if hello.version != TRANSPORT_VERSION {
195        return Err(TransportPrologueError::Protocol(format!(
196            "unsupported transport version {}",
197            hello.version
198        )));
199    }
200    let requested_mode = transport_mode_from_u8(hello.requested_mode)?;
201    if !L::supports_transport_mode(requested_mode) {
202        reject_transport(&tx, TransportRejectReason::UnsupportedMode).await?;
203        return Err(TransportPrologueError::Rejected(
204            TransportRejectReason::UnsupportedMode,
205        ));
206    }
207
208    let accept = TransportAccept {
209        magic: LeU32::new(TRANSPORT_ACCEPT_MAGIC),
210        version: TRANSPORT_VERSION,
211        selected_mode: transport_mode_as_u8(requested_mode),
212        reserved: [0; 2],
213    };
214    send_message(&tx, &accept).await?;
215    Ok((requested_mode, SplitLink { tx, rx }))
216}
217
218pub async fn reject_transport<L: LinkTx>(
219    tx: &L,
220    reason: TransportRejectReason,
221) -> Result<(), TransportPrologueError> {
222    let code = match reason {
223        TransportRejectReason::UnsupportedMode => REJECT_UNSUPPORTED_MODE,
224    };
225    let reject = TransportReject {
226        magic: LeU32::new(TRANSPORT_REJECT_MAGIC),
227        version: TRANSPORT_VERSION,
228        reason: code,
229        reserved: [0; 2],
230    };
231    send_message(tx, &reject).await
232}
233
234async fn send_message<LTx: LinkTx, M: zerocopy::IntoBytes + zerocopy::Immutable>(
235    tx: &LTx,
236    message: &M,
237) -> Result<(), TransportPrologueError> {
238    let bytes = message.as_bytes();
239    let permit = tx.reserve().await.map_err(TransportPrologueError::Io)?;
240    let mut slot = permit
241        .alloc(bytes.len())
242        .map_err(TransportPrologueError::Io)?;
243    slot.as_mut_slice().copy_from_slice(bytes);
244    slot.commit();
245    Ok(())
246}
247
248async fn recv_message<
249    LRx: LinkRx,
250    M: zerocopy::FromBytes + zerocopy::KnownLayout + zerocopy::Immutable,
251>(
252    rx: &mut LRx,
253) -> Result<M, TransportPrologueError> {
254    let raw = recv_bytes(rx).await?;
255    M::read_from_bytes(raw.as_bytes()).map_err(|_| {
256        TransportPrologueError::Protocol("transport prologue message size mismatch".into())
257    })
258}
259
260async fn recv_bytes<LRx: LinkRx>(
261    rx: &mut LRx,
262) -> Result<vox_types::Backing, TransportPrologueError> {
263    rx.recv()
264        .await
265        .map_err(|error| {
266            TransportPrologueError::Protocol(format!("transport recv failed: {error}"))
267        })?
268        .ok_or(TransportPrologueError::LinkDead)
269}