#[cfg(test)]
#[path = "sairedis_parser_tests.rs"]
mod sairedis_parser_tests;
use crate::record::{ExpandedField, ExpandedValue, LogLevel, LogRecord};
use crate::traits::LogParser;
use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
use std::cell::RefCell;
use std::sync::Arc;
/// Single-byte operation codes that `looks_like_sairedis` accepts in the
/// second pipe-delimited field. Each of these bytes has a dedicated arm in
/// `SairedisParser::parse_inner`.
const KNOWN_OPS: &[u8] = b"crsgGpCRSBqQnaA";
/// Parser for pipe-delimited sairedis recording lines.
///
/// The parser remembers the target of the most recent `g` (Get) and `q`
/// (Query) lines so that the following `G` / `Q` response lines, which are
/// parsed without an object of their own, can be attributed to the same
/// component and context. The state lives in `RefCell`s so it can be updated
/// through `&self`.
#[derive(Debug)]
pub struct SairedisParser {
/// Context of the last `g` line; reused for the next `G` line.
last_get_context: RefCell<Option<String>>,
/// Component of the last `g` line; reused for the next `G` line.
last_get_component: RefCell<Option<String>>,
/// Context of the last `q` line; reused for the next `Q` line.
last_query_context: RefCell<Option<String>>,
/// Component of the last `q` line; reused for the next `Q` line.
last_query_component: RefCell<Option<String>>,
}
/// `Default` delegates to [`SairedisParser::new`].
impl Default for SairedisParser {
/// Returns the same value as `SairedisParser::new()`.
fn default() -> Self {
Self::new()
}
}
impl SairedisParser {
/// Creates a parser with no remembered Get/Query request state.
pub fn new() -> Self {
    // `RefCell<Option<String>>::default()` is `RefCell::new(None)`.
    let last_get_context = RefCell::default();
    let last_get_component = RefCell::default();
    let last_query_context = RefCell::default();
    let last_query_component = RefCell::default();
    Self {
        last_get_context,
        last_get_component,
        last_query_context,
        last_query_component,
    }
}
/// Parses one raw line into a `LogRecord`, taking the source and loader
/// identifiers as already-shared `Arc<str>` handles.
///
/// The handles are cloned (`Arc::clone`) into the resulting record by
/// `parse_inner`. Returns `None` when `parse_inner` rejects the line.
pub fn parse_shared(
&self,
raw: &str,
source: &Arc<str>,
loader_id: &Arc<str>,
id: u64,
) -> Option<LogRecord> {
self.parse_inner(raw, source, loader_id, id)
}
/// Core line parser.
///
/// Expects a line shaped like `YYYY-MM-DD.HH:MM:SS.<fraction>|<op>|<detail>`,
/// where `<op>` is a single byte. Returns `None` if the line is shorter than
/// 24 bytes, a fixed separator is missing, a date/time digit is not a digit,
/// the date or time is out of range, no `|` follows the fraction, or the byte
/// after the op is present but is not `|`.
///
/// Side effect: `g` and `q` lines store their component/context in `self`,
/// and `G` / `Q` lines read that stored state back.
fn parse_inner(
&self,
raw: &str,
source: &Arc<str>,
loader_id: &Arc<str>,
id: u64,
) -> Option<LogRecord> {
let b = raw.as_bytes();
// Length guard: every fixed-offset index below (up to 19, and the slice
// starting at 20) is in bounds once the line has at least 24 bytes.
if b.len() < 24 {
return None;
}
// Fixed separators of the `YYYY-MM-DD.HH:MM:SS` prefix.
if b[4] != b'-' || b[7] != b'-' || b[10] != b'.' || b[13] != b':' || b[16] != b':' {
return None;
}
// Separator between the seconds and the fractional part.
if b[19] != b'.' {
return None;
}
let year = dig4(b, 0)? as i32;
let month = dig2(b, 5)?;
let day = dig2(b, 8)?;
let hour = dig2(b, 11)?;
let min = dig2(b, 14)?;
let sec = dig2(b, 17)?;
// The fractional seconds run from index 20 up to the first `|`.
let pipe1 = memchr::memchr(b'|', &b[20..])? + 20;
let frac_bytes = &b[20..pipe1];
let micros = parse_fractional_micros(frac_bytes);
let date = NaiveDate::from_ymd_opt(year, month, day)?;
let time = NaiveTime::from_hms_micro_opt(hour, min, sec, micros)?;
let naive = NaiveDateTime::new(date, time);
// The naive timestamp is taken as-is and labelled UTC; no offset is applied.
let timestamp: DateTime<Utc> = DateTime::from_naive_utc_and_offset(naive, Utc);
// The operation code is the single byte right after the first pipe.
let op_start = pipe1 + 1;
if op_start >= b.len() {
return None;
}
let op = b[op_start];
let after_op = op_start + 1;
// The op must be exactly one byte: either the line ends here or a `|` follows.
if after_op < b.len() && b[after_op] != b'|' {
return None; }
let detail_start = if after_op < b.len() {
after_op + 1 } else {
b.len()
};
// Everything after the second pipe; empty when the line ends at or right
// after the op.
let detail = if detail_start < b.len() {
&b[detail_start..]
} else {
&[]
};
// Map the op byte to (function name, component, context, message).
let (function, component, context, message) = match op {
b'c' => {
let (comp, ctx, msg) = self.parse_single_op(detail);
("Create".to_string(), comp, ctx, msg)
}
b's' => {
let (comp, ctx, msg) = self.parse_single_op(detail);
("Set".to_string(), comp, ctx, msg)
}
b'g' => {
let (comp, ctx, msg) = self.parse_single_op(detail);
// Remember the Get target so the next `G` line can reuse it.
*self.last_get_context.borrow_mut() = ctx.clone();
*self.last_get_component.borrow_mut() = comp.clone();
("Get".to_string(), comp, ctx, msg)
}
b'p' => {
let (comp, ctx, msg) = self.parse_single_op(detail);
("CounterPoll".to_string(), comp, ctx, msg)
}
b'r' => {
// Remove treats the whole detail as `type:context`; the message is empty.
let (comp, ctx) = Self::parse_type_context(detail);
("Remove".to_string(), comp, ctx, String::new())
}
b'G' => {
// The response's component/context come from the last `g` line;
// the whole detail becomes the message.
let ctx = self.last_get_context.borrow().clone();
let comp = self.last_get_component.borrow().clone();
let msg = str_from_bytes(detail);
("GetResponse".to_string(), comp, ctx, msg)
}
b'C' => {
let (comp, msg) = Self::parse_bulk_op(detail);
("BulkCreate".to_string(), comp, None, msg)
}
b'R' => {
let (comp, msg) = Self::parse_bulk_op(detail);
("BulkRemove".to_string(), comp, None, msg)
}
b'S' => {
let (comp, msg) = Self::parse_bulk_op(detail);
("BulkSet".to_string(), comp, None, msg)
}
b'B' => {
let (comp, msg) = Self::parse_bulk_op(detail);
("BulkGet".to_string(), comp, None, msg)
}
b'q' => {
let (name, comp, ctx, msg) = self.parse_query(detail);
// Remember the Query target so the next `Q` line can reuse it.
*self.last_query_context.borrow_mut() = ctx.clone();
*self.last_query_component.borrow_mut() = comp.clone();
(format!("Query: {}", name), comp, ctx, msg)
}
b'Q' => {
let (name, msg) = Self::parse_query_response(detail);
let ctx = self.last_query_context.borrow().clone();
let comp = self.last_query_component.borrow().clone();
(format!("QueryResponse: {}", name), comp, ctx, msg)
}
b'n' => {
let (name, msg) = Self::parse_notification(detail);
(format!("Notification: {}", name), None, None, msg)
}
b'a' => {
// The detail text is used both as the context and as the message.
let key = str_from_bytes(detail);
("NotifySyncd".to_string(), None, Some(key.clone()), key)
}
b'A' => {
let status = str_from_bytes(detail);
("NotifySyncdResponse".to_string(), None, None, status)
}
_ => {
// Unknown op: the op character itself becomes the function name.
let op_str = str_from_bytes(&[op]);
let msg = str_from_bytes(detail);
(op_str, None, None, msg)
}
};
// Notifications are reported at Notice level; every other op at Info.
let level = match op {
b'n' => Some(LogLevel::Notice),
_ => Some(LogLevel::Info),
};
let expanded = build_expanded(op, &function, &component, &context, &message);
Some(LogRecord {
id,
timestamp,
level,
source: Arc::clone(source),
pid: None,
tid: None,
component_name: component,
process_name: None,
hostname: None,
container: None,
context,
function: Some(function),
message,
// The input line is not copied into the record; `raw` is left empty.
raw: String::new(), metadata: None,
loader_id: Arc::clone(loader_id),
expanded,
})
}
/// Splits a single-object detail of the form `type:context|message`.
///
/// The part before the first `|` is handed to `parse_type_context`; whatever
/// follows that pipe is returned verbatim as the message (empty when there is
/// no pipe or nothing after it). Empty input yields `(None, None, "")`.
fn parse_single_op(&self, detail: &[u8]) -> (Option<String>, Option<String>, String) {
    if detail.is_empty() {
        return (None, None, String::new());
    }
    let (target, payload): (&[u8], &[u8]) = match memchr::memchr(b'|', detail) {
        Some(split) => (&detail[..split], &detail[split + 1..]),
        None => (detail, &[]),
    };
    let (comp, ctx) = Self::parse_type_context(target);
    // An empty payload converts to an empty message string.
    (comp, ctx, str_from_bytes(payload))
}
/// Splits `type:context` at the first `:`.
///
/// * Empty input gives `(None, None)`.
/// * With a colon that is not the first byte, the text before it is the
///   component and the text after it (if any) is the context.
/// * Otherwise (no colon, or a leading colon) the whole input is the
///   component and there is no context.
fn parse_type_context(bytes: &[u8]) -> (Option<String>, Option<String>) {
    if bytes.is_empty() {
        return (None, None);
    }
    // A colon at index 0 is treated the same as no colon at all.
    let split = memchr::memchr(b':', bytes).filter(|&idx| idx > 0);
    let idx = match split {
        Some(idx) => idx,
        None => return (Some(str_from_bytes(bytes)), None),
    };
    let component = str_from_bytes(&bytes[..idx]);
    let tail = &bytes[idx + 1..];
    let context = if tail.is_empty() {
        None
    } else {
        Some(str_from_bytes(tail))
    };
    (Some(component), context)
}
/// Splits a bulk-operation detail at the first `||`.
///
/// The text before the double pipe is the component; the message is the
/// remainder *including* the leading `||`. Without a double pipe the whole
/// detail is the message and there is no component.
fn parse_bulk_op(detail: &[u8]) -> (Option<String>, String) {
    if detail.is_empty() {
        return (None, String::new());
    }
    if let Some(at) = find_double_pipe(detail) {
        // `split_at` keeps the `||` at the front of the second half.
        let (object_type, entries) = detail.split_at(at);
        (Some(str_from_bytes(object_type)), str_from_bytes(entries))
    } else {
        (None, str_from_bytes(detail))
    }
}
/// Parses a query detail of the form `name|type:context|message`.
///
/// Returns `(name, component, context, message)`:
/// * Empty input gives `("", None, None, "")`.
/// * With no `|` at all, the whole detail is the name.
/// * With a second `|`, the field between the two pipes is split by
///   `parse_type_context`; if that yields no context, the context falls back
///   to the whole field text. The message is everything after the second pipe.
/// * With only one `|`, the remainder is split by `parse_type_context` and
///   the message is empty.
fn parse_query(&self, detail: &[u8]) -> (String, Option<String>, Option<String>, String) {
    if detail.is_empty() {
        return (String::new(), None, None, String::new());
    }
    // Split the name from the rest in one match, so no `unwrap` is needed later.
    let (name_bytes, rest) = match memchr::memchr(b'|', detail) {
        Some(pos) => (&detail[..pos], &detail[pos + 1..]),
        None => return (str_from_bytes(detail), None, None, String::new()),
    };
    let name = str_from_bytes(name_bytes);
    match memchr::memchr(b'|', rest) {
        Some(pos) => {
            let target = &rest[..pos];
            let (comp, ctx) = Self::parse_type_context(target);
            // Build the fallback string only when it is actually needed.
            let ctx = ctx.or_else(|| Some(str_from_bytes(target)));
            // An empty tail converts to an empty message string.
            let msg = str_from_bytes(&rest[pos + 1..]);
            (name, comp, ctx, msg)
        }
        None => {
            let (comp, ctx) = Self::parse_type_context(rest);
            (name, comp, ctx, String::new())
        }
    }
}
/// Splits a query-response detail `name|message` at the first `|`.
///
/// Without a pipe the whole detail is the name and the message is empty;
/// empty input therefore yields two empty strings.
fn parse_query_response(detail: &[u8]) -> (String, String) {
    match memchr::memchr(b'|', detail) {
        Some(split) => {
            let (name, payload) = (&detail[..split], &detail[split + 1..]);
            (str_from_bytes(name), str_from_bytes(payload))
        }
        // Also covers empty input: both strings come out empty.
        None => (str_from_bytes(detail), String::new()),
    }
}
/// Splits a notification detail `name|message` at the first `|`.
///
/// A single trailing `|` at the end of the message is dropped. Without any
/// pipe the whole detail is the name and the message is empty.
fn parse_notification(detail: &[u8]) -> (String, String) {
    let split = match memchr::memchr(b'|', detail) {
        Some(split) => split,
        // Also covers empty input: both strings come out empty.
        None => return (str_from_bytes(detail), String::new()),
    };
    let body = &detail[split + 1..];
    // Strip exactly one trailing pipe, if the body has one.
    let body = match body.split_last() {
        Some((&b'|', head)) => head,
        _ => body,
    };
    (str_from_bytes(&detail[..split]), str_from_bytes(body))
}
}
/// Builds the structured "expanded" view of a parsed record.
///
/// Fields are emitted in this order:
/// 1. `Operation` (always),
/// 2. `Object Type` when `component` is set,
/// 3. `OID` when `context` is set,
/// 4. `Status` for response ops (`G`, `A`, `Q`): the first `|`-separated
///    segment of `message`, reduced to the text after its first `=` if it has
///    one, and skipped when empty,
/// 5. `Attributes`: the remaining `|`-separated `key=value` segments (a
///    segment without `=` becomes a key with an empty value),
/// 6. `Request Context` for `G` / `Q` when `context` is set.
///
/// Always returns `Some`.
fn build_expanded(
    op: u8,
    function: &str,
    component: &Option<String>,
    context: &Option<String>,
    message: &str,
) -> Option<Vec<ExpandedField>> {
    /// Builds a labelled plain-text field.
    fn text_field(label: &str, text: &str) -> ExpandedField {
        ExpandedField {
            label: label.to_string(),
            value: ExpandedValue::Text(text.to_string()),
        }
    }

    let mut out = vec![text_field("Operation", function)];
    if let Some(kind) = component {
        out.push(text_field("Object Type", kind));
    }
    if let Some(oid) = context {
        out.push(text_field("OID", oid));
    }

    // Response lines carry a status in their first segment; peel it off so
    // only the attribute segments remain.
    let carries_status = matches!(op, b'G' | b'A' | b'Q');
    let attr_text = if carries_status && !message.is_empty() {
        let (head, tail) = message.split_once('|').unwrap_or((message, ""));
        let status = head.split_once('=').map_or(head, |(_, value)| value);
        if !status.is_empty() {
            out.push(text_field("Status", status));
        }
        tail
    } else {
        message
    };

    let mut pairs: Vec<(String, ExpandedValue)> = Vec::new();
    for segment in attr_text.split('|') {
        if segment.is_empty() {
            continue;
        }
        let (key, value) = segment.split_once('=').unwrap_or((segment, ""));
        pairs.push((key.to_string(), ExpandedValue::Text(value.to_string())));
    }
    if !pairs.is_empty() {
        out.push(ExpandedField {
            label: "Attributes".to_string(),
            value: ExpandedValue::KeyValue(pairs),
        });
    }

    if matches!(op, b'G' | b'Q') {
        if let Some(request_ctx) = context {
            out.push(text_field("Request Context", request_ctx));
        }
    }
    Some(out)
}
impl LogParser for SairedisParser {
    /// Parses one line, wrapping `source` and `loader_id` in fresh `Arc<str>`
    /// handles before delegating to `parse_shared`.
    fn parse(&self, raw: &str, source: &str, loader_id: &str, id: u64) -> Option<LogRecord> {
        let shared_source: Arc<str> = source.into();
        let shared_loader: Arc<str> = loader_id.into();
        self.parse_shared(raw, &shared_source, &shared_loader, id)
    }

