#[cfg(test)]
#[path = "swss_parser_tests.rs"]
mod swss_parser_tests;
use crate::record::{ExpandedField, ExpandedValue, LogRecord};
use crate::traits::LogParser;
use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
use std::sync::Arc;
/// Stateless parser for record lines shaped like
/// `YYYY-MM-DD.HH:MM:SS.<fraction>|<content>`.
///
/// The type has no fields; its [`LogParser`] implementation reports the
/// parser name `"swss"`.
#[derive(Debug, Default)]
pub struct SwssParser;
impl SwssParser {
pub fn new() -> Self {
Self
}
pub fn parse_shared(
raw: &str,
source: &Arc<str>,
loader_id: &Arc<str>,
id: u64,
) -> Option<LogRecord> {
Self::parse_inner(raw, source, loader_id, id)
}
fn parse_inner(
raw: &str,
source: &Arc<str>,
loader_id: &Arc<str>,
id: u64,
) -> Option<LogRecord> {
let b = raw.as_bytes();
if b.len() < 22 {
return None;
}
if b[4] != b'-' || b[7] != b'-' || b[10] != b'.' || b[13] != b':' || b[16] != b':' {
return None;
}
let year = parse_u32(&b[0..4])? as i32;
let month = parse_u32(&b[5..7])?;
let day = parse_u32(&b[8..10])?;
let hour = parse_u32(&b[11..13])?;
let min = parse_u32(&b[14..16])?;
let sec = parse_u32(&b[17..19])?;
if b[19] != b'.' {
return None;
}
let pipe_pos = memchr::memchr(b'|', &b[20..])?;
let frac_end = 20 + pipe_pos;
let frac_str = std::str::from_utf8(&b[20..frac_end]).ok()?;
let micros = parse_fractional_micros(frac_str);
let date = NaiveDate::from_ymd_opt(year, month, day)?;
let time = NaiveTime::from_hms_micro_opt(hour, min, sec, micros)?;
let naive = NaiveDateTime::new(date, time);
let timestamp: DateTime<Utc> = DateTime::from_naive_utc_and_offset(naive, Utc);
let content_start = frac_end + 1; if content_start >= b.len() {
return Some(LogRecord {
id,
timestamp,
level: None,
source: Arc::clone(source),
pid: None,
tid: None,
component_name: None,
process_name: None,
hostname: None,
container: None,
context: None,
function: None,
message: String::new(),
raw: raw.to_string(),
metadata: None,
loader_id: Arc::clone(loader_id),
expanded: None,
});
}
let content = &raw[content_start..];
let (component, context, function, message) = parse_content(content);
let expanded = build_expanded(&component, &context, &function, &message);
Some(LogRecord {
id,
timestamp,
level: None,
source: Arc::clone(source),
pid: None,
tid: None,
component_name: component,
process_name: None,
hostname: None,
container: None,
context,
function,
message,
raw: raw.to_string(),
metadata: None,
loader_id: Arc::clone(loader_id),
expanded,
})
}
}
impl LogParser for SwssParser {
    /// Parses one raw line into a [`LogRecord`].
    ///
    /// `source` and `loader_id` are copied into fresh `Arc<str>` values on
    /// every call and then passed to the same routine that backs
    /// [`SwssParser::parse_shared`]. Returns `None` when that routine rejects
    /// the line.
    fn parse(&self, raw: &str, source: &str, loader_id: &str, id: u64) -> Option<LogRecord> {
        let shared_source: Arc<str> = Arc::from(source);
        let shared_loader: Arc<str> = Arc::from(loader_id);
        Self::parse_inner(raw, &shared_source, &shared_loader, id)
    }

