use log::*;
use log::Level::Debug;
/// Largest frame payload length (in bytes) the parser accepts: 1 MiB.
/// Frames announcing a longer payload are rejected by `ws_demunge`.
const MAX_FRAME_LEN: i64 = 1 << 20;
/// Parser state for one WebSocket connection, kept between calls to
/// `ws_demunge` so that a frame may be split across several input buffers.
#[derive(Debug)]
pub struct WsState {
    /// First byte of the current frame header, stored unmodified
    /// (`ws_demunge` masks it with `0x0f` to obtain the opcode).
    opcode: u8,
    /// Masking key of the current frame, XORed over the payload.
    masking_key: [u8; 4],
    /// Payload length announced by the current frame header.
    frame_len: usize,
    /// Number of payload bytes of the current frame already unmasked.
    frame_pos: usize,
    /// State the parser resumes in on the next call.
    frame_state: WsParserState,
    /// Extended-length bytes still expected (set to 2 or 8, counts down to 0).
    frame_len_idx: u8,
    /// Accumulator for the extended payload length while it is being read.
    frame_len_data: i64,
    /// Number of masking-key bytes received so far (0..=4).
    masking_key_len_idx: usize,
}
impl WsState {
    /// Creates a parser state that expects the first byte of a frame header,
    /// with every counter and the masking key zeroed.
    pub fn new() -> WsState {
        Self {
            frame_state: WsParserState::SwStart,
            masking_key: [0; 4],
            opcode: 0,
            frame_len: 0,
            frame_pos: 0,
            frame_len_idx: 0,
            frame_len_data: 0,
            masking_key_len_idx: 0,
        }
    }

    /// Returns the payload length of the frame currently being parsed.
    pub fn frame_len(&self) -> usize {
        self.frame_len
    }