    /// Name of this parser: `"sairedis"`.
    fn name(&self) -> &str {
        "sairedis"
    }
}
/// Cheap check for whether `line` looks like a sairedis recording line.
///
/// The line must be at least 24 bytes long, have `-`, `-`, `.` at byte offsets
/// 4, 7 and 10, contain a `|` at or after offset 20, and that pipe must be
/// followed by one of the `KNOWN_OPS` bytes and then another `|`.
pub fn looks_like_sairedis(line: &str) -> bool {
    let bytes = line.as_bytes();
    if bytes.len() < 24 {
        return false;
    }
    let date_shaped = bytes[4] == b'-' && bytes[7] == b'-' && bytes[10] == b'.';
    if !date_shaped {
        return false;
    }
    let op_at = match memchr::memchr(b'|', &bytes[20..]) {
        Some(offset) => 20 + offset + 1,
        None => return false,
    };
    // Both the op byte and the pipe after it must exist.
    match (bytes.get(op_at), bytes.get(op_at + 1)) {
        (Some(op), Some(&b'|')) => KNOWN_OPS.contains(op),
        _ => false,
    }
}
/// Parses the two ASCII decimal digits at `b[pos..pos + 2]`.
///
/// Returns `None` if either byte is not `0`–`9`, or if the slice is too short
/// to contain two bytes at `pos` (instead of panicking on the index).
#[inline]
fn dig2(b: &[u8], pos: usize) -> Option<u32> {
    let end = pos.checked_add(2)?;
    // `get` turns an out-of-range request into `None`.
    b.get(pos..end)?.iter().try_fold(0u32, |acc, &byte| {
        (byte as char).to_digit(10).map(|d| acc * 10 + d)
    })
}
/// Parses the four ASCII decimal digits at `b[pos..pos + 4]`.
///
/// Returns `None` if any byte is not `0`–`9`, or if the slice is too short to
/// contain four bytes at `pos` (instead of panicking on the index).
#[inline]
fn dig4(b: &[u8], pos: usize) -> Option<u32> {
    let end = pos.checked_add(4)?;
    // `get` turns an out-of-range request into `None`.
    b.get(pos..end)?.iter().try_fold(0u32, |acc, &byte| {
        (byte as char).to_digit(10).map(|d| acc * 10 + d)
    })
}
/// Converts a fractional-seconds digit string into microseconds.
///
/// Reads leading ASCII digits and stops at the first non-digit. At most the
/// first six digits count (extra digits are truncated, not rounded); fewer
/// than six are right-padded with zeros, so `b"5"` is 500000 and `b""` is 0.
#[inline]
fn parse_fractional_micros(bytes: &[u8]) -> u32 {
    let mut micros: u32 = 0;
    let mut seen: u32 = 0;
    let leading_digits = bytes
        .iter()
        .map(|byte| byte.wrapping_sub(b'0'))
        .take_while(|&digit| digit <= 9)
        .take(6);
    for digit in leading_digits {
        micros = micros * 10 + u32::from(digit);
        seen += 1;
    }
    // Scale up to six places when fewer digits were present.
    micros * 10u32.pow(6 - seen)
}
/// Copies `bytes` into an owned `String`.
///
/// Valid UTF-8 is copied unchanged. Invalid sequences are replaced with
/// U+FFFD rather than being reinterpreted unchecked, so this helper is sound
/// for any input slice and needs no `unsafe`.
#[inline]
fn str_from_bytes(bytes: &[u8]) -> String {
    String::from_utf8_lossy(bytes).into_owned()
}
/// Returns the index of the first `||` in `bytes`, or `None` if there is no
/// two-byte run of pipes (including when `bytes` has fewer than two bytes).
fn find_double_pipe(bytes: &[u8]) -> Option<usize> {
    // `windows(2)` yields nothing for slices shorter than two bytes.
    bytes.windows(2).position(|pair| pair == b"||")
}