1use bytes::{Bytes, BytesMut};
4use tokio::io::{AsyncReadExt, AsyncWriteExt};
5
6use rovs_transport::{Address, Stream};
7
8use crate::error::OfError;
9use crate::flow_monitor::{parse_flow_monitor_reply, FlowMonitorRequest, FlowUpdate};
10use crate::multipart::{parse_flow_stats_reply, FlowStatsEntry, FlowStatsRequest};
11use crate::packet_in::PacketIn;
12use crate::packet_out::PacketOut;
13use crate::{Error, Flow, Header, Message, MessageType, Result, Version};
14
15pub struct VConn {
17 stream: Stream,
18 version: Version,
19 next_xid: u32,
20}
21
22impl VConn {
23 pub async fn connect(addr: &Address) -> Result<Self> {
25 let stream = Stream::connect(addr).await?;
26
27 let mut conn = Self {
28 stream,
29 version: Version::Of13, next_xid: 1,
31 };
32
33 conn.handshake().await?;
34 Ok(conn)
35 }
36
37 pub fn version(&self) -> Version {
39 self.version
40 }
41
42 async fn handshake(&mut self) -> Result<()> {
44 let hello = Message::new(
46 Version::Of13,
47 MessageType::Hello,
48 self.next_xid(),
49 Bytes::new(),
50 );
51 self.send_message(&hello).await?;
52
53 let reply = self.recv_message().await?;
55 if reply.header.msg_type != MessageType::Hello {
56 return Err(Error::InvalidMessage("expected Hello".into()));
57 }
58
59 self.version = std::cmp::min(self.version, reply.header.version);
61
62 Ok(())
63 }
64
65 fn next_xid(&mut self) -> u32 {
67 let xid = self.next_xid;
68 self.next_xid = self.next_xid.wrapping_add(1);
69 xid
70 }
71
72 pub async fn send_message(&mut self, msg: &Message) -> Result<()> {
74 let bytes = msg.encode();
75 self.stream.write_all(&bytes).await?;
76 self.stream.flush().await?;
77 Ok(())
78 }
79
80 pub async fn recv_message(&mut self) -> Result<Message> {
82 let mut header_buf = [0u8; Header::SIZE];
84 self.stream.read_exact(&mut header_buf).await?;
85 let header = Header::decode(&mut Bytes::copy_from_slice(&header_buf))?;
86
87 let body_len = header.length as usize - Header::SIZE;
89 let mut body = BytesMut::zeroed(body_len);
90 if body_len > 0 {
91 self.stream.read_exact(&mut body).await?;
92 }
93
94 Ok(Message {
95 header,
96 body: body.freeze(),
97 })
98 }
99
100 fn check_error(msg: &Message) -> Result<()> {
102 if msg.header.msg_type == MessageType::Error {
103 let of_error = OfError::parse(&msg.body)?;
104 return Err(Error::OfError(of_error));
105 }
106 Ok(())
107 }
108
109 pub async fn send_flow(&mut self, flow: &Flow) -> Result<()> {
115 let xid = self.next_xid();
116 let msg = flow.to_message(self.version, xid);
117 self.send_message(&msg).await
118 }
119
120 pub async fn send_flow_sync(&mut self, flow: &Flow) -> Result<()> {
128 let flow_xid = self.next_xid();
130 let flow_msg = flow.to_message(self.version, flow_xid);
131 self.send_message(&flow_msg).await?;
132
133 let barrier_xid = self.next_xid();
135 let barrier_msg =
136 Message::new(self.version, MessageType::BarrierRequest, barrier_xid, Bytes::new());
137 self.send_message(&barrier_msg).await?;
138
139 loop {
141 let reply = self.recv_message().await?;
142
143 Self::check_error(&reply)?;
145
146 if reply.header.msg_type == MessageType::BarrierReply {
148 return Ok(());
149 }
150
151 if reply.header.msg_type == MessageType::EchoRequest {
153 let echo_reply = Message::new(
154 self.version,
155 MessageType::EchoReply,
156 reply.header.xid,
157 reply.body.clone(),
158 );
159 self.send_message(&echo_reply).await?;
160 }
161
162 }
165 }
166
167 pub async fn echo(&mut self) -> Result<()> {
169 let xid = self.next_xid();
170 let request = Message::new(self.version, MessageType::EchoRequest, xid, Bytes::new());
171 self.send_message(&request).await?;
172
173 let reply = self.recv_message().await?;
174 if reply.header.msg_type != MessageType::EchoReply {
175 return Err(Error::InvalidMessage("expected EchoReply".into()));
176 }
177 if reply.header.xid != xid {
178 return Err(Error::InvalidMessage("xid mismatch".into()));
179 }
180
181 Ok(())
182 }
183
184 pub async fn barrier(&mut self) -> Result<()> {
186 let xid = self.next_xid();
187 let request = Message::new(self.version, MessageType::BarrierRequest, xid, Bytes::new());
188 self.send_message(&request).await?;
189
190 loop {
191 let reply = self.recv_message().await?;
192
193 Self::check_error(&reply)?;
195
196 if reply.header.msg_type == MessageType::BarrierReply {
197 return Ok(());
198 }
199
200 if reply.header.msg_type == MessageType::EchoRequest {
202 let echo_reply = Message::new(
203 self.version,
204 MessageType::EchoReply,
205 reply.header.xid,
206 reply.body.clone(),
207 );
208 self.send_message(&echo_reply).await?;
209 }
210
211 }
213 }
214
215 pub async fn dump_flows(&mut self) -> Result<Vec<FlowStatsEntry>> {
220 self.dump_flows_filtered(FlowStatsRequest::new()).await
221 }
222
223 pub async fn dump_flows_filtered(
227 &mut self,
228 request: FlowStatsRequest,
229 ) -> Result<Vec<FlowStatsEntry>> {
230 let xid = self.next_xid();
231 let msg = request.to_message(self.version, xid);
232 self.send_message(&msg).await?;
233
234 let mut all_entries = Vec::new();
235
236 loop {
238 let reply = self.recv_message().await?;
239
240 Self::check_error(&reply)?;
242
243 if reply.header.msg_type == MessageType::EchoRequest {
245 let echo_reply = Message::new(
246 self.version,
247 MessageType::EchoReply,
248 reply.header.xid,
249 reply.body.clone(),
250 );
251 self.send_message(&echo_reply).await?;
252 continue;
253 }
254
255 if reply.header.msg_type != MessageType::MultipartReply {
257 continue;
258 }
259
260 let (entries, has_more) = parse_flow_stats_reply(&reply.body)?;
262 all_entries.extend(entries);
263
264 if !has_more {
265 break;
266 }
267 }
268
269 Ok(all_entries)
270 }
271
272 pub async fn recv_packet_in(&mut self) -> Result<PacketIn> {
277 loop {
278 let msg = self.recv_message().await?;
279
280 Self::check_error(&msg)?;
282
283 if msg.header.msg_type == MessageType::EchoRequest {
285 let echo_reply = Message::new(
286 self.version,
287 MessageType::EchoReply,
288 msg.header.xid,
289 msg.body.clone(),
290 );
291 self.send_message(&echo_reply).await?;
292 continue;
293 }
294
295 if msg.header.msg_type == MessageType::PacketIn {
297 return PacketIn::parse(msg.body);
298 }
299
300 }
302 }
303
304 pub async fn try_recv_packet_in(&mut self) -> Result<Option<PacketIn>> {
313 let msg = self.recv_message().await?;
314
315 Self::check_error(&msg)?;
317
318 if msg.header.msg_type == MessageType::EchoRequest {
320 let echo_reply = Message::new(
321 self.version,
322 MessageType::EchoReply,
323 msg.header.xid,
324 msg.body.clone(),
325 );
326 self.send_message(&echo_reply).await?;
327 return Ok(None);
328 }
329
330 if msg.header.msg_type == MessageType::PacketIn {
332 return Ok(Some(PacketIn::parse(msg.body)?));
333 }
334
335 Ok(None)
337 }
338
339 pub async fn monitor_flows(
348 &mut self,
349 request: FlowMonitorRequest,
350 ) -> Result<Vec<FlowUpdate>> {
351 let xid = self.next_xid();
352 let msg = request.to_message(self.version, xid);
353 self.send_message(&msg).await?;
354
355 let mut all_updates = Vec::new();
356
357 loop {
359 let reply = self.recv_message().await?;
360
361 Self::check_error(&reply)?;
362
363 if reply.header.msg_type == MessageType::EchoRequest {
364 let echo_reply = Message::new(
365 self.version,
366 MessageType::EchoReply,
367 reply.header.xid,
368 reply.body.clone(),
369 );
370 self.send_message(&echo_reply).await?;
371 continue;
372 }
373
374 if reply.header.msg_type != MessageType::MultipartReply {
375 continue;
376 }
377
378 let (updates, has_more) = parse_flow_monitor_reply(&reply.body)?;
379 all_updates.extend(updates);
380
381 if !has_more {
382 break;
383 }
384 }
385
386 Ok(all_updates)
387 }
388
389 pub async fn recv_flow_updates(&mut self) -> Result<Vec<FlowUpdate>> {
397 loop {
398 let msg = self.recv_message().await?;
399
400 Self::check_error(&msg)?;
401
402 if msg.header.msg_type == MessageType::EchoRequest {
403 let echo_reply = Message::new(
404 self.version,
405 MessageType::EchoReply,
406 msg.header.xid,
407 msg.body.clone(),
408 );
409 self.send_message(&echo_reply).await?;
410 continue;
411 }
412
413 if msg.header.msg_type == MessageType::MultipartReply {
414 let (updates, _has_more) = parse_flow_monitor_reply(&msg.body)?;
415 if !updates.is_empty() {
416 return Ok(updates);
417 }
418 }
419
420 }
422 }
423
424 pub async fn send_packet_out(&mut self, packet_out: &PacketOut) -> Result<()> {
429 let xid = self.next_xid();
430 let msg = packet_out.to_message(self.version, xid);
431 self.send_message(&msg).await
432 }
433}