Skip to main content

rovs_openflow/
flow_monitor.rs

1//! Nicira Flow Monitor (NXST_FLOW_MONITOR).
2//!
3//! Provides event-driven flow table monitoring via the Nicira vendor extension.
4//! A controller can register interest in flow table changes and receive
5//! asynchronous notifications as flows are added, modified, or deleted.
6//!
7//! # Protocol
8//!
9//! The flow monitor uses the Multipart Experimenter framework:
10//! - Request: Multipart Request with Nicira vendor header and NXST_FLOW_MONITOR subtype
11//! - Reply: Continuous stream of Multipart Experimenter replies with flow update entries
12//!
13//! # Usage
14//!
15//! Open a dedicated VConn for monitoring (standard OVS practice):
16//!
17//! ```no_run
18//! # use rovs_openflow::*;
19//! # async fn example() -> Result<()> {
20//! # let addr = rovs_transport::Address::Tcp("127.0.0.1:6653".parse().unwrap());
21//! let mut mon = VConn::connect(&addr).await?;
22//!
23//! // Register monitor for all flow changes
24//! let request = FlowMonitorRequest::all_changes(1);
25//! let initial = mon.monitor_flows(request).await?;
26//!
27//! // Receive ongoing updates
28//! loop {
29//!     let updates = mon.recv_flow_updates().await?;
30//!     for update in &updates {
31//!         match update {
32//!             FlowUpdate::Full(f) => println!("{:?}: table={} pri={}",
33//!                 f.event, f.table_id, f.priority),
34//!             FlowUpdate::Abbrev { xid } => println!("own change (xid={xid})"),
35//!         }
36//!     }
37//! }
38//! # }
39//! ```
40
41use bytes::Bytes;
42
43use crate::action::NICIRA_VENDOR_ID;
44use crate::multipart::{MultipartHeader, MultipartType};
45use crate::message::{Message, MessageType};
46use crate::{Match, Version};
47
/// Experimenter subtype value that identifies NXST_FLOW_MONITOR messages.
///
/// Written after the Nicira vendor ID in requests and checked in replies.
const NXST_FLOW_MONITOR: u32 = 2;
50
/// Bit masks for the `flags` field of a flow monitor request (NXFMF_*).
///
/// Each constant occupies a distinct bit, so they can be OR-ed together.
pub mod monitor_flags {
    /// Report flows that already exist as initial ADDED events.
    pub const INITIAL: u16 = 0x0001;
    /// Report flows as they are added.
    pub const ADD: u16 = 0x0002;
    /// Report flows as they are deleted.
    pub const DELETE: u16 = 0x0004;
    /// Report flows as they are modified.
    pub const MODIFY: u16 = 0x0008;
    /// Ask for actions to be included in update entries.
    pub const ACTIONS: u16 = 0x0010;
    /// Also report updates caused by the caller's own messages.
    pub const OWN: u16 = 0x0020;
}
66
/// Kind of change carried by a full flow update entry.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum FlowUpdateEvent {
    /// A flow was added; also used for entries of the initial snapshot.
    Added,
    /// A flow was deleted.
    Deleted,
    /// A flow was modified.
    Modified,
}
77
78impl FlowUpdateEvent {
79    fn from_u16(v: u16) -> crate::Result<Self> {
80        match v {
81            0 => Ok(Self::Added),
82            1 => Ok(Self::Deleted),
83            2 => Ok(Self::Modified),
84            _ => Err(crate::Error::Parse(format!("unknown flow update event: {v}"))),
85        }
86    }
87}
88
89/// A flow update received from the monitor.
90#[derive(Debug)]
91pub enum FlowUpdate {
92    /// A flow was added, deleted, or modified.
93    Full(Box<FlowUpdateFull>),
94    /// Abbreviated notification for the caller's own changes.
95    Abbrev {
96        /// The xid of the message that caused the update.
97        xid: u32,
98    },
99}
100
101/// Full flow update entry (ADDED, DELETED, or MODIFIED).
102#[derive(Debug)]
103pub struct FlowUpdateFull {
104    /// The type of event.
105    pub event: FlowUpdateEvent,
106    /// Reason for deletion (OFPRR_* value). Only meaningful for Deleted events.
107    pub reason: u16,
108    /// Flow priority.
109    pub priority: u16,
110    /// Idle timeout (seconds).
111    pub idle_timeout: u16,
112    /// Hard timeout (seconds).
113    pub hard_timeout: u16,
114    /// Table ID.
115    pub table_id: u8,
116    /// Flow cookie.
117    pub cookie: u64,
118    /// Match fields.
119    pub match_fields: Match,
120    /// Actions (raw bytes). Only present if NXFMF_ACTIONS was set.
121    pub actions: Vec<u8>,
122}
123
124/// Builder for a flow monitor request.
125///
126/// # Wire Format
127///
128/// ```text
129/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
130/// |                          monitor_id                           |
131/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
132/// |            flags            |           out_port              |
133/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
134/// |          match_len          |   table_id  |     pad           |
135/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
136/// |                    match (NXM/OXM TLVs)                       |
137/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
138/// ```
139#[derive(Debug, Clone)]
140pub struct FlowMonitorRequest {
141    /// Controller-assigned monitor ID.
142    pub id: u32,
143    /// Monitor flags (NXFMF_*).
144    pub flags: u16,
145    /// Required output port filter, or 0xffff for any.
146    pub out_port: u16,
147    /// Table to monitor, or 0xff for all tables.
148    pub table_id: u8,
149    /// Match fields filter.
150    pub match_fields: Match,
151}
152
153impl FlowMonitorRequest {
154    /// Create a new flow monitor request with the given ID.
155    pub fn new(id: u32) -> Self {
156        Self {
157            id,
158            flags: 0,
159            out_port: 0xffff, // OFPP_NONE (16-bit)
160            table_id: 0xff,   // All tables
161            match_fields: Match::new(),
162        }
163    }
164
165    /// Create a monitor for all flow changes in all tables.
166    ///
167    /// Enables INITIAL, ADD, DELETE, MODIFY, and ACTIONS flags.
168    pub fn all_changes(id: u32) -> Self {
169        Self::new(id).flags(
170            monitor_flags::INITIAL
171                | monitor_flags::ADD
172                | monitor_flags::DELETE
173                | monitor_flags::MODIFY
174                | monitor_flags::ACTIONS,
175        )
176    }
177
178    /// Set monitor flags.
179    pub fn flags(mut self, flags: u16) -> Self {
180        self.flags = flags;
181        self
182    }
183
184    /// Filter by table ID (0xff for all tables).
185    pub fn table(mut self, table_id: u8) -> Self {
186        self.table_id = table_id;
187        self
188    }
189
190    /// Filter by match fields.
191    pub fn match_fields(mut self, m: Match) -> Self {
192        self.match_fields = m;
193        self
194    }
195
196    /// Filter by output port.
197    pub fn out_port(mut self, port: u16) -> Self {
198        self.out_port = port;
199        self
200    }
201
202    /// Encode the monitor request body (without multipart/vendor headers).
203    fn encode_body(&self) -> Vec<u8> {
204        let match_bytes = self.match_fields.encode_oxm_fields();
205        let match_len = match_bytes.len() as u16;
206
207        let mut buf = Vec::with_capacity(12 + match_bytes.len());
208
209        // id (4)
210        buf.extend(self.id.to_be_bytes());
211        // flags (2)
212        buf.extend(self.flags.to_be_bytes());
213        // out_port (2)
214        buf.extend(self.out_port.to_be_bytes());
215        // match_len (2)
216        buf.extend(match_len.to_be_bytes());
217        // table_id (1)
218        buf.push(self.table_id);
219        // pad (1)
220        buf.push(0);
221        // match fields (variable, NXM/OXM TLVs, no match header)
222        buf.extend(match_bytes);
223
224        buf
225    }
226
227    /// Create the complete multipart request message.
228    pub fn to_message(&self, version: Version, xid: u32) -> Message {
229        let mp_header = MultipartHeader {
230            mp_type: MultipartType::Experimenter,
231            flags: 0,
232        };
233
234        let mut body = Vec::new();
235        // Multipart header (8 bytes)
236        body.extend(mp_header.encode());
237        // Experimenter header: vendor (4) + subtype (4)
238        body.extend(NICIRA_VENDOR_ID.to_be_bytes());
239        body.extend(NXST_FLOW_MONITOR.to_be_bytes());
240        // Monitor request body
241        body.extend(self.encode_body());
242
243        Message::new(version, MessageType::MultipartRequest, xid, Bytes::from(body))
244    }
245}
246
247/// Parse flow monitor update entries from a multipart experimenter reply body.
248///
249/// Returns the parsed updates and whether the MORE flag is set.
250pub fn parse_flow_monitor_reply(body: &[u8]) -> crate::Result<(Vec<FlowUpdate>, bool)> {
251    if body.len() < MultipartHeader::SIZE + 8 {
252        return Err(crate::Error::Parse(
253            "flow monitor reply too short".into(),
254        ));
255    }
256
257    // Multipart header
258    let mp_header = MultipartHeader::decode(body)?;
259    if mp_header.mp_type != MultipartType::Experimenter {
260        return Err(crate::Error::Parse(format!(
261            "expected Experimenter multipart type, got {:?}",
262            mp_header.mp_type
263        )));
264    }
265
266    let offset = MultipartHeader::SIZE;
267
268    // Vendor header
269    let vendor = u32::from_be_bytes([
270        body[offset], body[offset + 1], body[offset + 2], body[offset + 3],
271    ]);
272    let subtype = u32::from_be_bytes([
273        body[offset + 4], body[offset + 5], body[offset + 6], body[offset + 7],
274    ]);
275
276    if vendor != NICIRA_VENDOR_ID {
277        return Err(crate::Error::Parse(format!(
278            "expected Nicira vendor ID 0x{NICIRA_VENDOR_ID:08x}, got 0x{vendor:08x}"
279        )));
280    }
281    if subtype != NXST_FLOW_MONITOR {
282        return Err(crate::Error::Parse(format!(
283            "expected NXST_FLOW_MONITOR subtype {NXST_FLOW_MONITOR}, got {subtype}"
284        )));
285    }
286
287    // Parse flow update entries
288    let mut updates = Vec::new();
289    let mut pos = offset + 8; // After vendor header
290
291    while pos + 4 <= body.len() {
292        let entry_len = u16::from_be_bytes([body[pos], body[pos + 1]]) as usize;
293        let event_code = u16::from_be_bytes([body[pos + 2], body[pos + 3]]);
294
295        if entry_len < 4 || pos + entry_len > body.len() {
296            break;
297        }
298
299        let update = if event_code == 3 {
300            // ABBREV
301            if entry_len < 8 {
302                return Err(crate::Error::Parse("flow update abbrev too short".into()));
303            }
304            let xid = u32::from_be_bytes([
305                body[pos + 4], body[pos + 5], body[pos + 6], body[pos + 7],
306            ]);
307            FlowUpdate::Abbrev { xid }
308        } else {
309            // ADDED (0), DELETED (1), MODIFIED (2)
310            parse_flow_update_full(&body[pos..pos + entry_len], event_code)?
311        };
312
313        updates.push(update);
314        pos += entry_len;
315    }
316
317    Ok((updates, mp_header.has_more()))
318}
319
320/// Parse a full flow update entry.
321///
322/// Wire format:
323/// ```text
324/// length(2) + event(2) + reason(2) + priority(2) +
325/// idle_timeout(2) + hard_timeout(2) + match_len(2) +
326/// table_id(1) + pad(1) + cookie(8) = 24 bytes fixed
327/// + match NXM TLVs (match_len bytes)
328/// + actions (remaining bytes)
329/// ```
330fn parse_flow_update_full(data: &[u8], event_code: u16) -> crate::Result<FlowUpdate> {
331    const FIXED_SIZE: usize = 24;
332
333    if data.len() < FIXED_SIZE {
334        return Err(crate::Error::Parse("flow update entry too short".into()));
335    }
336
337    let entry_len = u16::from_be_bytes([data[0], data[1]]) as usize;
338    let event = FlowUpdateEvent::from_u16(event_code)?;
339    let reason = u16::from_be_bytes([data[4], data[5]]);
340    let priority = u16::from_be_bytes([data[6], data[7]]);
341    let idle_timeout = u16::from_be_bytes([data[8], data[9]]);
342    let hard_timeout = u16::from_be_bytes([data[10], data[11]]);
343    let match_len = u16::from_be_bytes([data[12], data[13]]) as usize;
344    let table_id = data[14];
345    // data[15] is padding
346    let cookie = u64::from_be_bytes([
347        data[16], data[17], data[18], data[19],
348        data[20], data[21], data[22], data[23],
349    ]);
350
351    // Match fields (NXM/OXM TLVs, no match header)
352    let match_end = FIXED_SIZE + match_len;
353    if match_end > entry_len {
354        return Err(crate::Error::Parse("flow update match truncated".into()));
355    }
356    let match_fields = Match::decode_oxm(&data[FIXED_SIZE..match_end])?;
357
358    // Actions (remaining bytes after match)
359    let actions = if match_end < entry_len {
360        data[match_end..entry_len].to_vec()
361    } else {
362        Vec::new()
363    };
364
365    Ok(FlowUpdate::Full(Box::new(FlowUpdateFull {
366        event,
367        reason,
368        priority,
369        idle_timeout,
370        hard_timeout,
371        table_id,
372        cookie,
373        match_fields,
374        actions,
375    })))
376}
377
#[cfg(test)]
mod tests {
    use super::*;

