Skip to main content

rovs_openflow/
vconn.rs

1//! Virtual connection (VConn) for OpenFlow.
2
3use bytes::{Bytes, BytesMut};
4use tokio::io::{AsyncReadExt, AsyncWriteExt};
5
6use rovs_transport::{Address, Stream};
7
8use crate::error::OfError;
9use crate::flow_monitor::{parse_flow_monitor_reply, FlowMonitorRequest, FlowUpdate};
10use crate::multipart::{parse_flow_stats_reply, FlowStatsEntry, FlowStatsRequest};
11use crate::packet_in::PacketIn;
12use crate::packet_out::PacketOut;
13use crate::{Error, Flow, Header, Message, MessageType, Result, Version};
14
/// An OpenFlow virtual connection.
///
/// Wraps a transport stream to a switch together with the protocol state
/// needed to exchange messages on it: the OpenFlow version in use and the
/// counter that hands out transaction IDs.
pub struct VConn {
    /// Transport stream that every message is written to and read from.
    stream: Stream,
    /// OpenFlow version used for outgoing messages. Starts at `Version::Of13`
    /// and is lowered during the handshake if the peer's Hello carries an
    /// older version.
    version: Version,
    /// Transaction ID the next outgoing request will use. Starts at 1 and
    /// wraps around on overflow.
    next_xid: u32,
}
21
22impl VConn {
23    /// Connect to an OpenFlow switch.
24    pub async fn connect(addr: &Address) -> Result<Self> {
25        let stream = Stream::connect(addr).await?;
26
27        let mut conn = Self {
28            stream,
29            version: Version::Of13, // Will be negotiated
30            next_xid: 1,
31        };
32
33        conn.handshake().await?;
34        Ok(conn)
35    }
36
37    /// Get the negotiated OpenFlow version.
38    pub fn version(&self) -> Version {
39        self.version
40    }
41
42    /// Perform the OpenFlow handshake.
43    async fn handshake(&mut self) -> Result<()> {
44        // Send Hello
45        let hello = Message::new(
46            Version::Of13,
47            MessageType::Hello,
48            self.next_xid(),
49            Bytes::new(),
50        );
51        self.send_message(&hello).await?;
52
53        // Receive Hello
54        let reply = self.recv_message().await?;
55        if reply.header.msg_type != MessageType::Hello {
56            return Err(Error::InvalidMessage("expected Hello".into()));
57        }
58
59        // Use the lower of the two versions
60        self.version = std::cmp::min(self.version, reply.header.version);
61
62        Ok(())
63    }
64
65    /// Get the next transaction ID.
66    fn next_xid(&mut self) -> u32 {
67        let xid = self.next_xid;
68        self.next_xid = self.next_xid.wrapping_add(1);
69        xid
70    }
71
72    /// Send a message.
73    pub async fn send_message(&mut self, msg: &Message) -> Result<()> {
74        let bytes = msg.encode();
75        self.stream.write_all(&bytes).await?;
76        self.stream.flush().await?;
77        Ok(())
78    }
79
80    /// Receive a message.
81    pub async fn recv_message(&mut self) -> Result<Message> {
82        // Read header
83        let mut header_buf = [0u8; Header::SIZE];
84        self.stream.read_exact(&mut header_buf).await?;
85        let header = Header::decode(&mut Bytes::copy_from_slice(&header_buf))?;
86
87        // Read body
88        let body_len = header.length as usize - Header::SIZE;
89        let mut body = BytesMut::zeroed(body_len);
90        if body_len > 0 {
91            self.stream.read_exact(&mut body).await?;
92        }
93
94        Ok(Message {
95            header,
96            body: body.freeze(),
97        })
98    }
99
100    /// Check if a message is an error and return the appropriate error.
101    fn check_error(msg: &Message) -> Result<()> {
102        if msg.header.msg_type == MessageType::Error {
103            let of_error = OfError::parse(&msg.body)?;
104            return Err(Error::OfError(of_error));
105        }
106        Ok(())
107    }
108
109    /// Send a flow modification to the switch.
110    ///
111    /// This sends the FlowMod message asynchronously without waiting for
112    /// confirmation. Use `send_flow_sync` if you need to ensure the flow
113    /// was successfully installed.
114    pub async fn send_flow(&mut self, flow: &Flow) -> Result<()> {
115        let xid = self.next_xid();
116        let msg = flow.to_message(self.version, xid);
117        self.send_message(&msg).await
118    }
119
120    /// Send a flow modification and wait for confirmation.
121    ///
122    /// Sends the FlowMod followed by a barrier request, then waits for
123    /// the barrier reply. If an error occurs (e.g., invalid flow), it
124    /// will be returned.
125    ///
126    /// This ensures the flow is installed (or rejected) before returning.
127    pub async fn send_flow_sync(&mut self, flow: &Flow) -> Result<()> {
128        // Send the flow
129        let flow_xid = self.next_xid();
130        let flow_msg = flow.to_message(self.version, flow_xid);
131        self.send_message(&flow_msg).await?;
132
133        // Send barrier
134        let barrier_xid = self.next_xid();
135        let barrier_msg =
136            Message::new(self.version, MessageType::BarrierRequest, barrier_xid, Bytes::new());
137        self.send_message(&barrier_msg).await?;
138
139        // Wait for response - could be error or barrier reply
140        loop {
141            let reply = self.recv_message().await?;
142
143            // Check for error (could be from flow or barrier)
144            Self::check_error(&reply)?;
145
146            // If we got the barrier reply, flow was successfully installed
147            if reply.header.msg_type == MessageType::BarrierReply {
148                return Ok(());
149            }
150
151            // Handle echo requests while waiting (keep-alive)
152            if reply.header.msg_type == MessageType::EchoRequest {
153                let echo_reply = Message::new(
154                    self.version,
155                    MessageType::EchoReply,
156                    reply.header.xid,
157                    reply.body.clone(),
158                );
159                self.send_message(&echo_reply).await?;
160            }
161
162            // Skip other async messages (PacketIn, PortStatus, FlowRemoved)
163            // In a full implementation, these would be queued for processing
164        }
165    }
166
167    /// Send an echo request and wait for reply.
168    pub async fn echo(&mut self) -> Result<()> {
169        let xid = self.next_xid();
170        let request = Message::new(self.version, MessageType::EchoRequest, xid, Bytes::new());
171        self.send_message(&request).await?;
172
173        let reply = self.recv_message().await?;
174        if reply.header.msg_type != MessageType::EchoReply {
175            return Err(Error::InvalidMessage("expected EchoReply".into()));
176        }
177        if reply.header.xid != xid {
178            return Err(Error::InvalidMessage("xid mismatch".into()));
179        }
180
181        Ok(())
182    }
183
184    /// Send a barrier and wait for reply.
185    pub async fn barrier(&mut self) -> Result<()> {
186        let xid = self.next_xid();
187        let request = Message::new(self.version, MessageType::BarrierRequest, xid, Bytes::new());
188        self.send_message(&request).await?;
189
190        loop {
191            let reply = self.recv_message().await?;
192
193            // Check for errors
194            Self::check_error(&reply)?;
195
196            if reply.header.msg_type == MessageType::BarrierReply {
197                return Ok(());
198            }
199
200            // Handle echo requests while waiting
201            if reply.header.msg_type == MessageType::EchoRequest {
202                let echo_reply = Message::new(
203                    self.version,
204                    MessageType::EchoReply,
205                    reply.header.xid,
206                    reply.body.clone(),
207                );
208                self.send_message(&echo_reply).await?;
209            }
210
211            // Skip other async messages
212        }
213    }
214
215    /// Dump all flows from the switch.
216    ///
217    /// Returns all flow entries from all tables. Use `dump_flows_filtered`
218    /// for more specific queries.
219    pub async fn dump_flows(&mut self) -> Result<Vec<FlowStatsEntry>> {
220        self.dump_flows_filtered(FlowStatsRequest::new()).await
221    }
222
223    /// Dump flows matching the given filter.
224    ///
225    /// The request can filter by table ID, match fields, cookie, etc.
226    pub async fn dump_flows_filtered(
227        &mut self,
228        request: FlowStatsRequest,
229    ) -> Result<Vec<FlowStatsEntry>> {
230        let xid = self.next_xid();
231        let msg = request.to_message(self.version, xid);
232        self.send_message(&msg).await?;
233
234        let mut all_entries = Vec::new();
235
236        // Receive multipart replies until we get one without MORE flag
237        loop {
238            let reply = self.recv_message().await?;
239
240            // Check for errors
241            Self::check_error(&reply)?;
242
243            // Handle echo requests
244            if reply.header.msg_type == MessageType::EchoRequest {
245                let echo_reply = Message::new(
246                    self.version,
247                    MessageType::EchoReply,
248                    reply.header.xid,
249                    reply.body.clone(),
250                );
251                self.send_message(&echo_reply).await?;
252                continue;
253            }
254
255            // Skip non-multipart replies
256            if reply.header.msg_type != MessageType::MultipartReply {
257                continue;
258            }
259
260            // Parse the flow stats reply
261            let (entries, has_more) = parse_flow_stats_reply(&reply.body)?;
262            all_entries.extend(entries);
263
264            if !has_more {
265                break;
266            }
267        }
268
269        Ok(all_entries)
270    }
271
272    /// Wait for and receive a Packet-In message.
273    ///
274    /// This blocks until a Packet-In message is received, handling
275    /// echo requests and skipping other message types.
276    pub async fn recv_packet_in(&mut self) -> Result<PacketIn> {
277        loop {
278            let msg = self.recv_message().await?;
279
280            // Check for errors
281            Self::check_error(&msg)?;
282
283            // Handle echo requests (keep-alive)
284            if msg.header.msg_type == MessageType::EchoRequest {
285                let echo_reply = Message::new(
286                    self.version,
287                    MessageType::EchoReply,
288                    msg.header.xid,
289                    msg.body.clone(),
290                );
291                self.send_message(&echo_reply).await?;
292                continue;
293            }
294
295            // Process Packet-In
296            if msg.header.msg_type == MessageType::PacketIn {
297                return PacketIn::parse(msg.body);
298            }
299
300            // Skip other async messages (PortStatus, FlowRemoved)
301        }
302    }
303
304    /// Try to receive a Packet-In message without blocking.
305    ///
306    /// Returns `Ok(Some(packet_in))` if a Packet-In is available,
307    /// `Ok(None)` if a different message was received (and handled),
308    /// or an error if something went wrong.
309    ///
310    /// Note: This still blocks on the read itself; it just doesn't loop
311    /// waiting specifically for a Packet-In.
312    pub async fn try_recv_packet_in(&mut self) -> Result<Option<PacketIn>> {
313        let msg = self.recv_message().await?;
314
315        // Check for errors
316        Self::check_error(&msg)?;
317
318        // Handle echo requests
319        if msg.header.msg_type == MessageType::EchoRequest {
320            let echo_reply = Message::new(
321                self.version,
322                MessageType::EchoReply,
323                msg.header.xid,
324                msg.body.clone(),
325            );
326            self.send_message(&echo_reply).await?;
327            return Ok(None);
328        }
329
330        // Process Packet-In
331        if msg.header.msg_type == MessageType::PacketIn {
332            return Ok(Some(PacketIn::parse(msg.body)?));
333        }
334
335        // Other message types
336        Ok(None)
337    }
338
339    /// Register a flow monitor and receive the initial snapshot.
340    ///
341    /// Sends the monitor request and collects the initial flow updates
342    /// (if `NXFMF_INITIAL` flag was set in the request). After this returns,
343    /// use `recv_flow_updates()` to receive ongoing updates.
344    ///
345    /// Use a dedicated VConn for monitoring — the monitor produces a
346    /// continuous stream that occupies the connection's recv path.
347    pub async fn monitor_flows(
348        &mut self,
349        request: FlowMonitorRequest,
350    ) -> Result<Vec<FlowUpdate>> {
351        let xid = self.next_xid();
352        let msg = request.to_message(self.version, xid);
353        self.send_message(&msg).await?;
354
355        let mut all_updates = Vec::new();
356
357        // Collect initial snapshot (multipart replies until no MORE flag)
358        loop {
359            let reply = self.recv_message().await?;
360
361            Self::check_error(&reply)?;
362
363            if reply.header.msg_type == MessageType::EchoRequest {
364                let echo_reply = Message::new(
365                    self.version,
366                    MessageType::EchoReply,
367                    reply.header.xid,
368                    reply.body.clone(),
369                );
370                self.send_message(&echo_reply).await?;
371                continue;
372            }
373
374            if reply.header.msg_type != MessageType::MultipartReply {
375                continue;
376            }
377
378            let (updates, has_more) = parse_flow_monitor_reply(&reply.body)?;
379            all_updates.extend(updates);
380
381            if !has_more {
382                break;
383            }
384        }
385
386        Ok(all_updates)
387    }
388
389    /// Receive the next batch of flow monitor updates.
390    ///
391    /// Blocks until flow update messages are received from OVS.
392    /// Handles echo requests internally. Returns the parsed updates.
393    ///
394    /// Call this in a loop after `monitor_flows()` to receive ongoing
395    /// flow change notifications.
396    pub async fn recv_flow_updates(&mut self) -> Result<Vec<FlowUpdate>> {
397        loop {
398            let msg = self.recv_message().await?;
399
400            Self::check_error(&msg)?;
401
402            if msg.header.msg_type == MessageType::EchoRequest {
403                let echo_reply = Message::new(
404                    self.version,
405                    MessageType::EchoReply,
406                    msg.header.xid,
407                    msg.body.clone(),
408                );
409                self.send_message(&echo_reply).await?;
410                continue;
411            }
412
413            if msg.header.msg_type == MessageType::MultipartReply {
414                let (updates, _has_more) = parse_flow_monitor_reply(&msg.body)?;
415                if !updates.is_empty() {
416                    return Ok(updates);
417                }
418            }
419
420            // Skip other message types (PacketIn, PortStatus, FlowRemoved, etc.)
421        }
422    }
423
424    /// Send a Packet-Out message.
425    ///
426    /// This injects a packet into the switch's datapath or releases
427    /// a buffered packet with the specified actions.
428    pub async fn send_packet_out(&mut self, packet_out: &PacketOut) -> Result<()> {
429        let xid = self.next_xid();
430        let msg = packet_out.to_message(self.version, xid);
431        self.send_message(&msg).await
432    }
433}