use bytes::Bytes;
use futures_core::Stream;
use http_body_util::BodyExt;
use hyper::body::Incoming;
use std::pin::Pin;
use crate::libpod::error::PodmanError;
#[derive(Debug)]
pub enum LogOutput {
StdOut { message: Bytes },
StdErr { message: Bytes },
}
pub type BoxStream<T> = Pin<Box<dyn Stream<Item = Result<T, PodmanError>> + Send>>;
pub(crate) fn parse_frame(buf: &[u8]) -> Option<(u8, Bytes, usize)> {
if buf.len() < 8 {
return None;
}
let size = u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]) as usize;
if buf.len() < 8 + size {
return None;
}
let stream_type = buf[0];
let payload = Bytes::from(buf[8..8 + size].to_vec());
Some((stream_type, payload, 8 + size))
}
pub(crate) fn take_json_line(buf: &mut Vec<u8>) -> Option<Vec<u8>> {
let nl = buf.iter().position(|&b| b == b'\n')?;
let line: Vec<u8> = buf.drain(..nl + 1).take(nl).collect();
Some(line)
}
pub fn parse_multiplexed(body: Incoming) -> BoxStream<LogOutput> {
Box::pin(futures_util::stream::try_unfold(
(body, Vec::<u8>::new()),
|(mut body, mut buf)| async move {
loop {
if let Some((stream_type, payload, consumed)) = parse_frame(&buf) {
buf.drain(..consumed);
let output = match stream_type {
1 => LogOutput::StdOut { message: payload },
2 => LogOutput::StdErr { message: payload },
_ => continue, };
return Ok(Some((output, (body, buf))));
}
match body.frame().await {
Some(Ok(frame)) => {
if let Ok(data) = frame.into_data() {
buf.extend_from_slice(&data);
}
}
Some(Err(e)) => return Err(PodmanError::from(e)),
None => return Ok(None),
}
}
},
))
}
pub fn parse_raw(body: Incoming) -> BoxStream<LogOutput> {
Box::pin(futures_util::stream::try_unfold(
body,
|mut body| async move {
loop {
match body.frame().await {
Some(Ok(frame)) => {
if let Ok(data) = frame.into_data() {
if !data.is_empty() {
return Ok(Some((LogOutput::StdOut { message: data }, body)));
}
}
}
Some(Err(e)) => return Err(PodmanError::from(e)),
None => return Ok(None),
}
}
},
))
}
pub fn parse_json_lines<T: serde::de::DeserializeOwned + Send + 'static>(
body: Incoming,
) -> BoxStream<T> {
Box::pin(futures_util::stream::try_unfold(
(body, Vec::<u8>::new()),
|(mut body, mut buf)| async move {
loop {
if let Some(line) = take_json_line(&mut buf) {
if line.is_empty() {
continue;
}
let item: T = serde_json::from_slice(&line).map_err(PodmanError::Json)?;
return Ok(Some((item, (body, buf))));
}
match body.frame().await {
Some(Ok(frame)) => {
if let Ok(data) = frame.into_data() {
buf.extend_from_slice(&data);
}
}
Some(Err(e)) => return Err(PodmanError::from(e)),
None => {
let line = std::mem::take(&mut buf);
if !line.is_empty() {
let item: T =
serde_json::from_slice(&line).map_err(PodmanError::Json)?;
return Ok(Some((item, (body, buf))));
}
return Ok(None);
}
}
}
},
))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn parse_frame_incomplete_header() {
assert!(parse_frame(&[0x01, 0x00, 0x00, 0x00]).is_none());
}
#[test]
fn parse_frame_header_present_payload_missing() {
let buf = [
0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, b'a', b'b', b'c',
];
assert!(parse_frame(&buf).is_none());
}
#[test]
fn parse_frame_stdout_complete() {
let payload = b"hello";
let mut buf = vec![0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05];
buf.extend_from_slice(payload);
let (stype, data, consumed) = parse_frame(&buf).unwrap();
assert_eq!(stype, 1);
assert_eq!(data.as_ref(), b"hello");
assert_eq!(consumed, 13);
}
#[test]
fn parse_frame_stderr_complete() {
let payload = b"err";
let mut buf = vec![0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03];
buf.extend_from_slice(payload);
let (stype, data, consumed) = parse_frame(&buf).unwrap();
assert_eq!(stype, 2);
assert_eq!(data.as_ref(), b"err");
assert_eq!(consumed, 11);
}
#[test]
fn parse_frame_zero_length_payload() {
let buf = [0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00];
let (stype, data, consumed) = parse_frame(&buf).unwrap();
assert_eq!(stype, 1);
assert!(data.is_empty());
assert_eq!(consumed, 8);
}
#[test]
fn parse_frame_leaves_remainder() {
let mut buf = vec![0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, b'h', b'i'];
buf.extend_from_slice(b"leftover");
let (_, data, consumed) = parse_frame(&buf).unwrap();
assert_eq!(data.as_ref(), b"hi");
assert_eq!(consumed, 10);
assert_eq!(&buf[consumed..], b"leftover");
}
#[test]
fn take_json_line_no_newline() {
let mut buf = b"partial line".to_vec();
assert!(take_json_line(&mut buf).is_none());
assert_eq!(buf, b"partial line");
}
#[test]
fn take_json_line_with_newline() {
let mut buf = b"line1\nline2".to_vec();
let line = take_json_line(&mut buf).unwrap();
assert_eq!(line, b"line1");
assert_eq!(buf, b"line2");
}
#[test]
fn take_json_line_empty_line() {
let mut buf = b"\nnext".to_vec();
let line = take_json_line(&mut buf).unwrap();
assert!(line.is_empty());
assert_eq!(buf, b"next");
}
#[test]
fn take_json_line_multiple_lines() {
let mut buf = b"a\nb\nc".to_vec();
assert_eq!(take_json_line(&mut buf).unwrap(), b"a");
assert_eq!(take_json_line(&mut buf).unwrap(), b"b");
assert!(take_json_line(&mut buf).is_none());
}
}