1use std::io::Read;
8
9use crate::frame::CodecFrame;
10
/// Errors reported while decoding a stream of codec frames.
#[derive(Debug, thiserror::Error)]
pub enum StreamError {
    /// An I/O error; `#[from]` lets `?` convert a `std::io::Error` into this variant.
    #[error("io error: {0}")]
    Io(#[from] std::io::Error),
    /// A MessagePack frame was malformed or cut short; the payload is a description.
    #[error("invalid msgpack frame: {0}")]
    Msgpack(String),
    /// A protobuf frame was malformed or cut short; the payload is a description.
    #[error("invalid protobuf frame: {0}")]
    Protobuf(String),
}
21
22pub fn decode_msgpack_stream<R: Read>(reader: R) -> MsgpackFrameIter<R> {
28 MsgpackFrameIter {
29 reader,
30 buf: Vec::new(),
31 eof: false,
32 done_seen: false,
33 }
34}
35
36pub fn decode_protobuf_stream<R: Read>(reader: R) -> ProtobufFrameIter<R> {
40 ProtobufFrameIter {
41 reader,
42 buf: Vec::new(),
43 eof: false,
44 done_seen: false,
45 }
46}
47
48pub struct MsgpackFrameIter<R: Read> {
52 reader: R,
53 buf: Vec<u8>,
54 eof: bool,
55 done_seen: bool,
56}
57
58impl<R: Read> Iterator for MsgpackFrameIter<R> {
59 type Item = Result<CodecFrame, StreamError>;
60
61 fn next(&mut self) -> Option<Self::Item> {
62 if self.done_seen {
63 return None;
64 }
65 loop {
66 match msgpack_end_offset(&self.buf, 0) {
68 Ok(end) => {
69 let frame = match decode_msgpack_frame(&self.buf[..end]) {
71 Ok(f) => f,
72 Err(e) => return Some(Err(e)),
73 };
74 self.buf.drain(..end);
76 if frame.done {
77 self.done_seen = true;
78 }
79 return Some(Ok(frame));
80 }
81 Err(MsgpackBoundaryError::Incomplete) => {
82 if self.eof {
83 if self.buf.is_empty() {
84 return None;
85 }
86 return Some(Err(StreamError::Msgpack(
87 "stream ended mid-frame".into(),
88 )));
89 }
90 let mut tmp = [0u8; 16 * 1024];
91 match self.reader.read(&mut tmp) {
92 Ok(0) => {
93 self.eof = true;
94 if self.buf.is_empty() {
95 return None;
96 }
97 }
100 Ok(n) => self.buf.extend_from_slice(&tmp[..n]),
101 Err(e) => return Some(Err(StreamError::Io(e))),
102 }
103 }
104 Err(MsgpackBoundaryError::Invalid(msg)) => {
105 return Some(Err(StreamError::Msgpack(msg)));
106 }
107 }
108 }
109 }
110}
111
/// Reason the end of a MessagePack value could not be located.
enum MsgpackBoundaryError {
    /// The buffer stops before the value does; more input may complete it.
    Incomplete,
    /// The bytes cannot form a MessagePack value; the payload is a description.
    Invalid(String),
}

/// Returns the offset one past the end of the MessagePack value that starts at
/// `data[pos]`, including all nested array/map elements.
///
/// # Errors
/// * `Incomplete` if `data` ends before the value does (also when `pos` is at
///   or beyond the end of `data`).
/// * `Invalid` if a lead byte is not a known MessagePack type marker.
///
/// The scan is iterative, so arbitrarily deep nesting in untrusted input cannot
/// exhaust the call stack.
fn msgpack_end_offset(data: &[u8], pos: usize) -> Result<usize, MsgpackBoundaryError> {
    // A single value is simply a run of exactly one value.
    array_end(data, pos, 1)
}

/// Inspects only the header of the value at `pos`.
///
/// Returns `(offset, children)`:
/// * for scalars, strings, binaries and extensions, `offset` is the end of the
///   whole value and `children` is 0;
/// * for arrays and maps, `offset` is the first byte after the header and
///   `children` is the number of nested values that follow (two per map entry).
fn scan_head(data: &[u8], pos: usize) -> Result<(usize, usize), MsgpackBoundaryError> {
    let b = match data.get(pos) {
        Some(&b) => b,
        None => return Err(MsgpackBoundaryError::Incomplete),
    };
    let body = pos + 1;

    match b {
        // positive fixint, negative fixint, nil, false, true
        0x00..=0x7F | 0xE0..=0xFF | 0xC0 | 0xC2 | 0xC3 => Ok((body, 0)),
        // fixstr: length lives in the low five bits
        0xA0..=0xBF => Ok((need(data, body + (b & 0x1F) as usize)?, 0)),
        // fixarray / fixmap: element count lives in the low four bits
        0x90..=0x9F => Ok((body, (b & 0x0F) as usize)),
        0x80..=0x8F => Ok((body, (b & 0x0F) as usize * 2)),
        // bin 8/16/32 and str 8/16/32
        0xC4..=0xC6 | 0xD9..=0xDB => Ok((len_prefixed(data, pos, len_size(b)?)?, 0)),
        // array 16/32 and map 16/32
        0xDC..=0xDF => {
            let size = if b == 0xDC || b == 0xDE { 2 } else { 4 };
            let head_end = body + size;
            if head_end > data.len() {
                return Err(MsgpackBoundaryError::Incomplete);
            }
            let count = read_be_u32(&data[body..head_end])? as usize;
            let children = if b >= 0xDE {
                count.saturating_mul(2)
            } else {
                count
            };
            Ok((head_end, children))
        }
        // fixed-width payloads: uint/int 8..64, float 32/64, fixext 1..16
        // (fixext widths include the one-byte extension type)
        0xCC | 0xD0 => Ok((need(data, body + 1)?, 0)),
        0xCD | 0xD1 | 0xD4 => Ok((need(data, body + 2)?, 0)),
        0xD5 => Ok((need(data, body + 3)?, 0)),
        0xCA | 0xCE | 0xD2 => Ok((need(data, body + 4)?, 0)),
        0xD6 => Ok((need(data, body + 5)?, 0)),
        0xCB | 0xCF | 0xD3 => Ok((need(data, body + 8)?, 0)),
        0xD7 => Ok((need(data, body + 9)?, 0)),
        0xD8 => Ok((need(data, body + 17)?, 0)),
        // ext 8/16/32: length field, one type byte, then the payload
        0xC7..=0xC9 => {
            let size = len_size(b)?;
            let head_end = body + size + 1;
            if head_end > data.len() {
                return Err(MsgpackBoundaryError::Incomplete);
            }
            let n = read_be_u32(&data[body..body + size])? as usize;
            Ok((need(data, span_end(head_end, n)?)?, 0))
        }
        // Only 0xC1 (never used by MessagePack) reaches this arm.
        _ => Err(MsgpackBoundaryError::Invalid(format!(
            "unsupported byte 0x{b:02X} at offset {pos}"
        ))),
    }
}

/// Adds a payload length to an offset, rejecting lengths that would overflow
/// `usize` (possible on 32-bit targets with 32-bit length fields).
fn span_end(start: usize, len: usize) -> Result<usize, MsgpackBoundaryError> {
    start
        .checked_add(len)
        .ok_or_else(|| MsgpackBoundaryError::Invalid("length overflows usize".into()))
}

/// Width in bytes of the length field that follows a bin/ext/str marker.
fn len_size(prefix: u8) -> Result<usize, MsgpackBoundaryError> {
    match prefix {
        0xC4 | 0xC7 | 0xD9 => Ok(1),
        0xC5 | 0xC8 | 0xDA => Ok(2),
        0xC6 | 0xC9 | 0xDB => Ok(4),
        _ => Err(MsgpackBoundaryError::Invalid(format!(
            "unexpected length-prefix byte 0x{prefix:02X}"
        ))),
    }
}

/// End offset of a value laid out as marker byte, `size`-byte big-endian
/// length, then that many payload bytes.
fn len_prefixed(data: &[u8], pos: usize, size: usize) -> Result<usize, MsgpackBoundaryError> {
    let head_end = pos + 1 + size;
    if head_end > data.len() {
        return Err(MsgpackBoundaryError::Incomplete);
    }
    let n = read_be_u32(&data[pos + 1..head_end])? as usize;
    need(data, span_end(head_end, n)?)
}

/// Returns `end` if `data` holds at least `end` bytes, otherwise `Incomplete`.
fn need(data: &[u8], end: usize) -> Result<usize, MsgpackBoundaryError> {
    if end > data.len() {
        Err(MsgpackBoundaryError::Incomplete)
    } else {
        Ok(end)
    }
}

/// Returns the offset just past `n` consecutive values starting at `pos`.
///
/// MessagePack stores nested elements directly after their container header,
/// so instead of recursing it is enough to keep a count of values still to be
/// scanned: each value consumes one and a container adds its element count.
fn array_end(data: &[u8], mut pos: usize, n: usize) -> Result<usize, MsgpackBoundaryError> {
    let mut pending = n;
    while pending > 0 {
        let (next, children) = scan_head(data, pos)?;
        pos = next;
        // Saturation can only happen for counts no real buffer could satisfy;
        // the scan then ends with `Incomplete` when the data runs out.
        pending = (pending - 1).saturating_add(children);
    }
    Ok(pos)
}

/// Interprets up to four bytes as a big-endian unsigned integer.
fn read_be_u32(slice: &[u8]) -> Result<u32, MsgpackBoundaryError> {
    Ok(slice.iter().fold(0u32, |acc, &b| (acc << 8) | u32::from(b)))
}
250
251fn decode_msgpack_frame(data: &[u8]) -> Result<CodecFrame, StreamError> {
253 let mut p = MsgpackParser::new(data);
254 let map_len = p.read_map_header()?;
255 let mut ids: Vec<u32> = Vec::new();
256 let mut done = false;
257 let mut finish_reason: Option<String> = None;
258 for _ in 0..map_len {
259 let key = p.read_str()?;
260 match key.as_str() {
261 "ids" => {
262 let n = p.read_array_header()?;
263 ids.reserve(n);
264 for _ in 0..n {
265 ids.push(p.read_int_u32()?);
266 }
267 }
268 "done" => {
269 done = p.read_bool()?;
270 }
271 "finish_reason" => {
272 if p.try_read_nil() {
273 finish_reason = None;
274 } else {
275 finish_reason = Some(p.read_str()?);
276 }
277 }
278 _ => {
279 p.skip_value()?;
280 }
281 }
282 }
283 Ok(CodecFrame { ids, done, finish_reason })
284}
285
286struct MsgpackParser<'a> {
287 data: &'a [u8],
288 pos: usize,
289}
290
291impl<'a> MsgpackParser<'a> {
292 fn new(data: &'a [u8]) -> Self {
293 Self { data, pos: 0 }
294 }
295 fn next_byte(&mut self) -> Result<u8, StreamError> {
296 if self.pos >= self.data.len() {
297 return Err(StreamError::Msgpack("truncated".into()));
298 }
299 let b = self.data[self.pos];
300 self.pos += 1;
301 Ok(b)
302 }
303 fn take(&mut self, n: usize) -> Result<&'a [u8], StreamError> {
304 if self.pos + n > self.data.len() {
305 return Err(StreamError::Msgpack("truncated".into()));
306 }
307 let s = &self.data[self.pos..self.pos + n];
308 self.pos += n;
309 Ok(s)
310 }
311 fn read_map_header(&mut self) -> Result<usize, StreamError> {
312 let b = self.next_byte()?;
313 if (0x80..=0x8F).contains(&b) {
314 return Ok((b & 0x0F) as usize);
315 }
316 if b == 0xDE {
317 let s = self.take(2)?;
318 return Ok(((s[0] as usize) << 8) | s[1] as usize);
319 }
320 if b == 0xDF {
321 let s = self.take(4)?;
322 return Ok(((s[0] as usize) << 24)
323 | ((s[1] as usize) << 16)
324 | ((s[2] as usize) << 8)
325 | s[3] as usize);
326 }
327 Err(StreamError::Msgpack(format!(
328 "expected map header, got 0x{b:02X}"
329 )))
330 }
331 fn read_array_header(&mut self) -> Result<usize, StreamError> {
332 let b = self.next_byte()?;
333 if (0x90..=0x9F).contains(&b) {
334 return Ok((b & 0x0F) as usize);
335 }
336 if b == 0xDC {
337 let s = self.take(2)?;
338 return Ok(((s[0] as usize) << 8) | s[1] as usize);
339 }
340 if b == 0xDD {
341 let s = self.take(4)?;
342 return Ok(((s[0] as usize) << 24)
343 | ((s[1] as usize) << 16)
344 | ((s[2] as usize) << 8)
345 | s[3] as usize);
346 }
347 Err(StreamError::Msgpack(format!(
348 "expected array header, got 0x{b:02X}"
349 )))
350 }
351 fn read_str(&mut self) -> Result<String, StreamError> {
352 let b = self.next_byte()?;
353 let len = if (0xA0..=0xBF).contains(&b) {
354 (b & 0x1F) as usize
355 } else if b == 0xD9 {
356 self.next_byte()? as usize
357 } else if b == 0xDA {
358 let s = self.take(2)?;
359 ((s[0] as usize) << 8) | s[1] as usize
360 } else if b == 0xDB {
361 let s = self.take(4)?;
362 ((s[0] as usize) << 24)
363 | ((s[1] as usize) << 16)
364 | ((s[2] as usize) << 8)
365 | s[3] as usize
366 } else {
367 return Err(StreamError::Msgpack(format!(
368 "expected string, got 0x{b:02X}"
369 )));
370 };
371 let bytes = self.take(len)?;
372 std::str::from_utf8(bytes)
373 .map(|s| s.to_string())
374 .map_err(|_| StreamError::Msgpack("invalid utf-8 in string".into()))
375 }
376 fn read_int_u32(&mut self) -> Result<u32, StreamError> {
377 let b = self.next_byte()?;
378 if b <= 0x7F {
380 return Ok(b as u32);
381 }
382 match b {
383 0xCC => Ok(self.next_byte()? as u32),
384 0xCD => {
385 let s = self.take(2)?;
386 Ok(((s[0] as u32) << 8) | s[1] as u32)
387 }
388 0xCE => {
389 let s = self.take(4)?;
390 Ok(((s[0] as u32) << 24)
391 | ((s[1] as u32) << 16)
392 | ((s[2] as u32) << 8)
393 | s[3] as u32)
394 }
395 0xCF => {
396 let s = self.take(8)?;
397 let mut v: u64 = 0;
398 for &byte in s {
399 v = (v << 8) | byte as u64;
400 }
401 Ok(v as u32)
402 }
403 0xD0 => Ok((self.next_byte()? as i8) as u32),
405 0xD1 => {
406 let s = self.take(2)?;
407 Ok((((s[0] as i16) << 8) | s[1] as i16) as u32)
408 }
409 0xD2 => {
410 let s = self.take(4)?;
411 Ok((((s[0] as i32) << 24)
412 | ((s[1] as i32) << 16)
413 | ((s[2] as i32) << 8)
414 | s[3] as i32) as u32)
415 }
416 _ => Err(StreamError::Msgpack(format!(
417 "expected int, got 0x{b:02X}"
418 ))),
419 }
420 }
421 fn read_bool(&mut self) -> Result<bool, StreamError> {
422 match self.next_byte()? {
423 0xC2 => Ok(false),
424 0xC3 => Ok(true),
425 other => Err(StreamError::Msgpack(format!(
426 "expected bool, got 0x{other:02X}"
427 ))),
428 }
429 }
430 fn try_read_nil(&mut self) -> bool {
431 if self.pos < self.data.len() && self.data[self.pos] == 0xC0 {
432 self.pos += 1;
433 true
434 } else {
435 false
436 }
437 }
438 fn skip_value(&mut self) -> Result<(), StreamError> {
439 match msgpack_end_offset(self.data, self.pos) {
440 Ok(end) => {
441 self.pos = end;
442 Ok(())
443 }
444 Err(MsgpackBoundaryError::Incomplete) => {
445 Err(StreamError::Msgpack("truncated value to skip".into()))
446 }
447 Err(MsgpackBoundaryError::Invalid(m)) => Err(StreamError::Msgpack(m)),
448 }
449 }
450}
451
452pub struct ProtobufFrameIter<R: Read> {
456 reader: R,
457 buf: Vec<u8>,
458 eof: bool,
459 done_seen: bool,
460}
461
462impl<R: Read> Iterator for ProtobufFrameIter<R> {
463 type Item = Result<CodecFrame, StreamError>;
464
465 fn next(&mut self) -> Option<Self::Item> {
466 if self.done_seen {
467 return None;
468 }
469 while self.buf.len() < 4 {
471 if self.eof {
472 if self.buf.is_empty() {
473 return None;
474 }
475 return Some(Err(StreamError::Protobuf(format!(
476 "stream ended mid-frame ({} bytes left)",
477 self.buf.len()
478 ))));
479 }
480 let mut tmp = [0u8; 16 * 1024];
481 match self.reader.read(&mut tmp) {
482 Ok(0) => {
483 self.eof = true;
484 }
485 Ok(n) => self.buf.extend_from_slice(&tmp[..n]),
486 Err(e) => return Some(Err(StreamError::Io(e))),
487 }
488 }
489
490 let frame_len = u32::from_be_bytes([self.buf[0], self.buf[1], self.buf[2], self.buf[3]])
491 as usize;
492
493 while self.buf.len() < 4 + frame_len {
494 if self.eof {
495 return Some(Err(StreamError::Protobuf(format!(
496 "stream ended mid-frame (need {frame_len} bytes)"
497 ))));
498 }
499 let mut tmp = [0u8; 16 * 1024];
500 match self.reader.read(&mut tmp) {
501 Ok(0) => {
502 self.eof = true;
503 }
504 Ok(n) => self.buf.extend_from_slice(&tmp[..n]),
505 Err(e) => return Some(Err(StreamError::Io(e))),
506 }
507 }
508
509 let payload: Vec<u8> = self.buf[4..4 + frame_len].to_vec();
510 self.buf.drain(..4 + frame_len);
511
512 let frame = match decode_protobuf_frame(&payload) {
513 Ok(f) => f,
514 Err(e) => return Some(Err(e)),
515 };
516 if frame.done {
517 self.done_seen = true;
518 }
519 Some(Ok(frame))
520 }
521}
522
523pub fn decode_protobuf_frame(data: &[u8]) -> Result<CodecFrame, StreamError> {
525 let mut ids: Vec<u32> = Vec::new();
526 let mut done = false;
527 let mut finish_reason: Option<String> = None;
528 let mut pos = 0usize;
529
530 while pos < data.len() {
531 let (tag, np) = read_varint(data, pos)?;
532 pos = np;
533 let field = (tag >> 3) as u32;
534 let wt = (tag & 0x07) as u32;
535
536 match wt {
537 0 => {
538 let (val, np2) = read_varint(data, pos)?;
540 pos = np2;
541 if field == 2 {
542 done = val != 0;
543 }
544 }
545 1 => {
546 if pos + 8 > data.len() {
548 return Err(StreamError::Protobuf("truncated 64-bit field".into()));
549 }
550 pos += 8;
551 }
552 2 => {
553 let (len, np3) = read_varint(data, pos)?;
555 pos = np3;
556 let len = len as usize;
557 if pos + len > data.len() {
558 return Err(StreamError::Protobuf("truncated length-delimited field".into()));
559 }
560 let chunk = &data[pos..pos + len];
561 pos += len;
562 if field == 1 {
563 let mut p = 0usize;
565 while p < chunk.len() {
566 let (id, npp) = read_varint(chunk, p)?;
567 p = npp;
568 ids.push(id as u32);
569 }
570 } else if field == 3 {
571 finish_reason = Some(
572 std::str::from_utf8(chunk)
573 .map_err(|_| {
574 StreamError::Protobuf("invalid utf-8 in finish_reason".into())
575 })?
576 .to_string(),
577 );
578 }
579 }
580 5 => {
581 if pos + 4 > data.len() {
583 return Err(StreamError::Protobuf("truncated 32-bit field".into()));
584 }
585 pos += 4;
586 }
587 other => {
588 return Err(StreamError::Protobuf(format!(
589 "unknown wire type {other} in CodecFrame field {field}"
590 )));
591 }
592 }
593 }
594
595 Ok(CodecFrame { ids, done, finish_reason })
596}
597
598fn read_varint(data: &[u8], mut pos: usize) -> Result<(u64, usize), StreamError> {
599 let mut result: u64 = 0;
600 let mut shift: u32 = 0;
601 loop {
602 if pos >= data.len() {
603 return Err(StreamError::Protobuf("truncated varint".into()));
604 }
605 let b = data[pos];
606 pos += 1;
607 result |= ((b & 0x7F) as u64) << shift;
608 if (b & 0x80) == 0 {
609 return Ok((result, pos));
610 }
611 shift += 7;
612 if shift > 63 {
613 return Err(StreamError::Protobuf("varint too long".into()));
614 }
615 }
616}
617
618pub fn encode_msgpack_frame(frame: &CodecFrame) -> Vec<u8> {
623 let mut out: Vec<u8> = Vec::with_capacity(32 + frame.ids.len() * 5);
624 let field_count = if frame.finish_reason.is_some() { 3 } else { 2 };
625 write_map_header(&mut out, field_count);
626 write_str(&mut out, "ids");
627 write_array_header(&mut out, frame.ids.len());
628 for &id in &frame.ids {
629 write_uint_u32(&mut out, id);
630 }
631 write_str(&mut out, "done");
632 out.push(if frame.done { 0xC3 } else { 0xC2 });
633 if let Some(reason) = &frame.finish_reason {
634 write_str(&mut out, "finish_reason");
635 write_str(&mut out, reason);
636 }
637 out
638}
639
640pub fn encode_protobuf_frame(frame: &CodecFrame) -> Vec<u8> {
642 let mut payload: Vec<u8> = Vec::new();
643 if !frame.ids.is_empty() {
644 let mut packed: Vec<u8> = Vec::new();
645 for &id in &frame.ids {
646 write_varint(&mut packed, id as u64);
647 }
648 payload.push(0x0A);
649 write_varint(&mut payload, packed.len() as u64);
650 payload.extend_from_slice(&packed);
651 }
652 payload.push(0x10);
653 payload.push(if frame.done { 1 } else { 0 });
654 if let Some(reason) = &frame.finish_reason {
655 let bytes = reason.as_bytes();
656 payload.push(0x1A);
657 write_varint(&mut payload, bytes.len() as u64);
658 payload.extend_from_slice(bytes);
659 }
660
661 let mut out = Vec::with_capacity(4 + payload.len());
662 let len = payload.len() as u32;
663 out.extend_from_slice(&len.to_be_bytes());
664 out.extend_from_slice(&payload);
665 out
666}
667
/// Appends a MessagePack map header for `n` key/value pairs to `out`, using
/// the shortest encoding: fixmap up to 15, map 16 up to 65535, map 32 beyond.
fn write_map_header(out: &mut Vec<u8>, n: usize) {
    match n {
        0..=0x0F => out.push(0x80 | n as u8),
        0x10..=0xFFFF => {
            out.push(0xDE);
            out.extend_from_slice(&(n as u16).to_be_bytes());
        }
        _ => {
            out.push(0xDF);
            out.extend_from_slice(&(n as u32).to_be_bytes());
        }
    }
}
679
/// Appends a MessagePack array header for `n` elements to `out`, using the
/// shortest encoding: fixarray up to 15, array 16 up to 65535, array 32 beyond.
fn write_array_header(out: &mut Vec<u8>, n: usize) {
    match n {
        0..=0x0F => out.push(0x90 | n as u8),
        0x10..=0xFFFF => {
            out.push(0xDC);
            out.extend_from_slice(&(n as u16).to_be_bytes());
        }
        _ => {
            out.push(0xDD);
            out.extend_from_slice(&(n as u32).to_be_bytes());
        }
    }
}
691
/// Appends `s` to `out` as a MessagePack string, choosing the shortest header
/// for its byte length: fixstr up to 31, str 8 up to 255, str 16 up to 65535,
/// str 32 beyond.
fn write_str(out: &mut Vec<u8>, s: &str) {
    let len = s.len();
    match len {
        0..=0x1F => out.push(0xA0 | len as u8),
        0x20..=0xFF => out.extend_from_slice(&[0xD9, len as u8]),
        0x100..=0xFFFF => {
            out.push(0xDA);
            out.extend_from_slice(&(len as u16).to_be_bytes());
        }
        _ => {
            out.push(0xDB);
            out.extend_from_slice(&(len as u32).to_be_bytes());
        }
    }
    out.extend_from_slice(s.as_bytes());
}
709
/// Appends `v` to `out` as a MessagePack unsigned integer in its shortest
/// form: positive fixint up to 127, uint 8 up to 255, uint 16 up to 65535,
/// uint 32 beyond.
fn write_uint_u32(out: &mut Vec<u8>, v: u32) {
    match v {
        0..=0x7F => out.push(v as u8),
        0x80..=0xFF => out.extend_from_slice(&[0xCC, v as u8]),
        0x100..=0xFFFF => {
            out.push(0xCD);
            out.extend_from_slice(&(v as u16).to_be_bytes());
        }
        _ => {
            out.push(0xCE);
            out.extend_from_slice(&v.to_be_bytes());
        }
    }
}
724
/// Appends `n` to `out` as a base-128 varint: seven bits per byte, least
/// significant group first, with the high bit set on every byte except the last.
fn write_varint(out: &mut Vec<u8>, mut n: u64) {
    // Emit continuation bytes while more than seven bits remain.
    while n >= 0x80 {
        out.push((n as u8 & 0x7F) | 0x80);
        n >>= 7;
    }
    out.push(n as u8);
}
736
/// Asynchronous frame decoders built on `tokio::io::AsyncRead`.
///
/// Only compiled when the `tokio` feature is enabled.
#[cfg(feature = "tokio")]
pub mod r#async {
    use super::{decode_msgpack_frame, decode_protobuf_frame, msgpack_end_offset,
                MsgpackBoundaryError, StreamError};
    use crate::frame::CodecFrame;
    use async_stream::try_stream;
    use futures_util::Stream;
    use tokio::io::{AsyncRead, AsyncReadExt};

    /// Decodes consecutive MessagePack values read from `reader` into a stream
    /// of [`CodecFrame`]s.
    ///
    /// The stream ends after yielding a frame whose `done` flag is true, or
    /// when the reader reports end of input with no buffered bytes left.
    /// Decode failures, invalid bytes, input ending inside a value, and I/O
    /// errors are all raised with `?` inside the `try_stream!` block
    /// (presumably surfacing as an `Err` item — confirm against the
    /// `async_stream` documentation).
    pub fn decode_msgpack_stream_async<R>(
        mut reader: R,
    ) -> impl Stream<Item = Result<CodecFrame, StreamError>>
    where
        R: AsyncRead + Unpin,
    {
        try_stream! {
            // Bytes read so far that have not been consumed as a frame.
            let mut buf: Vec<u8> = Vec::new();
            // Scratch space for a single read call.
            let mut tmp = [0u8; 16 * 1024];
            // Set once the reader has returned a zero-length read.
            let mut eof = false;
            loop {
                match msgpack_end_offset(&buf, 0) {
                    Ok(end) => {
                        // A complete value is buffered: decode it, then consume it.
                        let frame = decode_msgpack_frame(&buf[..end])?;
                        buf.drain(..end);
                        // Copy the flag out before `frame` is moved by `yield`.
                        let done = frame.done;
                        yield frame;
                        if done {
                            return;
                        }
                    }
                    Err(MsgpackBoundaryError::Incomplete) => {
                        if eof {
                            if buf.is_empty() {
                                return;
                            }
                            // Leftover bytes at end of input cannot form a value.
                            Err(StreamError::Msgpack("stream ended mid-frame".into()))?;
                        }
                        let n = reader.read(&mut tmp).await
                            .map_err(StreamError::Io)?;
                        if n == 0 {
                            eof = true;
                            if buf.is_empty() {
                                return;
                            }
                        } else {
                            buf.extend_from_slice(&tmp[..n]);
                        }
                    }
                    Err(MsgpackBoundaryError::Invalid(m)) => {
                        Err(StreamError::Msgpack(m))?;
                    }
                }
            }
        }
    }

    /// Decodes length-prefixed protobuf frames read from `reader` into a
    /// stream of [`CodecFrame`]s.
    ///
    /// Each frame is a 4-byte big-endian payload length followed by the
    /// payload. The stream ends after yielding a frame whose `done` flag is
    /// true, or when the reader reports end of input with no buffered bytes
    /// left. Decode failures, input ending inside a frame, and I/O errors are
    /// raised with `?` inside the `try_stream!` block (presumably surfacing as
    /// an `Err` item — confirm against the `async_stream` documentation).
    pub fn decode_protobuf_stream_async<R>(
        mut reader: R,
    ) -> impl Stream<Item = Result<CodecFrame, StreamError>>
    where
        R: AsyncRead + Unpin,
    {
        try_stream! {
            // Bytes read so far that have not been consumed as a frame.
            let mut buf: Vec<u8> = Vec::new();
            // Scratch space for a single read call.
            let mut tmp = [0u8; 16 * 1024];
            // Set once the reader has returned a zero-length read.
            let mut eof = false;
            loop {
                // Buffer the 4-byte length prefix.
                while buf.len() < 4 {
                    if eof {
                        if buf.is_empty() {
                            return;
                        }
                        Err(StreamError::Protobuf(format!(
                            "stream ended mid-frame ({} bytes left)", buf.len()
                        )))?;
                    }
                    let n = reader.read(&mut tmp).await
                        .map_err(StreamError::Io)?;
                    if n == 0 {
                        eof = true;
                    } else {
                        buf.extend_from_slice(&tmp[..n]);
                    }
                }

                let frame_len = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]) as usize;
                // Buffer the whole payload.
                while buf.len() < 4 + frame_len {
                    if eof {
                        Err(StreamError::Protobuf(format!(
                            "stream ended mid-frame (need {frame_len} bytes)"
                        )))?;
                    }
                    let n = reader.read(&mut tmp).await
                        .map_err(StreamError::Io)?;
                    if n == 0 {
                        eof = true;
                    } else {
                        buf.extend_from_slice(&tmp[..n]);
                    }
                }

                // Copy the payload out and consume prefix + payload before decoding.
                let payload: Vec<u8> = buf[4..4 + frame_len].to_vec();
                buf.drain(..4 + frame_len);

                let frame = decode_protobuf_frame(&payload)?;
                // Copy the flag out before `frame` is moved by `yield`.
                let done = frame.done;
                yield frame;
                if done {
                    return;
                }
            }
        }
    }
}