Skip to main content

codec_rs/
stream.rs

1// SPDX-License-Identifier: MIT
2//! Stream decoders for the two Codec wire formats.
3//!
4//! Adapt any `std::io::Read` (sync) or `tokio::io::AsyncRead` (async,
5//! behind the `tokio` feature) into an iterator/stream of [`CodecFrame`]s.
6
7use std::io::Read;
8
9use crate::frame::CodecFrame;
10
11/// Errors raised by the stream decoders.
12#[derive(Debug, thiserror::Error)]
13pub enum StreamError {
14    #[error("io error: {0}")]
15    Io(#[from] std::io::Error),
16    #[error("invalid msgpack frame: {0}")]
17    Msgpack(String),
18    #[error("invalid protobuf frame: {0}")]
19    Protobuf(String),
20}
21
22// ── Public sync iterator constructors ─────────────────────────────────────
23
24/// Yield frames from a stream of concatenated MessagePack maps.
25///
26/// Frame shape: `{"ids": [int...], "done": bool, "finish_reason"?: str}`.
27pub fn decode_msgpack_stream<R: Read>(reader: R) -> MsgpackFrameIter<R> {
28    MsgpackFrameIter {
29        reader,
30        buf: Vec::new(),
31        eof: false,
32        done_seen: false,
33    }
34}
35
36/// Yield frames from a stream of length-prefixed protobuf `CodecFrame`
37/// payloads. Wire: 4-byte big-endian length followed by the protobuf
38/// bytes.
39pub fn decode_protobuf_stream<R: Read>(reader: R) -> ProtobufFrameIter<R> {
40    ProtobufFrameIter {
41        reader,
42        buf: Vec::new(),
43        eof: false,
44        done_seen: false,
45    }
46}
47
48// ── MessagePack iterator ──────────────────────────────────────────────────
49
50/// Iterator over MessagePack frames pulled from `R`.
51pub struct MsgpackFrameIter<R: Read> {
52    reader: R,
53    buf: Vec<u8>,
54    eof: bool,
55    done_seen: bool,
56}
57
58impl<R: Read> Iterator for MsgpackFrameIter<R> {
59    type Item = Result<CodecFrame, StreamError>;
60
61    fn next(&mut self) -> Option<Self::Item> {
62        if self.done_seen {
63            return None;
64        }
65        loop {
66            // Try to find the end of the next msgpack value in `buf`.
67            match msgpack_end_offset(&self.buf, 0) {
68                Ok(end) => {
69                    // We have a complete frame: parse fields manually.
70                    let frame = match decode_msgpack_frame(&self.buf[..end]) {
71                        Ok(f) => f,
72                        Err(e) => return Some(Err(e)),
73                    };
74                    // Drop consumed bytes.
75                    self.buf.drain(..end);
76                    if frame.done {
77                        self.done_seen = true;
78                    }
79                    return Some(Ok(frame));
80                }
81                Err(MsgpackBoundaryError::Incomplete) => {
82                    if self.eof {
83                        if self.buf.is_empty() {
84                            return None;
85                        }
86                        return Some(Err(StreamError::Msgpack(
87                            "stream ended mid-frame".into(),
88                        )));
89                    }
90                    let mut tmp = [0u8; 16 * 1024];
91                    match self.reader.read(&mut tmp) {
92                        Ok(0) => {
93                            self.eof = true;
94                            if self.buf.is_empty() {
95                                return None;
96                            }
97                            // Loop one more time so we can produce the
98                            // mid-frame error.
99                        }
100                        Ok(n) => self.buf.extend_from_slice(&tmp[..n]),
101                        Err(e) => return Some(Err(StreamError::Io(e))),
102                    }
103                }
104                Err(MsgpackBoundaryError::Invalid(msg)) => {
105                    return Some(Err(StreamError::Msgpack(msg)));
106                }
107            }
108        }
109    }
110}
111
112// Hand-rolled msgpack value-boundary scanner. Just enough of the
113// msgpack spec to bound a single value (one frame). Mirrors
114// `_msgpack_end_offset` in `packages/python/src/codecai/stream.py`.
/// Why a boundary scan could not report the end of a value.
#[derive(Debug)]
enum MsgpackBoundaryError {
    /// The buffer ends before the value does; more bytes are needed.
    Incomplete,
    /// The bytes can never form a value this scanner accepts; the payload
    /// describes the problem.
    Invalid(String),
}

/// Return the offset one past the end of the MessagePack value that starts at
/// `data[pos]`.
///
/// The scan is iterative: it walks headers left to right while counting how
/// many values are still owed (a container header adds its element count), so
/// arbitrarily deep nesting in untrusted input cannot exhaust the call stack.
///
/// # Errors
/// * [`MsgpackBoundaryError::Incomplete`] if `data` ends before the value does
///   (including `pos >= data.len()`).
/// * [`MsgpackBoundaryError::Invalid`] for the reserved byte `0xC1`, or for a
///   declared length whose end offset does not fit in `usize`.
fn msgpack_end_offset(data: &[u8], pos: usize) -> Result<usize, MsgpackBoundaryError> {
    let mut cursor = pos;
    // Number of values that still have to be scanned, starting with the
    // top-level one. Saturating keeps the counter well-defined even for
    // absurd element counts; such input runs out of bytes long before.
    let mut pending: u64 = 1;
    while pending > 0 {
        let (next, children) = scan_header(data, cursor)?;
        cursor = next;
        pending = (pending - 1).saturating_add(children);
    }
    Ok(cursor)
}

/// Scan the single value header at `data[pos]`.
///
/// Returns `(next, children)`: `next` is the offset just past the header and
/// any inline payload (string/binary/ext bytes, fixed-width scalars), and
/// `children` is the number of nested values that follow for arrays and maps
/// (a map counts two values per entry).
fn scan_header(data: &[u8], pos: usize) -> Result<(usize, u64), MsgpackBoundaryError> {
    let b = match data.get(pos) {
        Some(&b) => b,
        None => return Err(MsgpackBoundaryError::Incomplete),
    };
    // `pos` indexes into `data`, so `pos + 1` cannot overflow.
    let body = pos + 1;
    match b {
        // positive fixint, negative fixint, nil, false, true
        0x00..=0x7F | 0xE0..=0xFF | 0xC0 | 0xC2 | 0xC3 => Ok((body, 0)),
        // fixstr: length in the low five bits
        0xA0..=0xBF => leaf(data, body, usize::from(b & 0x1F)),
        // fixarray / fixmap: count in the low four bits
        0x90..=0x9F => Ok((body, u64::from(b & 0x0F))),
        0x80..=0x8F => Ok((body, 2 * u64::from(b & 0x0F))),
        // bin 8/16/32 and str 8/16/32
        0xC4 | 0xD9 => prefixed_leaf(data, body, 1, 0),
        0xC5 | 0xDA => prefixed_leaf(data, body, 2, 0),
        0xC6 | 0xDB => prefixed_leaf(data, body, 4, 0),
        // ext 8/16/32: one type byte sits between the length and the data
        0xC7 => prefixed_leaf(data, body, 1, 1),
        0xC8 => prefixed_leaf(data, body, 2, 1),
        0xC9 => prefixed_leaf(data, body, 4, 1),
        // array 16/32
        0xDC => container(data, body, 2, 1),
        0xDD => container(data, body, 4, 1),
        // map 16/32
        0xDE => container(data, body, 2, 2),
        0xDF => container(data, body, 4, 2),
        // uint8 / int8
        0xCC | 0xD0 => leaf(data, body, 1),
        // uint16 / int16 / fixext1 (type byte + 1 data byte)
        0xCD | 0xD1 | 0xD4 => leaf(data, body, 2),
        // fixext2
        0xD5 => leaf(data, body, 3),
        // float32 / uint32 / int32
        0xCA | 0xCE | 0xD2 => leaf(data, body, 4),
        // fixext4
        0xD6 => leaf(data, body, 5),
        // float64 / uint64 / int64
        0xCB | 0xCF | 0xD3 => leaf(data, body, 8),
        // fixext8
        0xD7 => leaf(data, body, 9),
        // fixext16
        0xD8 => leaf(data, body, 17),
        // 0xC1 is the only byte with no meaning in the format.
        0xC1 => Err(MsgpackBoundaryError::Invalid(format!(
            "unsupported byte 0x{b:02X} at offset {pos}"
        ))),
    }
}

/// Bound a childless value whose payload is `len` bytes starting at `start`.
fn leaf(data: &[u8], start: usize, len: usize) -> Result<(usize, u64), MsgpackBoundaryError> {
    match start.checked_add(len) {
        Some(end) if end <= data.len() => Ok((end, 0)),
        Some(_) => Err(MsgpackBoundaryError::Incomplete),
        None => Err(MsgpackBoundaryError::Invalid(format!(
            "declared length {len} at offset {start} overflows usize"
        ))),
    }
}

/// Bound a childless value whose payload length is a `width`-byte big-endian
/// integer at `at`, followed by `extra` fixed bytes plus the payload itself.
fn prefixed_leaf(
    data: &[u8],
    at: usize,
    width: usize,
    extra: usize,
) -> Result<(usize, u64), MsgpackBoundaryError> {
    let declared = read_be_len(data, at, width)? as usize;
    match declared.checked_add(extra) {
        // `read_be_len` succeeded, so `at + width <= data.len()`.
        Some(total) => leaf(data, at + width, total),
        None => Err(MsgpackBoundaryError::Invalid(format!(
            "declared length {declared} at offset {at} overflows usize"
        ))),
    }
}

/// Read a container header whose element count is a `width`-byte big-endian
/// integer at `at`; each element contributes `per_entry` nested values.
fn container(
    data: &[u8],
    at: usize,
    width: usize,
    per_entry: u64,
) -> Result<(usize, u64), MsgpackBoundaryError> {
    let count = read_be_len(data, at, width)?;
    // At most 2 * u32::MAX, which always fits in a u64.
    Ok((at + width, u64::from(count) * per_entry))
}

/// Read a big-endian unsigned integer of `width` (1, 2 or 4) bytes at `at`,
/// or report `Incomplete` if those bytes are not all buffered yet.
fn read_be_len(data: &[u8], at: usize, width: usize) -> Result<u32, MsgpackBoundaryError> {
    match data.get(at..at + width) {
        Some(bytes) => Ok(bytes
            .iter()
            .fold(0u32, |acc, &byte| (acc << 8) | u32::from(byte))),
        None => Err(MsgpackBoundaryError::Incomplete),
    }
}
250
251// Decode a single complete msgpack frame slice into a CodecFrame.
252fn decode_msgpack_frame(data: &[u8]) -> Result<CodecFrame, StreamError> {
253    let mut p = MsgpackParser::new(data);
254    let map_len = p.read_map_header()?;
255    let mut ids: Vec<u32> = Vec::new();
256    let mut done = false;
257    let mut finish_reason: Option<String> = None;
258    for _ in 0..map_len {
259        let key = p.read_str()?;
260        match key.as_str() {
261            "ids" => {
262                let n = p.read_array_header()?;
263                ids.reserve(n);
264                for _ in 0..n {
265                    ids.push(p.read_int_u32()?);
266                }
267            }
268            "done" => {
269                done = p.read_bool()?;
270            }
271            "finish_reason" => {
272                if p.try_read_nil() {
273                    finish_reason = None;
274                } else {
275                    finish_reason = Some(p.read_str()?);
276                }
277            }
278            _ => {
279                p.skip_value()?;
280            }
281        }
282    }
283    Ok(CodecFrame { ids, done, finish_reason })
284}
285
286struct MsgpackParser<'a> {
287    data: &'a [u8],
288    pos: usize,
289}
290
291impl<'a> MsgpackParser<'a> {
292    fn new(data: &'a [u8]) -> Self {
293        Self { data, pos: 0 }
294    }
295    fn next_byte(&mut self) -> Result<u8, StreamError> {
296        if self.pos >= self.data.len() {
297            return Err(StreamError::Msgpack("truncated".into()));
298        }
299        let b = self.data[self.pos];
300        self.pos += 1;
301        Ok(b)
302    }
303    fn take(&mut self, n: usize) -> Result<&'a [u8], StreamError> {
304        if self.pos + n > self.data.len() {
305            return Err(StreamError::Msgpack("truncated".into()));
306        }
307        let s = &self.data[self.pos..self.pos + n];
308        self.pos += n;
309        Ok(s)
310    }
311    fn read_map_header(&mut self) -> Result<usize, StreamError> {
312        let b = self.next_byte()?;
313        if (0x80..=0x8F).contains(&b) {
314            return Ok((b & 0x0F) as usize);
315        }
316        if b == 0xDE {
317            let s = self.take(2)?;
318            return Ok(((s[0] as usize) << 8) | s[1] as usize);
319        }
320        if b == 0xDF {
321            let s = self.take(4)?;
322            return Ok(((s[0] as usize) << 24)
323                | ((s[1] as usize) << 16)
324                | ((s[2] as usize) << 8)
325                | s[3] as usize);
326        }
327        Err(StreamError::Msgpack(format!(
328            "expected map header, got 0x{b:02X}"
329        )))
330    }
331    fn read_array_header(&mut self) -> Result<usize, StreamError> {
332        let b = self.next_byte()?;
333        if (0x90..=0x9F).contains(&b) {
334            return Ok((b & 0x0F) as usize);
335        }
336        if b == 0xDC {
337            let s = self.take(2)?;
338            return Ok(((s[0] as usize) << 8) | s[1] as usize);
339        }
340        if b == 0xDD {
341            let s = self.take(4)?;
342            return Ok(((s[0] as usize) << 24)
343                | ((s[1] as usize) << 16)
344                | ((s[2] as usize) << 8)
345                | s[3] as usize);
346        }
347        Err(StreamError::Msgpack(format!(
348            "expected array header, got 0x{b:02X}"
349        )))
350    }
351    fn read_str(&mut self) -> Result<String, StreamError> {
352        let b = self.next_byte()?;
353        let len = if (0xA0..=0xBF).contains(&b) {
354            (b & 0x1F) as usize
355        } else if b == 0xD9 {
356            self.next_byte()? as usize
357        } else if b == 0xDA {
358            let s = self.take(2)?;
359            ((s[0] as usize) << 8) | s[1] as usize
360        } else if b == 0xDB {
361            let s = self.take(4)?;
362            ((s[0] as usize) << 24)
363                | ((s[1] as usize) << 16)
364                | ((s[2] as usize) << 8)
365                | s[3] as usize
366        } else {
367            return Err(StreamError::Msgpack(format!(
368                "expected string, got 0x{b:02X}"
369            )));
370        };
371        let bytes = self.take(len)?;
372        std::str::from_utf8(bytes)
373            .map(|s| s.to_string())
374            .map_err(|_| StreamError::Msgpack("invalid utf-8 in string".into()))
375    }
376    fn read_int_u32(&mut self) -> Result<u32, StreamError> {
377        let b = self.next_byte()?;
378        // positive fixint
379        if b <= 0x7F {
380            return Ok(b as u32);
381        }
382        match b {
383            0xCC => Ok(self.next_byte()? as u32),
384            0xCD => {
385                let s = self.take(2)?;
386                Ok(((s[0] as u32) << 8) | s[1] as u32)
387            }
388            0xCE => {
389                let s = self.take(4)?;
390                Ok(((s[0] as u32) << 24)
391                    | ((s[1] as u32) << 16)
392                    | ((s[2] as u32) << 8)
393                    | s[3] as u32)
394            }
395            0xCF => {
396                let s = self.take(8)?;
397                let mut v: u64 = 0;
398                for &byte in s {
399                    v = (v << 8) | byte as u64;
400                }
401                Ok(v as u32)
402            }
403            // signed ints (we coerce to u32 — match .NET ReadInt32 behavior)
404            0xD0 => Ok((self.next_byte()? as i8) as u32),
405            0xD1 => {
406                let s = self.take(2)?;
407                Ok((((s[0] as i16) << 8) | s[1] as i16) as u32)
408            }
409            0xD2 => {
410                let s = self.take(4)?;
411                Ok((((s[0] as i32) << 24)
412                    | ((s[1] as i32) << 16)
413                    | ((s[2] as i32) << 8)
414                    | s[3] as i32) as u32)
415            }
416            _ => Err(StreamError::Msgpack(format!(
417                "expected int, got 0x{b:02X}"
418            ))),
419        }
420    }
421    fn read_bool(&mut self) -> Result<bool, StreamError> {
422        match self.next_byte()? {
423            0xC2 => Ok(false),
424            0xC3 => Ok(true),
425            other => Err(StreamError::Msgpack(format!(
426                "expected bool, got 0x{other:02X}"
427            ))),
428        }
429    }
430    fn try_read_nil(&mut self) -> bool {
431        if self.pos < self.data.len() && self.data[self.pos] == 0xC0 {
432            self.pos += 1;
433            true
434        } else {
435            false
436        }
437    }
438    fn skip_value(&mut self) -> Result<(), StreamError> {
439        match msgpack_end_offset(self.data, self.pos) {
440            Ok(end) => {
441                self.pos = end;
442                Ok(())
443            }
444            Err(MsgpackBoundaryError::Incomplete) => {
445                Err(StreamError::Msgpack("truncated value to skip".into()))
446            }
447            Err(MsgpackBoundaryError::Invalid(m)) => Err(StreamError::Msgpack(m)),
448        }
449    }
450}
451
452// ── Protobuf iterator ─────────────────────────────────────────────────────
453
454/// Iterator over length-prefixed protobuf frames pulled from `R`.
455pub struct ProtobufFrameIter<R: Read> {
456    reader: R,
457    buf: Vec<u8>,
458    eof: bool,
459    done_seen: bool,
460}
461
462impl<R: Read> Iterator for ProtobufFrameIter<R> {
463    type Item = Result<CodecFrame, StreamError>;
464
465    fn next(&mut self) -> Option<Self::Item> {
466        if self.done_seen {
467            return None;
468        }
469        // Need 4 bytes for length prefix.
470        while self.buf.len() < 4 {
471            if self.eof {
472                if self.buf.is_empty() {
473                    return None;
474                }
475                return Some(Err(StreamError::Protobuf(format!(
476                    "stream ended mid-frame ({} bytes left)",
477                    self.buf.len()
478                ))));
479            }
480            let mut tmp = [0u8; 16 * 1024];
481            match self.reader.read(&mut tmp) {
482                Ok(0) => {
483                    self.eof = true;
484                }
485                Ok(n) => self.buf.extend_from_slice(&tmp[..n]),
486                Err(e) => return Some(Err(StreamError::Io(e))),
487            }
488        }
489
490        let frame_len = u32::from_be_bytes([self.buf[0], self.buf[1], self.buf[2], self.buf[3]])
491            as usize;
492
493        while self.buf.len() < 4 + frame_len {
494            if self.eof {
495                return Some(Err(StreamError::Protobuf(format!(
496                    "stream ended mid-frame (need {frame_len} bytes)"
497                ))));
498            }
499            let mut tmp = [0u8; 16 * 1024];
500            match self.reader.read(&mut tmp) {
501                Ok(0) => {
502                    self.eof = true;
503                }
504                Ok(n) => self.buf.extend_from_slice(&tmp[..n]),
505                Err(e) => return Some(Err(StreamError::Io(e))),
506            }
507        }
508
509        let payload: Vec<u8> = self.buf[4..4 + frame_len].to_vec();
510        self.buf.drain(..4 + frame_len);
511
512        let frame = match decode_protobuf_frame(&payload) {
513            Ok(f) => f,
514            Err(e) => return Some(Err(e)),
515        };
516        if frame.done {
517            self.done_seen = true;
518        }
519        Some(Ok(frame))
520    }
521}
522
523/// Decode a single CodecFrame protobuf payload (no length prefix).
524pub fn decode_protobuf_frame(data: &[u8]) -> Result<CodecFrame, StreamError> {
525    let mut ids: Vec<u32> = Vec::new();
526    let mut done = false;
527    let mut finish_reason: Option<String> = None;
528    let mut pos = 0usize;
529
530    while pos < data.len() {
531        let (tag, np) = read_varint(data, pos)?;
532        pos = np;
533        let field = (tag >> 3) as u32;
534        let wt = (tag & 0x07) as u32;
535
536        match wt {
537            0 => {
538                // varint
539                let (val, np2) = read_varint(data, pos)?;
540                pos = np2;
541                if field == 2 {
542                    done = val != 0;
543                }
544            }
545            1 => {
546                // 64-bit fixed
547                if pos + 8 > data.len() {
548                    return Err(StreamError::Protobuf("truncated 64-bit field".into()));
549                }
550                pos += 8;
551            }
552            2 => {
553                // length-delimited
554                let (len, np3) = read_varint(data, pos)?;
555                pos = np3;
556                let len = len as usize;
557                if pos + len > data.len() {
558                    return Err(StreamError::Protobuf("truncated length-delimited field".into()));
559                }
560                let chunk = &data[pos..pos + len];
561                pos += len;
562                if field == 1 {
563                    // packed repeated uint32 ids
564                    let mut p = 0usize;
565                    while p < chunk.len() {
566                        let (id, npp) = read_varint(chunk, p)?;
567                        p = npp;
568                        ids.push(id as u32);
569                    }
570                } else if field == 3 {
571                    finish_reason = Some(
572                        std::str::from_utf8(chunk)
573                            .map_err(|_| {
574                                StreamError::Protobuf("invalid utf-8 in finish_reason".into())
575                            })?
576                            .to_string(),
577                    );
578                }
579            }
580            5 => {
581                // 32-bit fixed
582                if pos + 4 > data.len() {
583                    return Err(StreamError::Protobuf("truncated 32-bit field".into()));
584                }
585                pos += 4;
586            }
587            other => {
588                return Err(StreamError::Protobuf(format!(
589                    "unknown wire type {other} in CodecFrame field {field}"
590                )));
591            }
592        }
593    }
594
595    Ok(CodecFrame { ids, done, finish_reason })
596}
597
598fn read_varint(data: &[u8], mut pos: usize) -> Result<(u64, usize), StreamError> {
599    let mut result: u64 = 0;
600    let mut shift: u32 = 0;
601    loop {
602        if pos >= data.len() {
603            return Err(StreamError::Protobuf("truncated varint".into()));
604        }
605        let b = data[pos];
606        pos += 1;
607        result |= ((b & 0x7F) as u64) << shift;
608        if (b & 0x80) == 0 {
609            return Ok((result, pos));
610        }
611        shift += 7;
612        if shift > 63 {
613            return Err(StreamError::Protobuf("varint too long".into()));
614        }
615    }
616}
617
618// ── Encoders for tests / parity helpers ──────────────────────────────────
619
620/// Encode `frame` as a single MessagePack map. Used by tests and any
621/// caller that wants to round-trip via the wire format.
622pub fn encode_msgpack_frame(frame: &CodecFrame) -> Vec<u8> {
623    let mut out: Vec<u8> = Vec::with_capacity(32 + frame.ids.len() * 5);
624    let field_count = if frame.finish_reason.is_some() { 3 } else { 2 };
625    write_map_header(&mut out, field_count);
626    write_str(&mut out, "ids");
627    write_array_header(&mut out, frame.ids.len());
628    for &id in &frame.ids {
629        write_uint_u32(&mut out, id);
630    }
631    write_str(&mut out, "done");
632    out.push(if frame.done { 0xC3 } else { 0xC2 });
633    if let Some(reason) = &frame.finish_reason {
634        write_str(&mut out, "finish_reason");
635        write_str(&mut out, reason);
636    }
637    out
638}
639
640/// Encode `frame` as a length-prefixed protobuf frame (4-byte BE length + payload).
641pub fn encode_protobuf_frame(frame: &CodecFrame) -> Vec<u8> {
642    let mut payload: Vec<u8> = Vec::new();
643    if !frame.ids.is_empty() {
644        let mut packed: Vec<u8> = Vec::new();
645        for &id in &frame.ids {
646            write_varint(&mut packed, id as u64);
647        }
648        payload.push(0x0A);
649        write_varint(&mut payload, packed.len() as u64);
650        payload.extend_from_slice(&packed);
651    }
652    payload.push(0x10);
653    payload.push(if frame.done { 1 } else { 0 });
654    if let Some(reason) = &frame.finish_reason {
655        let bytes = reason.as_bytes();
656        payload.push(0x1A);
657        write_varint(&mut payload, bytes.len() as u64);
658        payload.extend_from_slice(bytes);
659    }
660
661    let mut out = Vec::with_capacity(4 + payload.len());
662    let len = payload.len() as u32;
663    out.extend_from_slice(&len.to_be_bytes());
664    out.extend_from_slice(&payload);
665    out
666}
667
/// Append a MessagePack map header for `n` entries to `out`, using the
/// smallest of fixmap, map 16 and map 32 that can hold the count.
fn write_map_header(out: &mut Vec<u8>, n: usize) {
    match n {
        0..=0x0F => out.push(0x80 | n as u8),
        0x10..=0xFFFF => {
            out.push(0xDE);
            out.extend_from_slice(&(n as u16).to_be_bytes());
        }
        _ => {
            out.push(0xDF);
            out.extend_from_slice(&(n as u32).to_be_bytes());
        }
    }
}
679
/// Append a MessagePack array header for `n` elements to `out`, using the
/// smallest of fixarray, array 16 and array 32 that can hold the count.
fn write_array_header(out: &mut Vec<u8>, n: usize) {
    match n {
        0..=0x0F => out.push(0x90 | n as u8),
        0x10..=0xFFFF => {
            out.push(0xDC);
            out.extend_from_slice(&(n as u16).to_be_bytes());
        }
        _ => {
            out.push(0xDD);
            out.extend_from_slice(&(n as u32).to_be_bytes());
        }
    }
}
691
/// Append `s` to `out` as a MessagePack string: the smallest of fixstr,
/// str 8, str 16 and str 32 that can hold its byte length, then the UTF-8
/// bytes themselves.
fn write_str(out: &mut Vec<u8>, s: &str) {
    let bytes = s.as_bytes();
    match bytes.len() {
        n @ 0..=0x1F => out.push(0xA0 | n as u8),
        n @ 0x20..=0xFF => out.extend_from_slice(&[0xD9, n as u8]),
        n @ 0x100..=0xFFFF => {
            out.push(0xDA);
            out.extend_from_slice(&(n as u16).to_be_bytes());
        }
        n => {
            out.push(0xDB);
            out.extend_from_slice(&(n as u32).to_be_bytes());
        }
    }
    out.extend_from_slice(bytes);
}
709
/// Append `v` to `out` as a MessagePack unsigned integer, using the smallest
/// of positive fixint, uint 8, uint 16 and uint 32 that can hold it.
fn write_uint_u32(out: &mut Vec<u8>, v: u32) {
    match v {
        0..=0x7F => out.push(v as u8),
        0x80..=0xFF => out.extend_from_slice(&[0xCC, v as u8]),
        0x100..=0xFFFF => {
            out.push(0xCD);
            out.extend_from_slice(&(v as u16).to_be_bytes());
        }
        _ => {
            out.push(0xCE);
            out.extend_from_slice(&v.to_be_bytes());
        }
    }
}
724
/// Append `n` to `out` as a protobuf base-128 varint: seven bits per byte,
/// least-significant group first, with the high bit set on every byte except
/// the last.
fn write_varint(out: &mut Vec<u8>, mut n: u64) {
    while n >= 0x80 {
        out.push(((n & 0x7F) as u8) | 0x80);
        n >>= 7;
    }
    out.push(n as u8);
}
736
737// ── Async (tokio feature) ─────────────────────────────────────────────────
738
/// Async counterparts of the stream decoders, compiled only when the `tokio`
/// feature is enabled. Each function returns a
/// `futures_util::Stream<Item = Result<CodecFrame, StreamError>>` that ends
/// after the first error, after a frame with `done == true`, or at a clean
/// end of input.
#[cfg(feature = "tokio")]
pub mod r#async {
    use super::{decode_msgpack_frame, decode_protobuf_frame, msgpack_end_offset,
                MsgpackBoundaryError, StreamError};
    use crate::frame::CodecFrame;
    use async_stream::try_stream;
    use futures_util::Stream;
    use tokio::io::{AsyncRead, AsyncReadExt};

    /// Async msgpack frame stream from any [`tokio::io::AsyncRead`].
    ///
    /// Bytes are read in chunks of up to 16 KiB and buffered until one
    /// complete top-level MessagePack value is available, which is then
    /// decoded and yielded.
    pub fn decode_msgpack_stream_async<R>(
        mut reader: R,
    ) -> impl Stream<Item = Result<CodecFrame, StreamError>>
    where
        R: AsyncRead + Unpin,
    {
        try_stream! {
            let mut pending: Vec<u8> = Vec::new();
            let mut chunk = [0u8; 16 * 1024];
            let mut at_eof = false;
            loop {
                match msgpack_end_offset(&pending, 0) {
                    Ok(end) => {
                        let frame = decode_msgpack_frame(&pending[..end])?;
                        pending.drain(..end);
                        let finished = frame.done;
                        yield frame;
                        if finished {
                            return;
                        }
                    }
                    Err(MsgpackBoundaryError::Invalid(msg)) => {
                        Err(StreamError::Msgpack(msg))?;
                    }
                    Err(MsgpackBoundaryError::Incomplete) => {
                        if at_eof {
                            if pending.is_empty() {
                                return;
                            }
                            Err(StreamError::Msgpack("stream ended mid-frame".into()))?;
                        }
                        let got = reader.read(&mut chunk).await
                            .map_err(StreamError::Io)?;
                        if got > 0 {
                            pending.extend_from_slice(&chunk[..got]);
                        } else {
                            at_eof = true;
                            if pending.is_empty() {
                                return;
                            }
                            // Otherwise loop once more to report the
                            // mid-frame error.
                        }
                    }
                }
            }
        }
    }

    /// Async protobuf (length-prefixed) frame stream.
    ///
    /// Each frame is a 4-byte big-endian payload length followed by the
    /// payload; bytes are read in chunks of up to 16 KiB.
    pub fn decode_protobuf_stream_async<R>(
        mut reader: R,
    ) -> impl Stream<Item = Result<CodecFrame, StreamError>>
    where
        R: AsyncRead + Unpin,
    {
        try_stream! {
            let mut pending: Vec<u8> = Vec::new();
            let mut chunk = [0u8; 16 * 1024];
            let mut at_eof = false;
            loop {
                // Buffer the 4-byte length prefix.
                while pending.len() < 4 {
                    if at_eof {
                        if pending.is_empty() {
                            return;
                        }
                        Err(StreamError::Protobuf(format!(
                            "stream ended mid-frame ({} bytes left)", pending.len()
                        )))?;
                    }
                    let got = reader.read(&mut chunk).await
                        .map_err(StreamError::Io)?;
                    if got > 0 {
                        pending.extend_from_slice(&chunk[..got]);
                    } else {
                        at_eof = true;
                    }
                }

                let frame_len =
                    u32::from_be_bytes([pending[0], pending[1], pending[2], pending[3]]) as usize;

                // Buffer the rest of the frame.
                while pending.len() < 4 + frame_len {
                    if at_eof {
                        Err(StreamError::Protobuf(format!(
                            "stream ended mid-frame (need {frame_len} bytes)"
                        )))?;
                    }
                    let got = reader.read(&mut chunk).await
                        .map_err(StreamError::Io)?;
                    if got > 0 {
                        pending.extend_from_slice(&chunk[..got]);
                    } else {
                        at_eof = true;
                    }
                }

                // Copy the payload out and drop the frame from the buffer
                // before decoding.
                let payload: Vec<u8> = pending[4..4 + frame_len].to_vec();
                pending.drain(..4 + frame_len);

                let frame = decode_protobuf_frame(&payload)?;
                let finished = frame.done;
                yield frame;
                if finished {
                    return;
                }
            }
        }
    }
}