    /// Flag set that `FlowMonitorRequest::all_changes` is expected to produce.
    const ALL_CHANGE_FLAGS: u16 = monitor_flags::INITIAL
        | monitor_flags::ADD
        | monitor_flags::DELETE
        | monitor_flags::MODIFY
        | monitor_flags::ACTIONS;

    /// Build the start of a reply body: multipart header (Experimenter type,
    /// the given flags, four pad bytes) followed by `vendor` and the
    /// NXST_FLOW_MONITOR subtype.
    fn reply_prefix(mp_flags: u16, vendor: u32) -> Vec<u8> {
        let mut buf = vec![0xff, 0xff]; // type = Experimenter
        buf.extend(mp_flags.to_be_bytes());
        buf.extend([0u8; 4]); // pad
        buf.extend(vendor.to_be_bytes());
        buf.extend(NXST_FLOW_MONITOR.to_be_bytes());
        buf
    }

    /// Build a 24-byte full update entry with no match, no actions, and a
    /// hard timeout of zero.
    fn full_entry(
        event: u16,
        reason: u16,
        priority: u16,
        idle_timeout: u16,
        table_id: u8,
        cookie: u64,
    ) -> Vec<u8> {
        let mut entry = Vec::with_capacity(24);
        // length, event, reason, priority, idle_timeout, hard_timeout, match_len
        for field in [24u16, event, reason, priority, idle_timeout, 0, 0] {
            entry.extend(field.to_be_bytes());
        }
        entry.push(table_id);
        entry.push(0); // pad
        entry.extend(cookie.to_be_bytes());
        entry
    }

