1use vox_types::{Link, LinkRx, LinkTx, LinkTxPermit, SplitLink, TransportMode, WriteSlot};
2use zerocopy::FromBytes;
3use zerocopy::little_endian::U32 as LeU32;
4
5const TRANSPORT_HELLO_MAGIC: u32 = u32::from_le_bytes(*b"VOTH");
6const TRANSPORT_ACCEPT_MAGIC: u32 = u32::from_le_bytes(*b"VOTA");
7const TRANSPORT_REJECT_MAGIC: u32 = u32::from_le_bytes(*b"VOTR");
8const TRANSPORT_VERSION: u8 = 9;
9const REJECT_UNSUPPORTED_MODE: u8 = 1;
10
11fn transport_mode_as_u8(mode: TransportMode) -> u8 {
12 match mode {
13 TransportMode::Bare => 0,
14 TransportMode::Stable => 1,
15 }
16}
17
18fn transport_mode_from_u8(value: u8) -> Result<TransportMode, TransportPrologueError> {
19 match value {
20 0 => Ok(TransportMode::Bare),
21 1 => Ok(TransportMode::Stable),
22 _ => Err(TransportPrologueError::Protocol(format!(
23 "unknown conduit mode {value}"
24 ))),
25 }
26}
27
28#[derive(Debug)]
29pub enum TransportPrologueError {
30 Io(std::io::Error),
31 LinkDead,
32 Protocol(String),
33 Rejected(TransportRejectReason),
34}
35
36impl std::fmt::Display for TransportPrologueError {
37 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38 match self {
39 Self::Io(error) => write!(f, "io error: {error}"),
40 Self::LinkDead => write!(f, "link closed during transport prologue"),
41 Self::Protocol(message) => write!(f, "protocol error: {message}"),
42 Self::Rejected(reason) => write!(f, "transport rejected: {reason}"),
43 }
44 }
45}
46
47impl std::error::Error for TransportPrologueError {}
48
49#[derive(Debug, Clone, Copy, PartialEq, Eq)]
50pub enum TransportRejectReason {
51 UnsupportedMode,
52}
53
54impl std::fmt::Display for TransportRejectReason {
55 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56 match self {
57 Self::UnsupportedMode => write!(f, "unsupported conduit mode"),
58 }
59 }
60}
61
62#[derive(
63 Clone,
64 Copy,
65 zerocopy::FromBytes,
66 zerocopy::IntoBytes,
67 zerocopy::KnownLayout,
68 zerocopy::Immutable,
69)]
70#[repr(C)]
71struct TransportHello {
72 magic: LeU32,
73 version: u8,
74 requested_mode: u8,
75 reserved: [u8; 2],
76}
77
78#[derive(
79 Clone,
80 Copy,
81 zerocopy::FromBytes,
82 zerocopy::IntoBytes,
83 zerocopy::KnownLayout,
84 zerocopy::Immutable,
85)]
86#[repr(C)]
87struct TransportAccept {
88 magic: LeU32,
89 version: u8,
90 selected_mode: u8,
91 reserved: [u8; 2],
92}
93
94#[derive(
95 Clone,
96 Copy,
97 zerocopy::FromBytes,
98 zerocopy::IntoBytes,
99 zerocopy::KnownLayout,
100 zerocopy::Immutable,
101)]
102#[repr(C)]
103struct TransportReject {
104 magic: LeU32,
105 version: u8,
106 reason: u8,
107 reserved: [u8; 2],
108}
109
110pub async fn initiate_transport<L: Link>(
111 link: L,
112 requested_mode: TransportMode,
113) -> Result<SplitLink<L::Tx, L::Rx>, TransportPrologueError> {
114 if !L::supports_transport_mode(requested_mode) {
115 return Err(TransportPrologueError::Rejected(
116 TransportRejectReason::UnsupportedMode,
117 ));
118 }
119
120 let (tx, mut rx) = link.split();
121 let hello = TransportHello {
122 magic: LeU32::new(TRANSPORT_HELLO_MAGIC),
123 version: TRANSPORT_VERSION,
124 requested_mode: transport_mode_as_u8(requested_mode),
125 reserved: [0; 2],
126 };
127 send_message(&tx, &hello).await?;
128
129 let raw = recv_bytes(&mut rx).await?;
130 let bytes = raw.as_bytes();
131 let magic = bytes
132 .get(..4)
133 .and_then(|prefix| prefix.try_into().ok())
134 .map(u32::from_le_bytes)
135 .ok_or_else(|| {
136 TransportPrologueError::Protocol("transport prologue message size mismatch".into())
137 })?;
138
139 if magic == TRANSPORT_ACCEPT_MAGIC {
140 let accept = TransportAccept::read_from_bytes(bytes).map_err(|_| {
141 TransportPrologueError::Protocol("transport prologue message size mismatch".into())
142 })?;
143 if accept.version != TRANSPORT_VERSION {
144 return Err(TransportPrologueError::Protocol(format!(
145 "unsupported transport version {}",
146 accept.version
147 )));
148 }
149 let selected_mode = transport_mode_from_u8(accept.selected_mode)?;
150 if selected_mode != requested_mode {
151 return Err(TransportPrologueError::Protocol(format!(
152 "transport selected {selected_mode:?}, requested {requested_mode:?}"
153 )));
154 }
155 return Ok(SplitLink { tx, rx });
156 }
157
158 if magic == TRANSPORT_REJECT_MAGIC {
159 let reject = TransportReject::read_from_bytes(bytes).map_err(|_| {
160 TransportPrologueError::Protocol("transport prologue message size mismatch".into())
161 })?;
162 if reject.version != TRANSPORT_VERSION {
163 return Err(TransportPrologueError::Protocol(format!(
164 "unsupported transport version {}",
165 reject.version
166 )));
167 }
168 let reason = match reject.reason {
169 REJECT_UNSUPPORTED_MODE => TransportRejectReason::UnsupportedMode,
170 other => {
171 return Err(TransportPrologueError::Protocol(format!(
172 "unknown transport reject reason {other}"
173 )));
174 }
175 };
176 return Err(TransportPrologueError::Rejected(reason));
177 }
178
179 Err(TransportPrologueError::Protocol(
180 "expected TransportAccept or TransportReject".into(),
181 ))
182}
183
184pub async fn accept_transport<L: Link>(
185 link: L,
186) -> Result<(TransportMode, SplitLink<L::Tx, L::Rx>), TransportPrologueError> {
187 let (tx, mut rx) = link.split();
188 let hello = recv_message::<_, TransportHello>(&mut rx).await?;
189 if hello.magic.get() != TRANSPORT_HELLO_MAGIC {
190 return Err(TransportPrologueError::Protocol(
191 "transport hello magic mismatch".into(),
192 ));
193 }
194 if hello.version != TRANSPORT_VERSION {
195 return Err(TransportPrologueError::Protocol(format!(
196 "unsupported transport version {}",
197 hello.version
198 )));
199 }
200 let requested_mode = transport_mode_from_u8(hello.requested_mode)?;
201 if !L::supports_transport_mode(requested_mode) {
202 reject_transport(&tx, TransportRejectReason::UnsupportedMode).await?;
203 return Err(TransportPrologueError::Rejected(
204 TransportRejectReason::UnsupportedMode,
205 ));
206 }
207
208 let accept = TransportAccept {
209 magic: LeU32::new(TRANSPORT_ACCEPT_MAGIC),
210 version: TRANSPORT_VERSION,
211 selected_mode: transport_mode_as_u8(requested_mode),
212 reserved: [0; 2],
213 };
214 send_message(&tx, &accept).await?;
215 Ok((requested_mode, SplitLink { tx, rx }))
216}
217
218pub async fn reject_transport<L: LinkTx>(
219 tx: &L,
220 reason: TransportRejectReason,
221) -> Result<(), TransportPrologueError> {
222 let code = match reason {
223 TransportRejectReason::UnsupportedMode => REJECT_UNSUPPORTED_MODE,
224 };
225 let reject = TransportReject {
226 magic: LeU32::new(TRANSPORT_REJECT_MAGIC),
227 version: TRANSPORT_VERSION,
228 reason: code,
229 reserved: [0; 2],
230 };
231 send_message(tx, &reject).await
232}
233
234async fn send_message<LTx: LinkTx, M: zerocopy::IntoBytes + zerocopy::Immutable>(
235 tx: <x,
236 message: &M,
237) -> Result<(), TransportPrologueError> {
238 let bytes = message.as_bytes();
239 let permit = tx.reserve().await.map_err(TransportPrologueError::Io)?;
240 let mut slot = permit
241 .alloc(bytes.len())
242 .map_err(TransportPrologueError::Io)?;
243 slot.as_mut_slice().copy_from_slice(bytes);
244 slot.commit();
245 Ok(())
246}
247
248async fn recv_message<
249 LRx: LinkRx,
250 M: zerocopy::FromBytes + zerocopy::KnownLayout + zerocopy::Immutable,
251>(
252 rx: &mut LRx,
253) -> Result<M, TransportPrologueError> {
254 let raw = recv_bytes(rx).await?;
255 M::read_from_bytes(raw.as_bytes()).map_err(|_| {
256 TransportPrologueError::Protocol("transport prologue message size mismatch".into())
257 })
258}
259
260async fn recv_bytes<LRx: LinkRx>(
261 rx: &mut LRx,
262) -> Result<vox_types::Backing, TransportPrologueError> {
263 rx.recv()
264 .await
265 .map_err(|error| {
266 TransportPrologueError::Protocol(format!("transport recv failed: {error}"))
267 })?
268 .ok_or(TransportPrologueError::LinkDead)
269}