Skip to main content

braid_http/protocol/
multiplex.rs

1//! Multiplexing protocol constants and framing for Braid-HTTP.
2//!
3//! Implements the framing logic for Braid Multiplexing Protocol v1.0.
4
/// The version of the multiplexing protocol implemented by this module.
///
/// NOTE(review): presumably the value carried in the
/// `Multiplex-Version` header — confirm against the callers.
pub const MULTIPLEX_VERSION: &str = "1.0";

/// Name of the header used to specify the multiplexing version.
pub const HEADER_MULTIPLEX_VERSION: &str = "Multiplex-Version";

/// Name of the header used to specify the multiplexing ID and request ID.
pub const HEADER_MULTIPLEX_THROUGH: &str = "Multiplex-Through";

/// Braid-specific status code (293) meaning "Responded via multiplexer".
pub const STATUS_MULTIPLEX_REDIRECT: u16 = 293;
16
/// A single framing event carried on a multiplexed stream.
///
/// Every variant carries the request id of the response it refers to as its
/// first field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MultiplexEvent {
    /// A response begins: `start response <request_id>`.
    StartResponse(String),
    /// A chunk of response bytes: `<N> bytes for response <request_id>`,
    /// where `N` is the length of the attached payload.
    Data(String, Vec<u8>),
    /// A response ends: `close response <request_id>`.
    CloseResponse(String),
}

impl std::fmt::Display for MultiplexEvent {
    /// Writes the CRLF-terminated header line that introduces this event.
    ///
    /// For [`MultiplexEvent::Data`] only the header line is produced; the
    /// payload bytes themselves are not written.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Data(request_id, payload) => {
                let byte_count = payload.len();
                write!(f, "{} bytes for response {}\r\n", byte_count, request_id)
            }
            Self::CloseResponse(request_id) => {
                write!(f, "close response {}\r\n", request_id)
            }
            Self::StartResponse(request_id) => {
                write!(f, "start response {}\r\n", request_id)
            }
        }
    }
}
39
/// State machine for parsing the multiplexing protocol.
///
/// The parser starts in [`ParserState::Header`], which is also the
/// `Default` value.
#[derive(Debug, Default, Clone)]
enum ParserState {
    /// Waiting for a complete, newline-terminated header line.
    #[default]
    Header,
    /// A data header has been read; waiting for its payload bytes.
    Data {
        /// Request id taken from the data header.
        id: String,
        /// Number of payload bytes announced by the data header; the data
        /// event is emitted once this many bytes are buffered.
        remaining: usize,
    },
}
50
51/// A parser for Braid Multiplexing Protocol streams.
52pub struct MultiplexParser {
53    buffer: Vec<u8>,
54    state: ParserState,
55}
56
57impl Default for MultiplexParser {
58    fn default() -> Self {
59        Self::new()
60    }
61}
62
63impl MultiplexParser {
64    /// Creates a new MultiplexParser.
65    pub fn new() -> Self {
66        Self {
67            buffer: Vec::new(),
68            state: ParserState::Header,
69        }
70    }
71
72    /// Feeds data into the parser and returns any complete events found.
73    pub fn feed(&mut self, data: &[u8]) -> Result<Vec<MultiplexEvent>, String> {
74        self.buffer.extend_from_slice(data);
75        let mut events = Vec::new();
76
77        loop {
78            match self.state.clone() {
79                ParserState::Header => {
80                    // Look for \r\n
81                    let mut found_newline = false;
82                    let mut line_end = 0;
83
84                    for i in 0..self.buffer.len() {
85                        if self.buffer[i] == b'\n' {
86                            line_end = i + 1;
87                            found_newline = true;
88                            break;
89                        }
90                    }
91
92                    if !found_newline {
93                        break;
94                    }
95
96                    let line_bytes = &self.buffer[..line_end];
97                    let line = String::from_utf8_lossy(line_bytes);
98
99                    if let Some(suffix) = line.strip_prefix("start response ") {
100                        let id = suffix.trim().to_string();
101                        events.push(MultiplexEvent::StartResponse(id));
102                        self.consume(line_end);
103                    } else if line.contains(" bytes for response ") {
104                        let parts: Vec<&str> = line.splitn(2, " bytes for response ").collect();
105                        if parts.len() == 2 {
106                            let size_str = parts[0].trim_start_matches(['\r', '\n']).trim();
107                            if let Ok(size) = size_str.parse::<usize>() {
108                                let id = parts[1].trim().to_string();
109                                self.state = ParserState::Data {
110                                    id,
111                                    remaining: size,
112                                };
113                                self.consume(line_end);
114                            } else {
115                                return Err(format!("Invalid size in multiplex header: {}", line));
116                            }
117                        } else {
118                            return Err(format!("Invalid multiplex header: {}", line));
119                        }
120                    } else if let Some(suffix) = line.strip_prefix("close response ") {
121                        let id = suffix.trim().to_string();
122                        events.push(MultiplexEvent::CloseResponse(id));
123                        self.consume(line_end);
124                    } else if line.trim().is_empty() {
125                        self.consume(line_end);
126                    } else {
127                        return Err(format!("Unknown multiplex header: {}", line));
128                    }
129                }
130                ParserState::Data { id, remaining } => {
131                    if self.buffer.len() >= remaining {
132                        let data = self.buffer[..remaining].to_vec();
133                        events.push(MultiplexEvent::Data(id, data));
134                        self.consume(remaining);
135                        self.state = ParserState::Header;
136                    } else {
137                        break;
138                    }
139                }
140            }
141        }
142
143        Ok(events)
144    }
145
146    fn consume(&mut self, n: usize) {
147        self.buffer.drain(0..n);
148    }
149}