    /// Unwrap a full update, failing the test on an abbreviated one.
    fn expect_full(update: &FlowUpdate) -> &FlowUpdateFull {
        match update {
            FlowUpdate::Full(full) => &**full,
            FlowUpdate::Abbrev { .. } => panic!("expected Full update"),
        }
    }

    #[test]
    fn monitor_request_defaults() {
        let FlowMonitorRequest {
            id,
            flags,
            out_port,
            table_id,
            ..
        } = FlowMonitorRequest::new(1);

        assert_eq!(id, 1);
        assert_eq!(flags, 0);
        assert_eq!(out_port, 0xffff);
        assert_eq!(table_id, 0xff);
    }

    #[test]
    fn monitor_request_all_changes() {
        let req = FlowMonitorRequest::all_changes(42);

        assert_eq!(req.id, 42);
        assert_eq!(req.flags, ALL_CHANGE_FLAGS);
    }

    #[test]
    fn monitor_request_builder() {
        let wanted = monitor_flags::ADD | monitor_flags::DELETE;
        let req = FlowMonitorRequest::new(1).flags(wanted).table(0).out_port(1);

        assert_eq!(req.table_id, 0);
        assert_eq!(req.out_port, 1);
        assert_eq!(req.flags, wanted);
    }

    #[test]
    fn monitor_request_encode_body() {
        let body = FlowMonitorRequest::new(1)
            .flags(monitor_flags::ADD)
            .encode_body();

        // Fixed part only: the empty match contributes no bytes.
        let expected: [u8; 12] = [
            0, 0, 0, 1, // id = 1
            0, 2, // flags = ADD
            0xff, 0xff, // out_port = 0xffff
            0, 0, // match_len = 0
            0xff, // table_id = 0xff
            0, // pad
        ];
        assert_eq!(body.len(), expected.len());
        assert_eq!(body, expected);
    }