    /// Name under which this parser identifies itself.
    fn name(&self) -> &str {
        "swss"
    }
}
fn build_expanded(
component: &Option<String>,
context: &Option<String>,
function: &Option<String>,
message: &str,
) -> Option<Vec<ExpandedField>> {
let op = function.as_ref()?;
let attr_count = if message.is_empty() {
0
} else {
message.split('|').filter(|kv| kv.contains(':')).count()
};
let field_count =
1 + component.is_some() as usize + context.is_some() as usize + (attr_count > 0) as usize;
let mut fields = Vec::with_capacity(field_count);
fields.push(ExpandedField {
label: "Operation".to_string(),
value: ExpandedValue::Text(op.clone()),
});
if let Some(table) = component {
fields.push(ExpandedField {
label: "Table".to_string(),
value: ExpandedValue::Text(table.clone()),
});
}
if let Some(key) = context {
fields.push(ExpandedField {
label: "Key".to_string(),
value: ExpandedValue::Text(key.clone()),
});
}
if attr_count > 0 {
let mut pairs = Vec::with_capacity(attr_count);
for kv in message.split('|') {
if let Some(colon) = kv.find(':') {
pairs.push((
kv[..colon].to_string(),
ExpandedValue::Text(kv[colon + 1..].to_string()),
));
}
}
fields.push(ExpandedField {
label: "Attributes".to_string(),
value: ExpandedValue::KeyValue(pairs),
});
}
Some(fields)
}
/// Splits the text that follows the timestamp into
/// `(table, key, operation, message)`.
///
/// Recognised layouts, tried in this order:
///
/// | input                      | table | key   | operation | message  |
/// |----------------------------|-------|-------|-----------|----------|
/// | no `\|` at all             | –     | –     | –         | input    |
/// | `TABLE:KEY\|OP[\|rest]`    | TABLE | KEY   | OP        | rest     |
/// | `TABLE\|OP` (known op)     | TABLE | –     | OP        | empty    |
/// | `TABLE\|OP\|rest` (known)  | TABLE | –     | OP        | rest     |
/// | `TABLE\|KEY\|OP[\|rest]`   | TABLE | KEY   | OP        | rest     |
/// | `A\|B` (B not a known op)  | –     | –     | –         | input    |
///
/// "Known op" means `is_known_op` accepts the segment. In the
/// `TABLE:KEY` layout the split happens at the first `:` of the first
/// segment, so the key itself may contain further colons.
fn parse_content(content: &str) -> (Option<String>, Option<String>, Option<String>, String) {
    let owned = |text: &str| Some(text.to_owned());

    let (head, tail) = match content.split_once('|') {
        Some(parts) => parts,
        // No separator: the whole content is a free-form message.
        None => return (None, None, None, content.to_owned()),
    };

    // "TABLE:KEY|OP[|rest]"
    if let Some((table, key)) = head.split_once(':') {
        let (op, attrs) = split_op_and_kv(tail);
        return (owned(table), owned(key), owned(op), attrs.unwrap_or_default());
    }

    match tail.split_once('|') {
        // "TABLE|OP" with nothing after the operation.
        None if is_known_op(tail) => (owned(head), None, owned(tail), String::new()),
        // Two segments that do not end in an operation: treat as plain text.
        None => (None, None, None, content.to_owned()),
        // "TABLE|OP|rest" – key-less form.
        Some((op, attrs)) if is_known_op(op) => (owned(head), None, owned(op), attrs.to_owned()),
        // "TABLE|KEY|OP[|rest]" – key given as its own segment.
        Some((key, remainder)) => {
            let (op, attrs) = split_op_and_kv(remainder);
            (owned(head), owned(key), owned(op), attrs.unwrap_or_default())
        }
    }
}
/// Splits `s` at its first `|` into the operation (borrowed) and the
/// remaining text (owned).
///
/// When `s` contains no `|`, the whole string is the operation and the
/// second element is `None`.
fn split_op_and_kv(s: &str) -> (&str, Option<String>) {
    s.split_once('|')
        .map_or((s, None), |(op, remainder)| (op, Some(remainder.to_owned())))
}
/// Reports whether `s` is exactly one of the operation keywords this parser
/// recognises. The comparison is case-sensitive.
fn is_known_op(s: &str) -> bool {
    const KNOWN_OPS: [&str; 6] = [
        "SET",
        "DEL",
        "HSET",
        "HDEL",
        "GETRESPONSE",
        "PLANNINGRESPONSE",
    ];
    KNOWN_OPS.contains(&s)
}
/// Parses a run of ASCII decimal digits into a `u32`.
///
/// Returns `None` if any byte is not an ASCII digit or if the value does not
/// fit in a `u32`. An empty slice yields `Some(0)`.
fn parse_u32(bytes: &[u8]) -> Option<u32> {
    bytes.iter().try_fold(0u32, |acc, &byte| {
        if !byte.is_ascii_digit() {
            return None;
        }
        // Checked arithmetic: an over-long digit run is reported as `None`
        // instead of panicking (debug) or wrapping (release).
        acc.checked_mul(10)?.checked_add(u32::from(byte - b'0'))
    })
}
/// Converts the fractional-seconds text into microseconds.
///
/// Only the leading ASCII digits are used, and at most six of them; shorter
/// runs are scaled up as if padded with trailing zeros (`"5"` becomes
/// `500000`). Anything after the first non-digit or after the sixth digit is
/// ignored. A string that does not start with a digit yields `0`.
fn parse_fractional_micros(s: &str) -> u32 {
    // Microseconds have six decimal places.
    const WIDTH: u32 = 6;

    let (value, used) = s
        .bytes()
        .take(WIDTH as usize)
        .take_while(|byte| byte.is_ascii_digit())
        .fold((0u32, 0u32), |(acc, count), digit| {
            (acc * 10 + u32::from(digit - b'0'), count + 1)
        });

    // Scale up for the digits that were not supplied.
    value * 10u32.pow(WIDTH - used)
}