use bytes::Bytes;
use crate::action::NICIRA_VENDOR_ID;
use crate::multipart::{MultipartHeader, MultipartType};
use crate::message::{Message, MessageType};
use crate::{Match, Version};
const NXST_FLOW_MONITOR: u32 = 2;
pub mod monitor_flags {
pub const INITIAL: u16 = 1 << 0;
pub const ADD: u16 = 1 << 1;
pub const DELETE: u16 = 1 << 2;
pub const MODIFY: u16 = 1 << 3;
pub const ACTIONS: u16 = 1 << 4;
pub const OWN: u16 = 1 << 5;
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlowUpdateEvent {
Added,
Deleted,
Modified,
}
impl FlowUpdateEvent {
fn from_u16(v: u16) -> crate::Result<Self> {
match v {
0 => Ok(Self::Added),
1 => Ok(Self::Deleted),
2 => Ok(Self::Modified),
_ => Err(crate::Error::Parse(format!("unknown flow update event: {v}"))),
}
}
}
#[derive(Debug)]
pub enum FlowUpdate {
Full(Box<FlowUpdateFull>),
Abbrev {
xid: u32,
},
}
#[derive(Debug)]
pub struct FlowUpdateFull {
pub event: FlowUpdateEvent,
pub reason: u16,
pub priority: u16,
pub idle_timeout: u16,
pub hard_timeout: u16,
pub table_id: u8,
pub cookie: u64,
pub match_fields: Match,
pub actions: Vec<u8>,
}
#[derive(Debug, Clone)]
pub struct FlowMonitorRequest {
pub id: u32,
pub flags: u16,
pub out_port: u16,
pub table_id: u8,
pub match_fields: Match,
}
impl FlowMonitorRequest {
pub fn new(id: u32) -> Self {
Self {
id,
flags: 0,
out_port: 0xffff, table_id: 0xff, match_fields: Match::new(),
}
}
pub fn all_changes(id: u32) -> Self {
Self::new(id).flags(
monitor_flags::INITIAL
| monitor_flags::ADD
| monitor_flags::DELETE
| monitor_flags::MODIFY
| monitor_flags::ACTIONS,
)
}
pub fn flags(mut self, flags: u16) -> Self {
self.flags = flags;
self
}
pub fn table(mut self, table_id: u8) -> Self {
self.table_id = table_id;
self
}
pub fn match_fields(mut self, m: Match) -> Self {
self.match_fields = m;
self
}
pub fn out_port(mut self, port: u16) -> Self {
self.out_port = port;
self
}
fn encode_body(&self) -> Vec<u8> {
let match_bytes = self.match_fields.encode_oxm_fields();
let match_len = match_bytes.len() as u16;
let mut buf = Vec::with_capacity(12 + match_bytes.len());
buf.extend(self.id.to_be_bytes());
buf.extend(self.flags.to_be_bytes());
buf.extend(self.out_port.to_be_bytes());
buf.extend(match_len.to_be_bytes());
buf.push(self.table_id);
buf.push(0);
buf.extend(match_bytes);
buf
}
pub fn to_message(&self, version: Version, xid: u32) -> Message {
let mp_header = MultipartHeader {
mp_type: MultipartType::Experimenter,
flags: 0,
};
let mut body = Vec::new();
body.extend(mp_header.encode());
body.extend(NICIRA_VENDOR_ID.to_be_bytes());
body.extend(NXST_FLOW_MONITOR.to_be_bytes());
body.extend(self.encode_body());
Message::new(version, MessageType::MultipartRequest, xid, Bytes::from(body))
}
}
pub fn parse_flow_monitor_reply(body: &[u8]) -> crate::Result<(Vec<FlowUpdate>, bool)> {
if body.len() < MultipartHeader::SIZE + 8 {
return Err(crate::Error::Parse(
"flow monitor reply too short".into(),
));
}
let mp_header = MultipartHeader::decode(body)?;
if mp_header.mp_type != MultipartType::Experimenter {
return Err(crate::Error::Parse(format!(
"expected Experimenter multipart type, got {:?}",
mp_header.mp_type
)));
}
let offset = MultipartHeader::SIZE;
let vendor = u32::from_be_bytes([
body[offset], body[offset + 1], body[offset + 2], body[offset + 3],
]);
let subtype = u32::from_be_bytes([
body[offset + 4], body[offset + 5], body[offset + 6], body[offset + 7],
]);
if vendor != NICIRA_VENDOR_ID {
return Err(crate::Error::Parse(format!(
"expected Nicira vendor ID 0x{NICIRA_VENDOR_ID:08x}, got 0x{vendor:08x}"
)));
}
if subtype != NXST_FLOW_MONITOR {
return Err(crate::Error::Parse(format!(
"expected NXST_FLOW_MONITOR subtype {NXST_FLOW_MONITOR}, got {subtype}"
)));
}
let mut updates = Vec::new();
let mut pos = offset + 8;
while pos + 4 <= body.len() {
let entry_len = u16::from_be_bytes([body[pos], body[pos + 1]]) as usize;
let event_code = u16::from_be_bytes([body[pos + 2], body[pos + 3]]);
if entry_len < 4 || pos + entry_len > body.len() {
break;
}
let update = if event_code == 3 {
if entry_len < 8 {
return Err(crate::Error::Parse("flow update abbrev too short".into()));
}
let xid = u32::from_be_bytes([
body[pos + 4], body[pos + 5], body[pos + 6], body[pos + 7],
]);
FlowUpdate::Abbrev { xid }
} else {
parse_flow_update_full(&body[pos..pos + entry_len], event_code)?
};
updates.push(update);
pos += entry_len;
}
Ok((updates, mp_header.has_more()))
}
fn parse_flow_update_full(data: &[u8], event_code: u16) -> crate::Result<FlowUpdate> {
const FIXED_SIZE: usize = 24;
if data.len() < FIXED_SIZE {
return Err(crate::Error::Parse("flow update entry too short".into()));
}
let entry_len = u16::from_be_bytes([data[0], data[1]]) as usize;
let event = FlowUpdateEvent::from_u16(event_code)?;
let reason = u16::from_be_bytes([data[4], data[5]]);
let priority = u16::from_be_bytes([data[6], data[7]]);
let idle_timeout = u16::from_be_bytes([data[8], data[9]]);
let hard_timeout = u16::from_be_bytes([data[10], data[11]]);
let match_len = u16::from_be_bytes([data[12], data[13]]) as usize;
let table_id = data[14];
let cookie = u64::from_be_bytes([
data[16], data[17], data[18], data[19],
data[20], data[21], data[22], data[23],
]);
let match_end = FIXED_SIZE + match_len;
if match_end > entry_len {
return Err(crate::Error::Parse("flow update match truncated".into()));
}
let match_fields = Match::decode_oxm(&data[FIXED_SIZE..match_end])?;
let actions = if match_end < entry_len {
data[match_end..entry_len].to_vec()
} else {
Vec::new()
};
Ok(FlowUpdate::Full(Box::new(FlowUpdateFull {
event,
reason,
priority,
idle_timeout,
hard_timeout,
table_id,
cookie,
match_fields,
actions,
})))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn monitor_request_defaults() {
let req = FlowMonitorRequest::new(1);
assert_eq!(req.id, 1);
assert_eq!(req.flags, 0);
assert_eq!(req.out_port, 0xffff);
assert_eq!(req.table_id, 0xff);
}
#[test]
fn monitor_request_all_changes() {
let req = FlowMonitorRequest::all_changes(42);
assert_eq!(req.id, 42);
assert_eq!(
req.flags,
monitor_flags::INITIAL
| monitor_flags::ADD
| monitor_flags::DELETE
| monitor_flags::MODIFY
| monitor_flags::ACTIONS
);
}
#[test]
fn monitor_request_builder() {
let req = FlowMonitorRequest::new(1)
.flags(monitor_flags::ADD | monitor_flags::DELETE)
.table(0)
.out_port(1);
assert_eq!(req.table_id, 0);
assert_eq!(req.out_port, 1);
assert_eq!(req.flags, monitor_flags::ADD | monitor_flags::DELETE);
}
#[test]
fn monitor_request_encode_body() {
let req = FlowMonitorRequest::new(1)
.flags(monitor_flags::ADD);
let body = req.encode_body();
assert_eq!(body.len(), 12);
assert_eq!(body[0..4], [0, 0, 0, 1]);
assert_eq!(body[4..6], [0, 2]);
assert_eq!(body[6..8], [0xff, 0xff]);
assert_eq!(body[8..10], [0, 0]);
assert_eq!(body[10], 0xff);
assert_eq!(body[11], 0);
}
#[test]
fn monitor_request_to_message() {
let req = FlowMonitorRequest::all_changes(1);
let msg = req.to_message(Version::Of13, 10);
assert_eq!(msg.header.msg_type, MessageType::MultipartRequest);
assert_eq!(msg.header.xid, 10);
assert!(msg.body.len() >= 28);
assert_eq!(msg.body[0..2], [0xff, 0xff]);
assert_eq!(msg.body[8..12], NICIRA_VENDOR_ID.to_be_bytes());
assert_eq!(msg.body[12..16], 2u32.to_be_bytes());
}
#[test]
fn parse_flow_update_added() {
let mut body = Vec::new();
body.extend([0xff, 0xff]); body.extend([0x00, 0x00]); body.extend([0x00, 0x00, 0x00, 0x00]);
body.extend(NICIRA_VENDOR_ID.to_be_bytes());
body.extend(NXST_FLOW_MONITOR.to_be_bytes());
let entry_len: u16 = 24; body.extend(entry_len.to_be_bytes()); body.extend(0u16.to_be_bytes()); body.extend(0u16.to_be_bytes()); body.extend(100u16.to_be_bytes()); body.extend(0u16.to_be_bytes()); body.extend(0u16.to_be_bytes()); body.extend(0u16.to_be_bytes()); body.push(0); body.push(0); body.extend(0x1234u64.to_be_bytes());
let (updates, has_more) = parse_flow_monitor_reply(&body).unwrap();
assert!(!has_more);
assert_eq!(updates.len(), 1);
match &updates[0] {
FlowUpdate::Full(f) => {
assert_eq!(f.event, FlowUpdateEvent::Added);
assert_eq!(f.priority, 100);
assert_eq!(f.table_id, 0);
assert_eq!(f.cookie, 0x1234);
assert!(f.actions.is_empty());
}
FlowUpdate::Abbrev { .. } => panic!("expected Full update"),
}
}
#[test]
fn parse_flow_update_abbrev() {
let mut body = Vec::new();
body.extend([0xff, 0xff, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]);
body.extend(NICIRA_VENDOR_ID.to_be_bytes());
body.extend(NXST_FLOW_MONITOR.to_be_bytes());
body.extend(8u16.to_be_bytes()); body.extend(3u16.to_be_bytes()); body.extend(42u32.to_be_bytes());
let (updates, _) = parse_flow_monitor_reply(&body).unwrap();
assert_eq!(updates.len(), 1);
match &updates[0] {
FlowUpdate::Abbrev { xid } => assert_eq!(*xid, 42),
FlowUpdate::Full(_) => panic!("expected Abbrev update"),
}
}
#[test]
fn parse_multiple_updates() {
let mut body = Vec::new();
body.extend([0xff, 0xff]); body.extend([0x00, 0x01]); body.extend([0x00, 0x00, 0x00, 0x00]);
body.extend(NICIRA_VENDOR_ID.to_be_bytes());
body.extend(NXST_FLOW_MONITOR.to_be_bytes());
body.extend(24u16.to_be_bytes()); body.extend(0u16.to_be_bytes()); body.extend(0u16.to_be_bytes()); body.extend(200u16.to_be_bytes()); body.extend(0u16.to_be_bytes()); body.extend(0u16.to_be_bytes()); body.extend(0u16.to_be_bytes()); body.push(1); body.push(0); body.extend(0xABCDu64.to_be_bytes());
body.extend(24u16.to_be_bytes()); body.extend(1u16.to_be_bytes()); body.extend(3u16.to_be_bytes()); body.extend(50u16.to_be_bytes()); body.extend(10u16.to_be_bytes()); body.extend(0u16.to_be_bytes()); body.extend(0u16.to_be_bytes()); body.push(0); body.push(0); body.extend(0u64.to_be_bytes());
let (updates, has_more) = parse_flow_monitor_reply(&body).unwrap();
assert!(has_more);
assert_eq!(updates.len(), 2);
match &updates[0] {
FlowUpdate::Full(f) => {
assert_eq!(f.event, FlowUpdateEvent::Added);
assert_eq!(f.priority, 200);
assert_eq!(f.table_id, 1);
}
_ => panic!("expected Full"),
}
match &updates[1] {
FlowUpdate::Full(f) => {
assert_eq!(f.event, FlowUpdateEvent::Deleted);
assert_eq!(f.reason, 3);
assert_eq!(f.priority, 50);
}
_ => panic!("expected Full"),
}
}
#[test]
fn parse_wrong_vendor_id() {
let mut body = Vec::new();
body.extend([0xff, 0xff, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]);
body.extend(0xDEAD_BEEFu32.to_be_bytes()); body.extend(NXST_FLOW_MONITOR.to_be_bytes());
assert!(parse_flow_monitor_reply(&body).is_err());
}
#[test]
fn flow_update_event_values() {
assert_eq!(FlowUpdateEvent::from_u16(0).unwrap(), FlowUpdateEvent::Added);
assert_eq!(FlowUpdateEvent::from_u16(1).unwrap(), FlowUpdateEvent::Deleted);
assert_eq!(FlowUpdateEvent::from_u16(2).unwrap(), FlowUpdateEvent::Modified);
assert!(FlowUpdateEvent::from_u16(4).is_err());
}
}