    #[test]
    fn monitor_request_to_message() {
        let msg = FlowMonitorRequest::all_changes(1).to_message(Version::Of13, 10);

        assert_eq!(msg.header.msg_type, MessageType::MultipartRequest);
        assert_eq!(msg.header.xid, 10);

        // multipart header (8) + vendor header (8) + request body (12+)
        assert!(msg.body.len() >= 28);

        // Multipart type = Experimenter (0xffff)
        assert_eq!(msg.body[0..2], [0xff, 0xff]);
        // Vendor ID
        assert_eq!(msg.body[8..12], NICIRA_VENDOR_ID.to_be_bytes());
        // Subtype = NXST_FLOW_MONITOR (2)
        assert_eq!(msg.body[12..16], 2u32.to_be_bytes());
    }

    #[test]
    fn parse_flow_update_added() {
        // One ADDED entry: priority=100, table=0, no match, no actions.
        let mut body = reply_prefix(0, NICIRA_VENDOR_ID);
        body.extend(full_entry(0, 0, 100, 0, 0, 0x1234));

        let (updates, has_more) = parse_flow_monitor_reply(&body).unwrap();
        assert!(!has_more);
        assert_eq!(updates.len(), 1);

        let added = expect_full(&updates[0]);
        assert_eq!(added.event, FlowUpdateEvent::Added);
        assert_eq!(added.priority, 100);
        assert_eq!(added.table_id, 0);
        assert_eq!(added.cookie, 0x1234);
        assert!(added.actions.is_empty());
    }

