// braid_http/protocol/multiplex.rs

/// Version string of the multiplexing protocol (`"1.0"`).
pub const MULTIPLEX_VERSION: &str = "1.0";

/// Name of the `Multiplex-Version` HTTP header.
pub const HEADER_MULTIPLEX_VERSION: &str = "Multiplex-Version";

/// Name of the `Multiplex-Through` HTTP header.
pub const HEADER_MULTIPLEX_THROUGH: &str = "Multiplex-Through";

/// Numeric HTTP status code (`293`) used for a multiplex redirect.
///
/// NOTE(review): the exact semantics of status 293 are not visible in this
/// file -- confirm against the Braid multiplexing specification.
pub const STATUS_MULTIPLEX_REDIRECT: u16 = 293;

/// A single framing event of the multiplexed stream.
///
/// Every variant carries the identifier of the response it belongs to as its
/// first field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MultiplexEvent {
    /// A `start response <id>` line: the response with this id begins.
    StartResponse(String),
    /// A `<n> bytes for response <id>` line together with its `n` payload
    /// bytes.
    Data(String, Vec<u8>),
    /// A `close response <id>` line: the response with this id ends.
    CloseResponse(String),
}

28impl std::fmt::Display for MultiplexEvent {
29 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30 match self {
31 MultiplexEvent::StartResponse(id) => write!(f, "start response {}\r\n", id),
32 MultiplexEvent::Data(id, data) => {
33 write!(f, "{} bytes for response {}\r\n", data.len(), id)
34 }
35 MultiplexEvent::CloseResponse(id) => write!(f, "close response {}\r\n", id),
36 }
37 }
38}
39
/// What the parser expects to find next in its buffer.
#[derive(Debug, Default, Clone)]
enum ParserState {
    /// A newline-terminated header line (the initial state).
    #[default]
    Header,
    /// The payload announced by a `<n> bytes for response <id>` header.
    Data {
        /// Identifier of the response the payload belongs to.
        id: String,
        /// Number of payload bytes announced by the header.
        remaining: usize,
    },
}

51pub struct MultiplexParser {
53 buffer: Vec<u8>,
54 state: ParserState,
55}
56
57impl Default for MultiplexParser {
58 fn default() -> Self {
59 Self::new()
60 }
61}
62
63impl MultiplexParser {
64 pub fn new() -> Self {
66 Self {
67 buffer: Vec::new(),
68 state: ParserState::Header,
69 }
70 }
71
72 pub fn feed(&mut self, data: &[u8]) -> Result<Vec<MultiplexEvent>, String> {
74 self.buffer.extend_from_slice(data);
75 let mut events = Vec::new();
76
77 loop {
78 match self.state.clone() {
79 ParserState::Header => {
80 let mut found_newline = false;
82 let mut line_end = 0;
83
84 for i in 0..self.buffer.len() {
85 if self.buffer[i] == b'\n' {
86 line_end = i + 1;
87 found_newline = true;
88 break;
89 }
90 }
91
92 if !found_newline {
93 break;
94 }
95
96 let line_bytes = &self.buffer[..line_end];
97 let line = String::from_utf8_lossy(line_bytes);
98
99 if let Some(suffix) = line.strip_prefix("start response ") {
100 let id = suffix.trim().to_string();
101 events.push(MultiplexEvent::StartResponse(id));
102 self.consume(line_end);
103 } else if line.contains(" bytes for response ") {
104 let parts: Vec<&str> = line.splitn(2, " bytes for response ").collect();
105 if parts.len() == 2 {
106 let size_str = parts[0].trim_start_matches(['\r', '\n']).trim();
107 if let Ok(size) = size_str.parse::<usize>() {
108 let id = parts[1].trim().to_string();
109 self.state = ParserState::Data {
110 id,
111 remaining: size,
112 };
113 self.consume(line_end);
114 } else {
115 return Err(format!("Invalid size in multiplex header: {}", line));
116 }
117 } else {
118 return Err(format!("Invalid multiplex header: {}", line));
119 }
120 } else if let Some(suffix) = line.strip_prefix("close response ") {
121 let id = suffix.trim().to_string();
122 events.push(MultiplexEvent::CloseResponse(id));
123 self.consume(line_end);
124 } else if line.trim().is_empty() {
125 self.consume(line_end);
126 } else {
127 return Err(format!("Unknown multiplex header: {}", line));
128 }
129 }
130 ParserState::Data { id, remaining } => {
131 if self.buffer.len() >= remaining {
132 let data = self.buffer[..remaining].to_vec();
133 events.push(MultiplexEvent::Data(id, data));
134 self.consume(remaining);
135 self.state = ParserState::Header;
136 } else {
137 break;
138 }
139 }
140 }
141 }
142
143 Ok(events)
144 }
145
146 fn consume(&mut self, n: usize) {
147 self.buffer.drain(0..n);
148 }
149}