sim_lib_server/transport/
framing.rs1use std::io::{ErrorKind, Read, Write};
2use std::sync::Arc;
3
4use sim_codec_binary::{decode_frame, encode_frame};
5use sim_kernel::{Cx, Error, Expr, Result, Symbol};
6
7use crate::{EvalSite, FrameEnvelope, FrameKind, ServerFrame};
8
9mod header;
10
11pub(crate) use header::{endpoint_key, frame_from_header_expr, frame_header_expr};
12
13const FRAME_PREFIX_BYTES: usize = 8;
14
15pub(crate) fn negotiate_codec(
16 cx: &mut Cx,
17 transport: &mut dyn super::ConnectionTransport,
18 offered_codecs: &[Symbol],
19) -> Result<Symbol> {
20 let _ = select_default_codec(offered_codecs)?;
21 let request = ServerFrame {
22 version: 1,
23 codec: Symbol::qualified("codec", "binary"),
24 msg_id: Some(1),
25 correlate: None,
26 kind: FrameKind::Negotiate {
27 codecs: offered_codecs.to_vec(),
28 },
29 envelope: FrameEnvelope::default(),
30 payload: Vec::new(),
31 };
32 transport.send_frame(cx, request)?;
33 let reply = transport
34 .recv_frame(cx, None)?
35 .ok_or_else(|| Error::HostError("transport negotiation returned no reply".to_owned()))?;
36 match reply.kind {
37 FrameKind::Negotiate { codecs } => codecs
38 .first()
39 .cloned()
40 .ok_or_else(|| Error::HostError("transport negotiation returned no codec".to_owned())),
41 other => Err(Error::HostError(format!(
42 "transport negotiation expected negotiate reply, found {}",
43 other.as_symbol()
44 ))),
45 }
46}
47
48pub(crate) fn select_default_codec(offered_codecs: &[Symbol]) -> Result<Symbol> {
49 if offered_codecs.is_empty() {
50 return Err(Error::Eval(
51 "transport negotiation requires at least one offered codec".to_owned(),
52 ));
53 }
54 offered_codecs
55 .iter()
56 .find(|codec| **codec == Symbol::qualified("codec", "binary"))
57 .cloned()
58 .or_else(|| offered_codecs.first().cloned())
59 .ok_or_else(|| {
60 Error::Eval("transport negotiation requires at least one offered codec".to_owned())
61 })
62}
63
64pub fn encode_transport_frame(frame: &ServerFrame) -> Result<Vec<u8>> {
70 let header = encode_frame(&frame_header_expr(frame))?.0;
71 if header.len() > super::MAX_TRANSPORT_FRAME_BYTES
72 || frame.payload.len() > super::MAX_TRANSPORT_FRAME_BYTES
73 {
74 return Err(Error::HostError(
75 "transport frame exceeds size limit".to_owned(),
76 ));
77 }
78
79 let header_len = u32::try_from(header.len())
80 .map_err(|_| Error::HostError("transport header exceeds u32".to_owned()))?;
81 let payload_len = u32::try_from(frame.payload.len())
82 .map_err(|_| Error::HostError("transport payload exceeds u32".to_owned()))?;
83
84 let mut bytes = Vec::with_capacity(FRAME_PREFIX_BYTES + header.len() + frame.payload.len());
85 bytes.extend_from_slice(&header_len.to_be_bytes());
86 bytes.extend_from_slice(&payload_len.to_be_bytes());
87 bytes.extend_from_slice(&header);
88 bytes.extend_from_slice(&frame.payload);
89 Ok(bytes)
90}
91
92#[cfg_attr(not(test), allow(dead_code))]
93pub(crate) fn write_frame_to<W: Write>(writer: &mut W, frame: &ServerFrame) -> Result<()> {
94 let bytes = encode_transport_frame(frame)?;
95 writer.write_all(&bytes).map_err(io_to_host)?;
96 writer.flush().map_err(io_to_host)
97}
98
99pub fn decode_transport_frame(bytes: &[u8]) -> Result<ServerFrame> {
105 if bytes.len() < FRAME_PREFIX_BYTES {
106 return Err(Error::HostError(
107 "truncated transport frame prefix".to_owned(),
108 ));
109 }
110
111 let header_len = u32::from_be_bytes(bytes[0..4].try_into().expect("prefix slice")) as usize;
112 let payload_len = u32::from_be_bytes(bytes[4..8].try_into().expect("prefix slice")) as usize;
113 if header_len > super::MAX_TRANSPORT_FRAME_BYTES
114 || payload_len > super::MAX_TRANSPORT_FRAME_BYTES
115 {
116 return Err(Error::HostError(
117 "transport frame exceeds size limit".to_owned(),
118 ));
119 }
120
121 let total_len = FRAME_PREFIX_BYTES
122 .checked_add(header_len)
123 .and_then(|value| value.checked_add(payload_len))
124 .ok_or_else(|| Error::HostError("transport frame length overflow".to_owned()))?;
125 if bytes.len() < total_len {
126 return Err(Error::HostError(
127 "truncated transport frame body".to_owned(),
128 ));
129 }
130
131 let header_bytes = &bytes[FRAME_PREFIX_BYTES..FRAME_PREFIX_BYTES + header_len];
132 let payload = bytes
133 [FRAME_PREFIX_BYTES + header_len..FRAME_PREFIX_BYTES + header_len + payload_len]
134 .to_vec();
135 let (_, header) = decode_frame(sim_kernel::CodecId(0), header_bytes)?;
136 frame_from_header_expr(&header, payload)
137}
138
139#[cfg_attr(not(test), allow(dead_code))]
140pub(crate) fn read_frame_from<R: Read>(reader: &mut R) -> Result<Option<ServerFrame>> {
141 let mut prefix = [0u8; FRAME_PREFIX_BYTES];
142 match read_exact_or_eof(reader, &mut prefix)? {
143 ReadOutcome::Eof => return Ok(None),
144 ReadOutcome::Filled => {}
145 }
146
147 let header_len = u32::from_be_bytes(prefix[0..4].try_into().expect("prefix slice")) as usize;
148 let payload_len = u32::from_be_bytes(prefix[4..8].try_into().expect("prefix slice")) as usize;
149 if header_len > super::MAX_TRANSPORT_FRAME_BYTES
150 || payload_len > super::MAX_TRANSPORT_FRAME_BYTES
151 {
152 return Err(Error::HostError(
153 "transport frame exceeds size limit".to_owned(),
154 ));
155 }
156
157 let body_len = header_len
158 .checked_add(payload_len)
159 .ok_or_else(|| Error::HostError("transport frame length overflow".to_owned()))?;
160 let mut body = vec![0u8; body_len];
161 reader.read_exact(&mut body).map_err(io_to_host)?;
162
163 let mut frame = Vec::with_capacity(FRAME_PREFIX_BYTES + body.len());
164 frame.extend_from_slice(&prefix);
165 frame.extend_from_slice(&body);
166 decode_transport_frame(&frame).map(Some)
167}
168
169pub(crate) fn answer_or_negotiate(
170 cx: &mut Cx,
171 site: &Arc<dyn EvalSite>,
172 frame: ServerFrame,
173) -> Result<ServerFrame> {
174 match &frame.kind {
175 FrameKind::Negotiate { codecs } => {
176 let selected = codecs
177 .iter()
178 .find(|codec| site.codecs().iter().any(|installed| installed == *codec))
179 .cloned()
180 .ok_or_else(|| {
181 Error::Eval("transport negotiation found no shared codec".to_owned())
182 })?;
183 Ok(ServerFrame {
184 version: frame.version,
185 codec: selected.clone(),
186 msg_id: None,
187 correlate: frame.msg_id,
188 kind: FrameKind::Negotiate {
189 codecs: vec![selected],
190 },
191 envelope: FrameEnvelope::default(),
192 payload: Vec::new(),
193 })
194 }
195 _ => site.answer(cx, frame),
196 }
197}
198
199pub(crate) fn route_frame_bytes(
200 cx: &mut Cx,
201 site: &Arc<dyn EvalSite>,
202 bytes: &[u8],
203) -> Result<Vec<u8>> {
204 let frame = decode_transport_frame(bytes)?;
205 let reply = answer_or_negotiate(cx, site, frame)?;
206 encode_transport_frame(&reply)
207}
208
209pub(crate) fn update_negotiated_codec_from_reply(
210 runtime: &Arc<crate::ServerRuntime>,
211 session_id: u64,
212 frame: &ServerFrame,
213 reply: &ServerFrame,
214) -> Result<()> {
215 if !matches!(frame.kind, FrameKind::Negotiate { .. }) {
216 return Ok(());
217 }
218 let FrameKind::Negotiate { codecs } = &reply.kind else {
219 return Ok(());
220 };
221 let Some(selected) = codecs.first() else {
222 return Ok(());
223 };
224 runtime.update_session_codec(session_id, selected.clone())
225}
226
227pub(crate) fn error_frame_from_error(
228 cx: &mut Cx,
229 frame: &ServerFrame,
230 error: &Error,
231) -> Result<ServerFrame> {
232 let codec = match &frame.envelope.reply_codec_hint {
233 Some(hint) if cx.registry().codec_by_symbol(hint).is_some() => hint.clone(),
234 _ => frame.codec.clone(),
235 };
236 let mut reply = ServerFrame::from_expr(
237 cx,
238 codec,
239 FrameKind::Error,
240 &Expr::String(error.to_string()),
241 frame.envelope.consistency,
242 Vec::new(),
243 false,
244 )?;
245 reply.correlate = frame.msg_id;
246 Ok(reply)
247}
248
249pub(crate) fn io_to_host(error: std::io::Error) -> Error {
250 Error::HostError(format!("io {:?}: {}", error.kind(), error))
251}
252
253pub(crate) fn is_timeout(error: &Error) -> bool {
254 matches!(error, Error::HostError(message) if message.contains("TimedOut") || message.contains("WouldBlock"))
255}
256
257pub(crate) enum ReadOutcome {
258 Eof,
259 Filled,
260}
261
262pub(crate) fn read_exact_or_eof<R: Read>(
263 reader: &mut R,
264 mut buffer: &mut [u8],
265) -> Result<ReadOutcome> {
266 let mut read_any = false;
267 while !buffer.is_empty() {
268 match reader.read(buffer) {
269 Ok(0) if !read_any => return Ok(ReadOutcome::Eof),
270 Ok(0) => {
271 return Err(Error::HostError(
272 "truncated transport frame prefix".to_owned(),
273 ));
274 }
275 Ok(read) => {
276 read_any = true;
277 let (_, rest) = buffer.split_at_mut(read);
278 buffer = rest;
279 }
280 Err(error) if error.kind() == ErrorKind::Interrupted => {}
281 Err(error) => return Err(io_to_host(error)),
282 }
283 }
284 Ok(ReadOutcome::Filled)
285}