    #[test]
    fn parse_flow_update_abbrev() {
        // ABBREV entry: length=8, event=3, xid=42
        let mut body = reply_prefix(0, NICIRA_VENDOR_ID);
        body.extend(8u16.to_be_bytes());
        body.extend(3u16.to_be_bytes());
        body.extend(42u32.to_be_bytes());

        let (updates, _) = parse_flow_monitor_reply(&body).unwrap();
        assert_eq!(updates.len(), 1);

        match updates[0] {
            FlowUpdate::Abbrev { xid } => assert_eq!(xid, 42),
            FlowUpdate::Full(_) => panic!("expected Abbrev update"),
        }
    }

    #[test]
    fn parse_multiple_updates() {
        // MORE flag set; an ADDED entry followed by a DELETED entry.
        let mut body = reply_prefix(0x0001, NICIRA_VENDOR_ID);
        body.extend(full_entry(0, 0, 200, 0, 1, 0xABCD));
        body.extend(full_entry(1, 3, 50, 10, 0, 0)); // reason = OFPRR_DELETE

        let (updates, has_more) = parse_flow_monitor_reply(&body).unwrap();
        assert!(has_more);
        assert_eq!(updates.len(), 2);

        let added = expect_full(&updates[0]);
        assert_eq!(added.event, FlowUpdateEvent::Added);
        assert_eq!(added.priority, 200);
        assert_eq!(added.table_id, 1);

        let deleted = expect_full(&updates[1]);
        assert_eq!(deleted.event, FlowUpdateEvent::Deleted);
        assert_eq!(deleted.reason, 3);
        assert_eq!(deleted.priority, 50);
    }

    #[test]
    fn parse_wrong_vendor_id() {
        let body = reply_prefix(0, 0xDEAD_BEEF);

        assert!(parse_flow_monitor_reply(&body).is_err());
    }

    #[test]
    fn flow_update_event_values() {
        let known = [
            (0, FlowUpdateEvent::Added),
            (1, FlowUpdateEvent::Deleted),
            (2, FlowUpdateEvent::Modified),
        ];
        for (code, expected) in known {
            assert_eq!(FlowUpdateEvent::from_u16(code).unwrap(), expected);
        }
        assert!(FlowUpdateEvent::from_u16(4).is_err());
    }
}