    /// Puts every field back to the value `new()` gives it, ready for the
    /// next frame header.
    fn reset(&mut self) {
        *self = Self::new();
    }
}
/// Moves `buffer[pos..end]` down by `num` bytes, overwriting the `num` bytes
/// that precede `pos`.
///
/// The caller is responsible for shrinking its own `end` by `num` afterwards;
/// the bytes left behind at the tail are not cleared.
fn ws_skip(buffer: &mut [u8], pos: usize, end: usize, num: usize) {
    let dest = pos - num;
    buffer.copy_within(pos..end, dest);
}
/// Unmasks payload bytes in place, starting at `buffer[pos]`.
///
/// Each byte is XORed with the masking-key byte selected by the running
/// payload offset `ws.frame_pos` (modulo 4). Processing stops at `end` or as
/// soon as `ws.frame_pos` reaches `ws.frame_len`, whichever comes first.
///
/// Returns the index one past the last byte that was unmasked (`pos` when
/// nothing was processed).
fn ws_demunge_buffer(ws: &mut WsState, buffer: &mut [u8], pos: usize, end: usize) -> usize {
    let mut cursor = pos;
    loop {
        if cursor >= end || ws.frame_pos >= ws.frame_len {
            break cursor;
        }
        // The key is 4 bytes long, so the low two bits of the offset pick the byte.
        buffer[cursor] ^= ws.masking_key[ws.frame_pos & 3];
        cursor += 1;
        ws.frame_pos += 1;
    }
}
/// Unmasks as much of the current frame's payload as `buffer[pos..end]` holds
/// and, when that completes the frame, checks how the payload ends.
///
/// The end-of-frame check only applies when the stored header byte is exactly
/// `0x81` and at least one byte was unmasked by this call:
/// * a one-byte frame holding `\n` is only logged (`"ws ecg"`);
/// * a payload ending in NUL has its length appended to `frame_lengths`;
/// * any other final byte is rejected with `Err(())`.
///
/// Returns the number of bytes unmasked by this call.
fn ws_demunge_frame(ws: &mut WsState, buffer: &mut [u8], pos: usize, end: usize, frame_lengths: &mut Box<Vec<usize>>) -> Result<usize, ()> {
    let stop = ws_demunge_buffer(ws, buffer, pos, end);
    let consumed = stop - pos;

    let frame_complete = ws.frame_pos == ws.frame_len;
    if !frame_complete || ws.opcode != 0x81 || consumed == 0 {
        return Ok(consumed);
    }

    let last = buffer[stop - 1];
    match last {
        b'\n' if ws.frame_len == 1 => debug!("ws ecg"),
        b'\0' => frame_lengths.push(ws.frame_len),
        _ => {
            warn!("ws not a stomp message, ends with '{}'", last);
            if log_enabled!(Debug) {
                debug!("ws not a stomp message buffer={:?} {:?}", buffer, String::from_utf8_lossy(&buffer[pos..end]));
            }
            return Err(());
        }
    }
    Ok(consumed)
}
/// Position of the frame parser inside a WebSocket frame.
#[derive(Debug, Clone, PartialEq)]
pub enum WsParserState {
    /// Expecting the first header byte (FIN/flags and opcode).
    SwStart = 0,
    /// Expecting the second header byte (mask bit and 7-bit length).
    SwPayloadLen1,
    /// Reading the 2- or 8-byte extended payload length.
    SwPayloadLenExt,
    /// Reading the 4-byte masking key.
    SwMaskingKey,
    /// Reading payload that is unmasked and kept in the buffer.
    SwPayload,
    /// Reading the remaining payload of a frame that is being removed from the buffer.
    SwPayloadSkip,
}
pub fn ws_demunge(ws: &mut WsState, buffer: &mut Box<[u8]>, pos: usize, mut end: usize, mut frame_lengths: &mut Box<Vec<usize>>) -> Result<(usize, usize), ()> {
let mut is_single_frame = 0;
let mut p: usize;
let mut ch: u8;
let mut opcode: u8;
let mut skipped: usize; let mut processed: usize;
let mut state = ws.frame_state.clone();
skipped = 0;
if pos == 0 {
is_single_frame += 1;
}
p = pos;
while p < end { ch = buffer[p];
match state {
WsParserState::SwStart => {
ws.reset();
ws.opcode = ch;
if (ch & 0x80) == 0x80 {
is_single_frame += 1;
}
state = WsParserState::SwPayloadLen1;
skipped += 1;
p += 1;
continue;
},
WsParserState::SwPayloadLen1 => {
if (ch & 0x80) != 0x80 {
warn!("BUG client data must be masked");
return Err(());
}
if (ch & 0x7f) == 126 {
ws.frame_len_idx = 2;
state = WsParserState::SwPayloadLenExt;
} else if (ch & 0x7f) == 127 {
ws.frame_len_idx = 8;
state = WsParserState::SwPayloadLenExt;
} else {
ws.frame_len = (ch & 0x7f) as usize;
state = WsParserState::SwMaskingKey;
}
skipped += 1;
p += 1;
continue;
},
WsParserState::SwPayloadLenExt => {
ws.frame_len_idx -= 1;
ws.frame_len_data |= (ch as i64) << (8 * ws.frame_len_idx) as i64;
if ws.frame_len_idx == 0 {
if ws.frame_len_data < 0 || ws.frame_len_data > MAX_FRAME_LEN {
debug!("ws frame flup {}", ws.frame_len_data);
return Err(());
}
ws.frame_len = ws.frame_len_data as usize;
state = WsParserState::SwMaskingKey;
}
skipped += 1;
p += 1;
continue;
},
WsParserState::SwMaskingKey => {
ws.masking_key[ws.masking_key_len_idx] = ch;
ws.masking_key_len_idx += 1;
if ws.masking_key_len_idx == 4 {
state = WsParserState::SwPayload;
}
skipped += 1;
p += 1;
continue;
},
WsParserState::SwPayload => {
if ws.frame_len > MAX_FRAME_LEN as usize {
debug!("ws frame flup {}", ws.frame_len);
return Err(());
}
opcode = ws.opcode & 0x0f;
if opcode == 0x08 {
warn!("ws connection close");
return Err(());
}
if opcode == 0x02 {
warn!("ws binary not supported");
return Err(());
}
match ws_demunge_frame(ws, buffer, p, end, &mut frame_lengths) {
Ok(sz) => {
processed = sz;
},
Err(_) => {
warn!("ws_demunge_frame() err");
return Err(());
}
}
if opcode == 0x09 || opcode == 0x0a {
debug!("unhandled ws ping");
}
if opcode > 0x02 {
debug!("unknown opcode skip processed data");
ws_skip(buffer, p + processed, end, processed + skipped);
p -= skipped;
end -= processed + skipped;
skipped = 0;
is_single_frame = 0;
if ws.frame_pos == ws.frame_len {
state = WsParserState::SwStart;
}
else if ws.frame_pos < ws.frame_len {
state = WsParserState::SwPayloadSkip;
}
else {
error!("BUG overread ws buf");
return Err(());
}
continue;
}
if is_single_frame == 2 && buffer.len() - pos - skipped == ws.frame_len {
ws.frame_state = WsParserState::SwStart;
return Ok((pos + skipped, end));
}
else {
ws_skip(buffer, p, end, skipped);
p = p - skipped + processed;
end -= skipped;
skipped = 0;
is_single_frame = 0;
if ws.frame_pos == ws.frame_len {
state = WsParserState::SwStart;
}
continue;
}
},
WsParserState::SwPayloadSkip => {
match ws_demunge_frame(ws, buffer, p, end, &mut frame_lengths) {
Ok(sz) => {
processed = sz;
},
Err(_) => {
warn!("ws_demunge_frame() error");
return Err(());
}
}
ws_skip(buffer, p, end, processed);
p = p - processed;
end -= processed;
if ws.frame_pos == ws.frame_len {
state = WsParserState::SwStart;
}
continue;
}
}
}
ws.frame_state = state;
if p == pos {
return Ok((pos, end));
}
return Ok((pos + skipped, end));
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Payload length used by the single-frame tests.
    const LEN: usize = 20;

    /// Writes a 6-byte header: FIN + text opcode, masked, 7-bit length `len`,
    /// all-zero masking key.
    fn setup_frame_hdr(buffer: &mut Box<[u8]>, len: usize) {
        buffer[0] = 0x81;
        buffer[1] = 0x80 | len as u8;
        buffer[2..6].copy_from_slice(&[0x00; 4]);
    }

    /// Fills bytes 6..LEN+5 with consecutive letters starting at 'g'; the
    /// final payload byte is left untouched.
    fn setup_frame_data(buffer: &mut Box<[u8]>) {
        for idx in 6..LEN + 5 {
            buffer[idx] = b'a' + idx as u8;
        }
    }

    /// Sets the masking key to 1, 2, 3, 4.
    fn set_mask(buffer: &mut Box<[u8]>) {
        buffer[2..6].copy_from_slice(&[1, 2, 3, 4]);
    }

    /// Masks the LEN payload bytes with the repeating key 1, 2, 3, 4.
    fn mask_frame_data(buffer: &mut Box<[u8]>) {
        let key = [1u8, 2, 3, 4];
        for (byte, m) in buffer[6..LEN + 6].iter_mut().zip(key.iter().cycle()) {
            *byte ^= *m;
        }
    }

    #[test]
    fn test_happy_noop_mask() {
        let mut frame_lengths = Box::new(vec![]);
        let mut buffer: Box<[u8]> = Box::new([0; 6 + LEN]);
        let pos: usize = 0;
        let end = buffer.len();
        setup_frame_hdr(&mut buffer, LEN);
        setup_frame_data(&mut buffer);
        let mut ws = WsState::new();

        // Whole frame in one buffer.
        let (new_pos, new_end) = ws_demunge(&mut ws, &mut buffer, pos, end, &mut frame_lengths)
            .unwrap_or_else(|_| panic!("parse error"));
        println!("pos={}, end={} buffer={:?}", new_pos, new_end, String::from_utf8_lossy(&buffer[new_pos..new_end]));
        assert_eq!(pos + 6, new_pos, "new_pos test");
        assert_eq!(LEN, new_end - new_pos, "len test");
        assert_eq!(b"ghijklmnopqrstuvwxy\0", &buffer[new_pos..new_end]);

        // Same frame split after 16 bytes.
        setup_frame_hdr(&mut buffer, LEN);
        setup_frame_data(&mut buffer);
        let mut buffer1: Box<[u8]> = buffer[0..16].to_vec().into_boxed_slice();
        let mut buffer2: Box<[u8]> = buffer[16..26].to_vec().into_boxed_slice();
        let end1 = buffer1.len();
        let end2 = buffer2.len();

        let (new_pos, new_end) = ws_demunge(&mut ws, &mut buffer1, 0, end1, &mut frame_lengths)
            .unwrap_or_else(|_| panic!("parse error"));
        assert_eq!(0, new_pos);
        assert_eq!(10, new_end);

        let (new_pos, new_end) = ws_demunge(&mut ws, &mut buffer2, 0, end2, &mut frame_lengths)
            .unwrap_or_else(|_| panic!("parse error"));
        println!("new_pos={}, new_end={} buffer={:?}", new_pos, new_end, String::from_utf8_lossy(&buffer2[0..10]));
        println!("WsState {:?}", ws);

        // Split inside the header and just after it.
        for split_at in 1..=8 {
            setup_frame_hdr(&mut buffer, LEN);
            setup_frame_data(&mut buffer);
            re_test(&mut ws, buffer.clone(), 0, 26, split_at);
        }
    }

    /// Feeds `buffer` to the parser in two pieces, cut at `split_at`, and
    /// panics if either piece fails to parse.
    fn re_test(ws: &mut WsState, buffer: Box<[u8]>, _pos: usize, _len: usize, split_at: usize) {
        let mut frame_lengths = Box::new(vec![]);
        let (head, tail) = buffer.split_at(split_at);
        let mut buffer1: Box<[u8]> = head.to_vec().into_boxed_slice();
        let mut buffer2: Box<[u8]> = tail.to_vec().into_boxed_slice();
        let end1 = buffer1.len();
        let end2 = buffer2.len();
        if ws_demunge(ws, &mut buffer1, 0, end1, &mut frame_lengths).is_err() {
            panic!("parse error");
        }
        if ws_demunge(ws, &mut buffer2, 0, end2, &mut frame_lengths).is_err() {
            panic!("parse error");
        }
    }

    #[test]
    fn test_happy_with_mask() {
        let mut frame_lengths = Box::new(vec![]);
        let mut buffer: Box<[u8]> = Box::new([0; 6 + LEN]);
        let pos: usize = 0;
        let end = buffer.len();
        setup_frame_hdr(&mut buffer, LEN);
        setup_frame_data(&mut buffer);
        set_mask(&mut buffer);
        mask_frame_data(&mut buffer);
        let mut ws = WsState::new();

        let (new_pos, new_end) = ws_demunge(&mut ws, &mut buffer, pos, end, &mut frame_lengths)
            .unwrap_or_else(|_| panic!("parse error"));
        println!("pos={}, end={} buffer={:?}", new_pos, new_end, String::from_utf8_lossy(&buffer[new_pos..new_end]));
        assert_eq!(LEN, new_end - new_pos, "len test");
        assert_eq!(pos + 6, new_pos, "new_pos test");
        assert_eq!(b"ghijklmnopqrstuvwxy\0", &buffer[new_pos..new_end]);
    }

    #[test]
    fn test_skip_ping_frames() {
        let mut frame_lengths = Box::new(vec![]);
        let mut buffer: Box<[u8]> = Box::new([0; 6 + 4 + 8 + 6 + 16]);
        let pos: usize = 0;
        let end = buffer.len();

        // First fragment: text opcode without FIN, payload "CONN".
        let prefix = "CONN";
        setup_frame_hdr(&mut buffer, prefix.len());
        buffer[0] = 0x01;
        buffer[6..6 + 4].copy_from_slice(prefix.as_bytes());

        // Ping with a two-byte payload, zero masking key.
        buffer[10..18].copy_from_slice(&[0x89, 0x82, 0x00, 0x00, 0x00, 0x00, 0xaa, 0xbb]);

        // Final continuation fragment, zero masking key.
        let suffix = "ECT\n1234:1234\n\n\0";
        buffer[18] = 0x80;
        buffer[19] = 0x80 | suffix.len() as u8;
        buffer[20..24].copy_from_slice(&[0x00; 4]);
        buffer[24..end].copy_from_slice(suffix.as_bytes());

        let mut ws = WsState::new();
        let (new_pos, new_end) = ws_demunge(&mut ws, &mut buffer, pos, end, &mut frame_lengths)
            .unwrap_or_else(|_| panic!("parse error"));
        println!("pos={}, end={} buffer={:?}", new_pos, new_end, String::from_utf8_lossy(&buffer[new_pos..new_end]));
        assert_eq!(prefix.len() + suffix.len(), new_end - new_pos, "len test");
        assert_eq!(0, new_pos, "new_pos test");
    }

    #[test]
    pub fn test_mask() {
        let high_bit_set = (0xaa & 0x80) == 0x80;
        if !high_bit_set {
            panic!("here");
        }
        println!("OK");
    }

    #[test]
    pub fn test_firefox_media_header() {
        let mut frame_lengths = Box::new(vec![]);
        let mut ws = WsState::new();
        // 14-byte header: 64-bit extended length followed by the masking key.
        let mut buffer: Box<[u8]> = Box::new([0; 14]);
        buffer.copy_from_slice(&[129, 255, 0, 0, 0, 0, 0, 6, 253, 129, 212, 137, 3, 53]);

        let (new_pos, new_end) = ws_demunge(&mut ws, &mut buffer, 0, 14, &mut frame_lengths)
            .unwrap_or_else(|_| panic!("parse error"));
        println!("pos={}, end={}, state={:?}", new_pos, new_end, ws.frame_state);
        assert_eq!(new_pos, 14);
        assert_eq!(new_end, 14);
        assert_eq!(ws.frame_len, 458113);
    }

    #[test]
    pub fn test_firefox_media_header_partial() {
        let mut frame_lengths = Box::new(vec![]);
        let mut ws = WsState::new();

        // First seven header bytes only.
        let mut buffer: Box<[u8]> = Box::new([0; 7]);
        buffer.copy_from_slice(&[129, 255, 0, 0, 0, 0, 0]);
        let (new_pos, new_end) = ws_demunge(&mut ws, &mut buffer, 0, 7, &mut frame_lengths)
            .unwrap_or_else(|_| panic!("parse error"));
        println!("pos={}, end={}, state={:?}", new_pos, new_end, ws.frame_state);
        assert_eq!(new_pos, 7);
        assert_eq!(new_end, 7);

        // Remaining seven header bytes, placed at offset 7 of a fresh buffer.
        let mut buffer: Box<[u8]> = Box::new([0; 14]);
        buffer[7..14].copy_from_slice(&[6, 253, 129, 212, 137, 3, 53]);
        let (new_pos, new_end) = ws_demunge(&mut ws, &mut buffer, 7, 14, &mut frame_lengths)
            .unwrap_or_else(|_| panic!("parse error"));
        println!("pos={}, end={}, state={:?}", new_pos, new_end, ws.frame_state);
        assert_eq!(new_pos, 14);
        assert_eq!(new_end, 14);
        assert_eq!(ws.frame_len, 458113);
    }
}