#[derive(Debug, Clone, PartialEq)]
pub enum FieldValue {
Value(String),
Null,
Unchanged,
}
impl FieldValue {
pub fn as_str(&self) -> Option<&str> {
match self {
FieldValue::Value(s) => Some(s.as_str()),
_ => None,
}
}
pub fn parse_f64(&self) -> Option<f64> {
self.as_str()?.parse().ok()
}
pub fn parse_i64(&self) -> Option<i64> {
self.as_str()?.parse().ok()
}
pub fn parse_bool(&self) -> Option<bool> {
match self.as_str()? {
"true" => Some(true),
"false" => Some(false),
_ => None,
}
}
}
#[derive(Debug, Clone, PartialEq)]
pub enum Frame {
Update {
item_index: usize,
fields: Vec<FieldValue>,
},
Probe,
Loop,
SyncError,
Error {
code: String,
message: String,
},
End {
cause: Option<String>,
},
Ok {
session_id: String,
},
Unknown(String),
}
#[must_use]
pub fn parse_line(line: &[u8]) -> Frame {
let s = match std::str::from_utf8(line) {
Ok(s) => s.trim_end_matches('\r'),
Err(_) => return Frame::Unknown(String::from_utf8_lossy(line).into_owned()),
};
if s == "PROBE" {
return Frame::Probe;
}
if s == "LOOP" {
return Frame::Loop;
}
if s == "SYNC ERROR" {
return Frame::SyncError;
}
if s == "OK" {
return Frame::Ok {
session_id: String::new(),
};
}
if let Some(rest) = s.strip_prefix("SessionId:") {
return Frame::Ok {
session_id: rest.to_owned(),
};
}
if let Some(code_rest) = s.strip_prefix("ERROR") {
let trimmed = code_rest.trim();
let (code, message) = if let Some(idx) = trimmed.find('\n') {
(trimmed[..idx].to_owned(), trimmed[idx + 1..].to_owned())
} else {
(trimmed.to_owned(), String::new())
};
return Frame::Error { code, message };
}
if let Some(rest) = s.strip_prefix("END") {
let cause = {
let trimmed = rest.trim();
if trimmed.is_empty() {
None
} else {
Some(trimmed.to_owned())
}
};
return Frame::End { cause };
}
if let Some(pipe_pos) = s.find('|') {
let index_str = &s[..pipe_pos];
if let Ok(item_index) = index_str.parse::<usize>() {
let fields_str = &s[pipe_pos + 1..];
let fields = fields_str.split('|').map(decode_field).collect();
return Frame::Update { item_index, fields };
}
}
Frame::Unknown(s.to_owned())
}
pub fn parse_ok_block(block: &str) -> Option<(String, Vec<(String, String)>)> {
let mut lines = block.lines();
let first = lines.next()?.trim();
if first != "OK" {
return None;
}
let mut session_id = String::new();
let mut extras = Vec::new();
for line in lines {
let line = line.trim();
if line.is_empty() {
break;
}
if let Some(id) = line.strip_prefix("SessionId:") {
id.clone_into(&mut session_id);
} else if let Some((k, v)) = line.split_once(':') {
extras.push((k.to_owned(), v.to_owned()));
}
}
Some((session_id, extras))
}
fn decode_field(token: &str) -> FieldValue {
match token {
"$" => FieldValue::Value(String::new()),
"#" => FieldValue::Null,
"" => FieldValue::Unchanged,
other => FieldValue::Value(other.to_owned()),
}
}
pub fn merge_fields(state: &mut Vec<Option<String>>, fields: &[FieldValue]) {
if state.len() < fields.len() {
state.resize(fields.len(), None);
}
for (i, fv) in fields.iter().enumerate() {
match fv {
FieldValue::Value(s) => state[i] = Some(s.clone()),
FieldValue::Null => state[i] = None,
FieldValue::Unchanged => { }
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_probe() {
assert_eq!(parse_line(b"PROBE"), Frame::Probe);
}
#[test]
fn test_loop() {
assert_eq!(parse_line(b"LOOP"), Frame::Loop);
}
#[test]
fn test_sync_error() {
assert_eq!(parse_line(b"SYNC ERROR"), Frame::SyncError);
}
#[test]
fn test_ok_bare() {
assert_eq!(
parse_line(b"OK"),
Frame::Ok {
session_id: String::new()
}
);
}
#[test]
fn test_session_id_line() {
assert_eq!(
parse_line(b"SessionId:Sabc123"),
Frame::Ok {
session_id: "Sabc123".into()
}
);
}
#[test]
fn test_error_bare() {
let f = parse_line(b"ERROR42 bad connection");
assert!(matches!(f, Frame::Error { .. }));
}
#[test]
fn test_end_no_cause() {
assert_eq!(parse_line(b"END"), Frame::End { cause: None });
}
#[test]
fn test_end_with_cause() {
assert_eq!(
parse_line(b"END session expired"),
Frame::End {
cause: Some("session expired".into())
}
);
}
#[test]
fn test_simple_update() {
let f = parse_line(b"1|100.5|101.0");
assert_eq!(
f,
Frame::Update {
item_index: 1,
fields: vec![
FieldValue::Value("100.5".into()),
FieldValue::Value("101.0".into()),
]
}
);
}
#[test]
fn test_empty_string_dollar() {
let f = parse_line(b"1|$|101.0");
assert_eq!(
f,
Frame::Update {
item_index: 1,
fields: vec![
FieldValue::Value(String::new()),
FieldValue::Value("101.0".into()),
]
}
);
}
#[test]
fn test_null_hash() {
let f = parse_line(b"1|#|101.0");
assert_eq!(
f,
Frame::Update {
item_index: 1,
fields: vec![FieldValue::Null, FieldValue::Value("101.0".into()),]
}
);
}
#[test]
fn test_unchanged_trailing_fields() {
let f = parse_line(b"2|BID_VALUE|OFR_VALUE");
assert_eq!(
f,
Frame::Update {
item_index: 2,
fields: vec![
FieldValue::Value("BID_VALUE".into()),
FieldValue::Value("OFR_VALUE".into()),
]
}
);
}
#[test]
fn test_update_with_empty_middle_field() {
let f = parse_line(b"1|100||200");
assert_eq!(
f,
Frame::Update {
item_index: 1,
fields: vec![
FieldValue::Value("100".into()),
FieldValue::Unchanged,
FieldValue::Value("200".into()),
]
}
);
}
#[test]
fn test_update_all_special() {
let f = parse_line(b"3|$|#|$");
assert_eq!(
f,
Frame::Update {
item_index: 3,
fields: vec![
FieldValue::Value(String::new()),
FieldValue::Null,
FieldValue::Value(String::new()),
]
}
);
}
#[test]
fn test_high_item_index() {
let f = parse_line(b"42|val");
assert_eq!(
f,
Frame::Update {
item_index: 42,
fields: vec![FieldValue::Value("val".into())]
}
);
}
#[test]
fn test_crlf_stripped() {
let f = parse_line(b"PROBE\r");
assert_eq!(f, Frame::Probe);
}
#[test]
fn test_unknown_line() {
let f = parse_line(b"SOMETHING_WEIRD");
assert!(matches!(f, Frame::Unknown(_)));
}
#[test]
fn test_parse_ok_block_basic() {
let block = "OK\r\nSessionId:Sabc\r\nControlAddress:push.lightstreamer.com\r\n\r\n";
let (sid, extras) = parse_ok_block(block).unwrap();
assert_eq!(sid, "Sabc");
assert!(extras.iter().any(|(k, _)| k == "ControlAddress"));
}
#[test]
fn test_parse_ok_block_missing_ok() {
assert!(parse_ok_block("ERROR\r\nSomething\r\n").is_none());
}
#[test]
fn test_parse_ok_block_no_session_id() {
let block = "OK\r\nControlAddress:push.ls.com\r\n\r\n";
let (sid, _) = parse_ok_block(block).unwrap();
assert_eq!(sid, "");
}
#[test]
fn test_merge_fields_all_new() {
let mut state: Vec<Option<String>> = vec![];
merge_fields(
&mut state,
&[
FieldValue::Value("100.0".into()),
FieldValue::Value("101.0".into()),
],
);
assert_eq!(state, vec![Some("100.0".into()), Some("101.0".into())]);
}
#[test]
fn test_merge_fields_unchanged_preserved() {
let mut state = vec![Some("100.0".into()), Some("101.0".into())];
merge_fields(
&mut state,
&[FieldValue::Unchanged, FieldValue::Value("102.0".into())],
);
assert_eq!(state, vec![Some("100.0".into()), Some("102.0".into())]);
}
#[test]
fn test_merge_fields_null_clears() {
let mut state = vec![Some("100.0".into()), Some("101.0".into())];
merge_fields(&mut state, &[FieldValue::Null, FieldValue::Unchanged]);
assert_eq!(state, vec![None, Some("101.0".into())]);
}
#[test]
fn test_merge_fields_extends_state() {
let mut state: Vec<Option<String>> = vec![Some("a".into())];
merge_fields(
&mut state,
&[
FieldValue::Unchanged,
FieldValue::Value("b".into()),
FieldValue::Value("c".into()),
],
);
assert_eq!(
state,
vec![Some("a".into()), Some("b".into()), Some("c".into())]
);
}
#[test]
fn test_field_value_parse_f64() {
assert_eq!(FieldValue::Value("1.5".into()).parse_f64(), Some(1.5));
assert_eq!(FieldValue::Null.parse_f64(), None);
assert_eq!(FieldValue::Unchanged.parse_f64(), None);
}
#[test]
fn test_field_value_parse_i64() {
assert_eq!(FieldValue::Value("42".into()).parse_i64(), Some(42));
}
#[test]
fn test_field_value_parse_bool() {
assert_eq!(FieldValue::Value("true".into()).parse_bool(), Some(true));
assert_eq!(FieldValue::Value("false".into()).parse_bool(), Some(false));
assert_eq!(FieldValue::Value("yes".into()).parse_bool(), None);
}
}