1use bytes::Bytes;
42
43use crate::action::NICIRA_VENDOR_ID;
44use crate::multipart::{MultipartHeader, MultipartType};
45use crate::message::{Message, MessageType};
46use crate::{Match, Version};
47
48const NXST_FLOW_MONITOR: u32 = 2;
50
51pub mod monitor_flags {
53 pub const INITIAL: u16 = 1 << 0;
55 pub const ADD: u16 = 1 << 1;
57 pub const DELETE: u16 = 1 << 2;
59 pub const MODIFY: u16 = 1 << 3;
61 pub const ACTIONS: u16 = 1 << 4;
63 pub const OWN: u16 = 1 << 5;
65}
66
67#[derive(Debug, Clone, Copy, PartialEq, Eq)]
69pub enum FlowUpdateEvent {
70 Added,
72 Deleted,
74 Modified,
76}
77
78impl FlowUpdateEvent {
79 fn from_u16(v: u16) -> crate::Result<Self> {
80 match v {
81 0 => Ok(Self::Added),
82 1 => Ok(Self::Deleted),
83 2 => Ok(Self::Modified),
84 _ => Err(crate::Error::Parse(format!("unknown flow update event: {v}"))),
85 }
86 }
87}
88
89#[derive(Debug)]
91pub enum FlowUpdate {
92 Full(Box<FlowUpdateFull>),
94 Abbrev {
96 xid: u32,
98 },
99}
100
101#[derive(Debug)]
103pub struct FlowUpdateFull {
104 pub event: FlowUpdateEvent,
106 pub reason: u16,
108 pub priority: u16,
110 pub idle_timeout: u16,
112 pub hard_timeout: u16,
114 pub table_id: u8,
116 pub cookie: u64,
118 pub match_fields: Match,
120 pub actions: Vec<u8>,
122}
123
124#[derive(Debug, Clone)]
140pub struct FlowMonitorRequest {
141 pub id: u32,
143 pub flags: u16,
145 pub out_port: u16,
147 pub table_id: u8,
149 pub match_fields: Match,
151}
152
153impl FlowMonitorRequest {
154 pub fn new(id: u32) -> Self {
156 Self {
157 id,
158 flags: 0,
159 out_port: 0xffff, table_id: 0xff, match_fields: Match::new(),
162 }
163 }
164
165 pub fn all_changes(id: u32) -> Self {
169 Self::new(id).flags(
170 monitor_flags::INITIAL
171 | monitor_flags::ADD
172 | monitor_flags::DELETE
173 | monitor_flags::MODIFY
174 | monitor_flags::ACTIONS,
175 )
176 }
177
178 pub fn flags(mut self, flags: u16) -> Self {
180 self.flags = flags;
181 self
182 }
183
184 pub fn table(mut self, table_id: u8) -> Self {
186 self.table_id = table_id;
187 self
188 }
189
190 pub fn match_fields(mut self, m: Match) -> Self {
192 self.match_fields = m;
193 self
194 }
195
196 pub fn out_port(mut self, port: u16) -> Self {
198 self.out_port = port;
199 self
200 }
201
202 fn encode_body(&self) -> Vec<u8> {
204 let match_bytes = self.match_fields.encode_oxm_fields();
205 let match_len = match_bytes.len() as u16;
206
207 let mut buf = Vec::with_capacity(12 + match_bytes.len());
208
209 buf.extend(self.id.to_be_bytes());
211 buf.extend(self.flags.to_be_bytes());
213 buf.extend(self.out_port.to_be_bytes());
215 buf.extend(match_len.to_be_bytes());
217 buf.push(self.table_id);
219 buf.push(0);
221 buf.extend(match_bytes);
223
224 buf
225 }
226
227 pub fn to_message(&self, version: Version, xid: u32) -> Message {
229 let mp_header = MultipartHeader {
230 mp_type: MultipartType::Experimenter,
231 flags: 0,
232 };
233
234 let mut body = Vec::new();
235 body.extend(mp_header.encode());
237 body.extend(NICIRA_VENDOR_ID.to_be_bytes());
239 body.extend(NXST_FLOW_MONITOR.to_be_bytes());
240 body.extend(self.encode_body());
242
243 Message::new(version, MessageType::MultipartRequest, xid, Bytes::from(body))
244 }
245}
246
247pub fn parse_flow_monitor_reply(body: &[u8]) -> crate::Result<(Vec<FlowUpdate>, bool)> {
251 if body.len() < MultipartHeader::SIZE + 8 {
252 return Err(crate::Error::Parse(
253 "flow monitor reply too short".into(),
254 ));
255 }
256
257 let mp_header = MultipartHeader::decode(body)?;
259 if mp_header.mp_type != MultipartType::Experimenter {
260 return Err(crate::Error::Parse(format!(
261 "expected Experimenter multipart type, got {:?}",
262 mp_header.mp_type
263 )));
264 }
265
266 let offset = MultipartHeader::SIZE;
267
268 let vendor = u32::from_be_bytes([
270 body[offset], body[offset + 1], body[offset + 2], body[offset + 3],
271 ]);
272 let subtype = u32::from_be_bytes([
273 body[offset + 4], body[offset + 5], body[offset + 6], body[offset + 7],
274 ]);
275
276 if vendor != NICIRA_VENDOR_ID {
277 return Err(crate::Error::Parse(format!(
278 "expected Nicira vendor ID 0x{NICIRA_VENDOR_ID:08x}, got 0x{vendor:08x}"
279 )));
280 }
281 if subtype != NXST_FLOW_MONITOR {
282 return Err(crate::Error::Parse(format!(
283 "expected NXST_FLOW_MONITOR subtype {NXST_FLOW_MONITOR}, got {subtype}"
284 )));
285 }
286
287 let mut updates = Vec::new();
289 let mut pos = offset + 8; while pos + 4 <= body.len() {
292 let entry_len = u16::from_be_bytes([body[pos], body[pos + 1]]) as usize;
293 let event_code = u16::from_be_bytes([body[pos + 2], body[pos + 3]]);
294
295 if entry_len < 4 || pos + entry_len > body.len() {
296 break;
297 }
298
299 let update = if event_code == 3 {
300 if entry_len < 8 {
302 return Err(crate::Error::Parse("flow update abbrev too short".into()));
303 }
304 let xid = u32::from_be_bytes([
305 body[pos + 4], body[pos + 5], body[pos + 6], body[pos + 7],
306 ]);
307 FlowUpdate::Abbrev { xid }
308 } else {
309 parse_flow_update_full(&body[pos..pos + entry_len], event_code)?
311 };
312
313 updates.push(update);
314 pos += entry_len;
315 }
316
317 Ok((updates, mp_header.has_more()))
318}
319
320fn parse_flow_update_full(data: &[u8], event_code: u16) -> crate::Result<FlowUpdate> {
331 const FIXED_SIZE: usize = 24;
332
333 if data.len() < FIXED_SIZE {
334 return Err(crate::Error::Parse("flow update entry too short".into()));
335 }
336
337 let entry_len = u16::from_be_bytes([data[0], data[1]]) as usize;
338 let event = FlowUpdateEvent::from_u16(event_code)?;
339 let reason = u16::from_be_bytes([data[4], data[5]]);
340 let priority = u16::from_be_bytes([data[6], data[7]]);
341 let idle_timeout = u16::from_be_bytes([data[8], data[9]]);
342 let hard_timeout = u16::from_be_bytes([data[10], data[11]]);
343 let match_len = u16::from_be_bytes([data[12], data[13]]) as usize;
344 let table_id = data[14];
345 let cookie = u64::from_be_bytes([
347 data[16], data[17], data[18], data[19],
348 data[20], data[21], data[22], data[23],
349 ]);
350
351 let match_end = FIXED_SIZE + match_len;
353 if match_end > entry_len {
354 return Err(crate::Error::Parse("flow update match truncated".into()));
355 }
356 let match_fields = Match::decode_oxm(&data[FIXED_SIZE..match_end])?;
357
358 let actions = if match_end < entry_len {
360 data[match_end..entry_len].to_vec()
361 } else {
362 Vec::new()
363 };
364
365 Ok(FlowUpdate::Full(Box::new(FlowUpdateFull {
366 event,
367 reason,
368 priority,
369 idle_timeout,
370 hard_timeout,
371 table_id,
372 cookie,
373 match_fields,
374 actions,
375 })))
376}
377
378#[cfg(test)]
379mod tests {
380 use super::*;
381
382 #[test]
383 fn monitor_request_defaults() {
384 let req = FlowMonitorRequest::new(1);
385 assert_eq!(req.id, 1);
386 assert_eq!(req.flags, 0);
387 assert_eq!(req.out_port, 0xffff);
388 assert_eq!(req.table_id, 0xff);
389 }
390
391 #[test]
392 fn monitor_request_all_changes() {
393 let req = FlowMonitorRequest::all_changes(42);
394 assert_eq!(req.id, 42);
395 assert_eq!(
396 req.flags,
397 monitor_flags::INITIAL
398 | monitor_flags::ADD
399 | monitor_flags::DELETE
400 | monitor_flags::MODIFY
401 | monitor_flags::ACTIONS
402 );
403 }
404
405 #[test]
406 fn monitor_request_builder() {
407 let req = FlowMonitorRequest::new(1)
408 .flags(monitor_flags::ADD | monitor_flags::DELETE)
409 .table(0)
410 .out_port(1);
411
412 assert_eq!(req.table_id, 0);
413 assert_eq!(req.out_port, 1);
414 assert_eq!(req.flags, monitor_flags::ADD | monitor_flags::DELETE);
415 }
416
417 #[test]
418 fn monitor_request_encode_body() {
419 let req = FlowMonitorRequest::new(1)
420 .flags(monitor_flags::ADD);
421 let body = req.encode_body();
422
423 assert_eq!(body.len(), 12);
426
427 assert_eq!(body[0..4], [0, 0, 0, 1]);
429 assert_eq!(body[4..6], [0, 2]);
431 assert_eq!(body[6..8], [0xff, 0xff]);
433 assert_eq!(body[8..10], [0, 0]);
435 assert_eq!(body[10], 0xff);
437 assert_eq!(body[11], 0);
439 }
440
441 #[test]
442 fn monitor_request_to_message() {
443 let req = FlowMonitorRequest::all_changes(1);
444 let msg = req.to_message(Version::Of13, 10);
445
446 assert_eq!(msg.header.msg_type, MessageType::MultipartRequest);
447 assert_eq!(msg.header.xid, 10);
448
449 assert!(msg.body.len() >= 28);
451
452 assert_eq!(msg.body[0..2], [0xff, 0xff]);
454 assert_eq!(msg.body[8..12], NICIRA_VENDOR_ID.to_be_bytes());
456 assert_eq!(msg.body[12..16], 2u32.to_be_bytes());
458 }
459
460 #[test]
461 fn parse_flow_update_added() {
462 let mut body = Vec::new();
464
465 body.extend([0xff, 0xff]); body.extend([0x00, 0x00]); body.extend([0x00, 0x00, 0x00, 0x00]); body.extend(NICIRA_VENDOR_ID.to_be_bytes());
472 body.extend(NXST_FLOW_MONITOR.to_be_bytes());
473
474 let entry_len: u16 = 24; body.extend(entry_len.to_be_bytes()); body.extend(0u16.to_be_bytes()); body.extend(0u16.to_be_bytes()); body.extend(100u16.to_be_bytes()); body.extend(0u16.to_be_bytes()); body.extend(0u16.to_be_bytes()); body.extend(0u16.to_be_bytes()); body.push(0); body.push(0); body.extend(0x1234u64.to_be_bytes()); let (updates, has_more) = parse_flow_monitor_reply(&body).unwrap();
488 assert!(!has_more);
489 assert_eq!(updates.len(), 1);
490
491 match &updates[0] {
492 FlowUpdate::Full(f) => {
493 assert_eq!(f.event, FlowUpdateEvent::Added);
494 assert_eq!(f.priority, 100);
495 assert_eq!(f.table_id, 0);
496 assert_eq!(f.cookie, 0x1234);
497 assert!(f.actions.is_empty());
498 }
499 FlowUpdate::Abbrev { .. } => panic!("expected Full update"),
500 }
501 }
502
503 #[test]
504 fn parse_flow_update_abbrev() {
505 let mut body = Vec::new();
506
507 body.extend([0xff, 0xff, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]);
509 body.extend(NICIRA_VENDOR_ID.to_be_bytes());
511 body.extend(NXST_FLOW_MONITOR.to_be_bytes());
512
513 body.extend(8u16.to_be_bytes()); body.extend(3u16.to_be_bytes()); body.extend(42u32.to_be_bytes()); let (updates, _) = parse_flow_monitor_reply(&body).unwrap();
519 assert_eq!(updates.len(), 1);
520
521 match &updates[0] {
522 FlowUpdate::Abbrev { xid } => assert_eq!(*xid, 42),
523 FlowUpdate::Full(_) => panic!("expected Abbrev update"),
524 }
525 }
526
527 #[test]
528 fn parse_multiple_updates() {
529 let mut body = Vec::new();
530
531 body.extend([0xff, 0xff]); body.extend([0x00, 0x01]); body.extend([0x00, 0x00, 0x00, 0x00]); body.extend(NICIRA_VENDOR_ID.to_be_bytes());
538 body.extend(NXST_FLOW_MONITOR.to_be_bytes());
539
540 body.extend(24u16.to_be_bytes()); body.extend(0u16.to_be_bytes()); body.extend(0u16.to_be_bytes()); body.extend(200u16.to_be_bytes()); body.extend(0u16.to_be_bytes()); body.extend(0u16.to_be_bytes()); body.extend(0u16.to_be_bytes()); body.push(1); body.push(0); body.extend(0xABCDu64.to_be_bytes()); body.extend(24u16.to_be_bytes()); body.extend(1u16.to_be_bytes()); body.extend(3u16.to_be_bytes()); body.extend(50u16.to_be_bytes()); body.extend(10u16.to_be_bytes()); body.extend(0u16.to_be_bytes()); body.extend(0u16.to_be_bytes()); body.push(0); body.push(0); body.extend(0u64.to_be_bytes()); let (updates, has_more) = parse_flow_monitor_reply(&body).unwrap();
565 assert!(has_more);
566 assert_eq!(updates.len(), 2);
567
568 match &updates[0] {
569 FlowUpdate::Full(f) => {
570 assert_eq!(f.event, FlowUpdateEvent::Added);
571 assert_eq!(f.priority, 200);
572 assert_eq!(f.table_id, 1);
573 }
574 _ => panic!("expected Full"),
575 }
576
577 match &updates[1] {
578 FlowUpdate::Full(f) => {
579 assert_eq!(f.event, FlowUpdateEvent::Deleted);
580 assert_eq!(f.reason, 3);
581 assert_eq!(f.priority, 50);
582 }
583 _ => panic!("expected Full"),
584 }
585 }
586
587 #[test]
588 fn parse_wrong_vendor_id() {
589 let mut body = Vec::new();
590 body.extend([0xff, 0xff, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]);
591 body.extend(0xDEAD_BEEFu32.to_be_bytes()); body.extend(NXST_FLOW_MONITOR.to_be_bytes());
593
594 assert!(parse_flow_monitor_reply(&body).is_err());
595 }
596
597 #[test]
598 fn flow_update_event_values() {
599 assert_eq!(FlowUpdateEvent::from_u16(0).unwrap(), FlowUpdateEvent::Added);
600 assert_eq!(FlowUpdateEvent::from_u16(1).unwrap(), FlowUpdateEvent::Deleted);
601 assert_eq!(FlowUpdateEvent::from_u16(2).unwrap(), FlowUpdateEvent::Modified);
602 assert!(FlowUpdateEvent::from_u16(4).is_err());
603 }
604}