Skip to main content

sim_lib_server/transport/
framing.rs

1use std::io::{ErrorKind, Read, Write};
2use std::sync::Arc;
3
4use sim_codec_binary::{decode_frame, encode_frame};
5use sim_kernel::{Cx, Error, Expr, Result, Symbol};
6
7use crate::{EvalSite, FrameEnvelope, FrameKind, ServerFrame};
8
9mod header;
10
11pub(crate) use header::{endpoint_key, frame_from_header_expr, frame_header_expr};
12
13const FRAME_PREFIX_BYTES: usize = 8;
14
15pub(crate) fn negotiate_codec(
16    cx: &mut Cx,
17    transport: &mut dyn super::ConnectionTransport,
18    offered_codecs: &[Symbol],
19) -> Result<Symbol> {
20    let _ = select_default_codec(offered_codecs)?;
21    let request = ServerFrame {
22        version: 1,
23        codec: Symbol::qualified("codec", "binary"),
24        msg_id: Some(1),
25        correlate: None,
26        kind: FrameKind::Negotiate {
27            codecs: offered_codecs.to_vec(),
28        },
29        envelope: FrameEnvelope::default(),
30        payload: Vec::new(),
31    };
32    transport.send_frame(cx, request)?;
33    let reply = transport
34        .recv_frame(cx, None)?
35        .ok_or_else(|| Error::HostError("transport negotiation returned no reply".to_owned()))?;
36    match reply.kind {
37        FrameKind::Negotiate { codecs } => codecs
38            .first()
39            .cloned()
40            .ok_or_else(|| Error::HostError("transport negotiation returned no codec".to_owned())),
41        other => Err(Error::HostError(format!(
42            "transport negotiation expected negotiate reply, found {}",
43            other.as_symbol()
44        ))),
45    }
46}
47
48pub(crate) fn select_default_codec(offered_codecs: &[Symbol]) -> Result<Symbol> {
49    if offered_codecs.is_empty() {
50        return Err(Error::Eval(
51            "transport negotiation requires at least one offered codec".to_owned(),
52        ));
53    }
54    offered_codecs
55        .iter()
56        .find(|codec| **codec == Symbol::qualified("codec", "binary"))
57        .cloned()
58        .or_else(|| offered_codecs.first().cloned())
59        .ok_or_else(|| {
60            Error::Eval("transport negotiation requires at least one offered codec".to_owned())
61        })
62}
63
64/// Encodes a frame into its length-prefixed transport byte form.
65///
66/// Lays out a fixed prefix of big-endian header and payload lengths followed by
67/// the encoded header and the raw payload; errors if either part exceeds the
68/// transport size limit.
69pub fn encode_transport_frame(frame: &ServerFrame) -> Result<Vec<u8>> {
70    let header = encode_frame(&frame_header_expr(frame))?.0;
71    if header.len() > super::MAX_TRANSPORT_FRAME_BYTES
72        || frame.payload.len() > super::MAX_TRANSPORT_FRAME_BYTES
73    {
74        return Err(Error::HostError(
75            "transport frame exceeds size limit".to_owned(),
76        ));
77    }
78
79    let header_len = u32::try_from(header.len())
80        .map_err(|_| Error::HostError("transport header exceeds u32".to_owned()))?;
81    let payload_len = u32::try_from(frame.payload.len())
82        .map_err(|_| Error::HostError("transport payload exceeds u32".to_owned()))?;
83
84    let mut bytes = Vec::with_capacity(FRAME_PREFIX_BYTES + header.len() + frame.payload.len());
85    bytes.extend_from_slice(&header_len.to_be_bytes());
86    bytes.extend_from_slice(&payload_len.to_be_bytes());
87    bytes.extend_from_slice(&header);
88    bytes.extend_from_slice(&frame.payload);
89    Ok(bytes)
90}
91
92#[cfg_attr(not(test), allow(dead_code))]
93pub(crate) fn write_frame_to<W: Write>(writer: &mut W, frame: &ServerFrame) -> Result<()> {
94    let bytes = encode_transport_frame(frame)?;
95    writer.write_all(&bytes).map_err(io_to_host)?;
96    writer.flush().map_err(io_to_host)
97}
98
99/// Decodes a length-prefixed transport frame from `bytes`.
100///
101/// Reads the header and payload lengths from the prefix, validates them against
102/// the transport size limit, and reconstructs the [`ServerFrame`]; errors on a
103/// truncated or oversized buffer.
104pub fn decode_transport_frame(bytes: &[u8]) -> Result<ServerFrame> {
105    if bytes.len() < FRAME_PREFIX_BYTES {
106        return Err(Error::HostError(
107            "truncated transport frame prefix".to_owned(),
108        ));
109    }
110
111    let header_len = u32::from_be_bytes(bytes[0..4].try_into().expect("prefix slice")) as usize;
112    let payload_len = u32::from_be_bytes(bytes[4..8].try_into().expect("prefix slice")) as usize;
113    if header_len > super::MAX_TRANSPORT_FRAME_BYTES
114        || payload_len > super::MAX_TRANSPORT_FRAME_BYTES
115    {
116        return Err(Error::HostError(
117            "transport frame exceeds size limit".to_owned(),
118        ));
119    }
120
121    let total_len = FRAME_PREFIX_BYTES
122        .checked_add(header_len)
123        .and_then(|value| value.checked_add(payload_len))
124        .ok_or_else(|| Error::HostError("transport frame length overflow".to_owned()))?;
125    if bytes.len() < total_len {
126        return Err(Error::HostError(
127            "truncated transport frame body".to_owned(),
128        ));
129    }
130
131    let header_bytes = &bytes[FRAME_PREFIX_BYTES..FRAME_PREFIX_BYTES + header_len];
132    let payload = bytes
133        [FRAME_PREFIX_BYTES + header_len..FRAME_PREFIX_BYTES + header_len + payload_len]
134        .to_vec();
135    let (_, header) = decode_frame(sim_kernel::CodecId(0), header_bytes)?;
136    frame_from_header_expr(&header, payload)
137}
138
139#[cfg_attr(not(test), allow(dead_code))]
140pub(crate) fn read_frame_from<R: Read>(reader: &mut R) -> Result<Option<ServerFrame>> {
141    let mut prefix = [0u8; FRAME_PREFIX_BYTES];
142    match read_exact_or_eof(reader, &mut prefix)? {
143        ReadOutcome::Eof => return Ok(None),
144        ReadOutcome::Filled => {}
145    }
146
147    let header_len = u32::from_be_bytes(prefix[0..4].try_into().expect("prefix slice")) as usize;
148    let payload_len = u32::from_be_bytes(prefix[4..8].try_into().expect("prefix slice")) as usize;
149    if header_len > super::MAX_TRANSPORT_FRAME_BYTES
150        || payload_len > super::MAX_TRANSPORT_FRAME_BYTES
151    {
152        return Err(Error::HostError(
153            "transport frame exceeds size limit".to_owned(),
154        ));
155    }
156
157    let body_len = header_len
158        .checked_add(payload_len)
159        .ok_or_else(|| Error::HostError("transport frame length overflow".to_owned()))?;
160    let mut body = vec![0u8; body_len];
161    reader.read_exact(&mut body).map_err(io_to_host)?;
162
163    let mut frame = Vec::with_capacity(FRAME_PREFIX_BYTES + body.len());
164    frame.extend_from_slice(&prefix);
165    frame.extend_from_slice(&body);
166    decode_transport_frame(&frame).map(Some)
167}
168
169pub(crate) fn answer_or_negotiate(
170    cx: &mut Cx,
171    site: &Arc<dyn EvalSite>,
172    frame: ServerFrame,
173) -> Result<ServerFrame> {
174    match &frame.kind {
175        FrameKind::Negotiate { codecs } => {
176            let selected = codecs
177                .iter()
178                .find(|codec| site.codecs().iter().any(|installed| installed == *codec))
179                .cloned()
180                .ok_or_else(|| {
181                    Error::Eval("transport negotiation found no shared codec".to_owned())
182                })?;
183            Ok(ServerFrame {
184                version: frame.version,
185                codec: selected.clone(),
186                msg_id: None,
187                correlate: frame.msg_id,
188                kind: FrameKind::Negotiate {
189                    codecs: vec![selected],
190                },
191                envelope: FrameEnvelope::default(),
192                payload: Vec::new(),
193            })
194        }
195        _ => site.answer(cx, frame),
196    }
197}
198
199pub(crate) fn route_frame_bytes(
200    cx: &mut Cx,
201    site: &Arc<dyn EvalSite>,
202    bytes: &[u8],
203) -> Result<Vec<u8>> {
204    let frame = decode_transport_frame(bytes)?;
205    let reply = answer_or_negotiate(cx, site, frame)?;
206    encode_transport_frame(&reply)
207}
208
209pub(crate) fn update_negotiated_codec_from_reply(
210    runtime: &Arc<crate::ServerRuntime>,
211    session_id: u64,
212    frame: &ServerFrame,
213    reply: &ServerFrame,
214) -> Result<()> {
215    if !matches!(frame.kind, FrameKind::Negotiate { .. }) {
216        return Ok(());
217    }
218    let FrameKind::Negotiate { codecs } = &reply.kind else {
219        return Ok(());
220    };
221    let Some(selected) = codecs.first() else {
222        return Ok(());
223    };
224    runtime.update_session_codec(session_id, selected.clone())
225}
226
227pub(crate) fn error_frame_from_error(
228    cx: &mut Cx,
229    frame: &ServerFrame,
230    error: &Error,
231) -> Result<ServerFrame> {
232    let codec = match &frame.envelope.reply_codec_hint {
233        Some(hint) if cx.registry().codec_by_symbol(hint).is_some() => hint.clone(),
234        _ => frame.codec.clone(),
235    };
236    let mut reply = ServerFrame::from_expr(
237        cx,
238        codec,
239        FrameKind::Error,
240        &Expr::String(error.to_string()),
241        frame.envelope.consistency,
242        Vec::new(),
243        false,
244    )?;
245    reply.correlate = frame.msg_id;
246    Ok(reply)
247}
248
249pub(crate) fn io_to_host(error: std::io::Error) -> Error {
250    Error::HostError(format!("io {:?}: {}", error.kind(), error))
251}
252
253pub(crate) fn is_timeout(error: &Error) -> bool {
254    matches!(error, Error::HostError(message) if message.contains("TimedOut") || message.contains("WouldBlock"))
255}
256
257pub(crate) enum ReadOutcome {
258    Eof,
259    Filled,
260}
261
262pub(crate) fn read_exact_or_eof<R: Read>(
263    reader: &mut R,
264    mut buffer: &mut [u8],
265) -> Result<ReadOutcome> {
266    let mut read_any = false;
267    while !buffer.is_empty() {
268        match reader.read(buffer) {
269            Ok(0) if !read_any => return Ok(ReadOutcome::Eof),
270            Ok(0) => {
271                return Err(Error::HostError(
272                    "truncated transport frame prefix".to_owned(),
273                ));
274            }
275            Ok(read) => {
276                read_any = true;
277                let (_, rest) = buffer.split_at_mut(read);
278                buffer = rest;
279            }
280            Err(error) if error.kind() == ErrorKind::Interrupted => {}
281            Err(error) => return Err(io_to_host(error)),
282        }
283    }
284    Ok(ReadOutcome::Filled)
285}