pub fn classify(
result: Result<OvpnMessage, Error>,
) -> Result<ManagementEvent, Error>

Classify an OvpnMessage result into a ManagementEvent result.
This function is designed to be passed directly to a stream combinator:
ⓘ
use futures::StreamExt;
let events = raw_stream.map(classify);

§Extracting notifications with timeout
use tokio::net::TcpStream;
use tokio_util::codec::Framed;
use futures::{SinkExt, StreamExt};
use openvpn_mgmt_codec::{OvpnCodec, OvpnCommand};
let stream = TcpStream::connect("127.0.0.1:7505").await?;
let mut framed = Framed::new(stream, OvpnCodec::new());
// Send a command and read the response with a timeout.
framed.send(OvpnCommand::Pid).await?;
let response = tokio::time::timeout(
std::time::Duration::from_secs(5),
framed.next(),
).await?
.ok_or("stream ended")?
?;
println!("got: {response:?}");

§Reconnection with backoff
use tokio::net::TcpStream;
use tokio_util::codec::Framed;
use futures::StreamExt;
use openvpn_mgmt_codec::{OvpnCodec, OvpnMessage};
let mut backoff = std::time::Duration::from_secs(1);
loop {
match TcpStream::connect("127.0.0.1:7505").await {
Ok(stream) => {
backoff = std::time::Duration::from_secs(1); // reset
let mut framed = Framed::new(stream, OvpnCodec::new());
while let Some(msg) = framed.next().await {
match msg {
Ok(m) => println!("{m:?}"),
Err(e) => { eprintln!("decode error: {e}"); break; }
}
}
eprintln!("connection closed, reconnecting...");
}
Err(e) => {
eprintln!("connect failed: {e}, retrying in {backoff:?}");
}
}
tokio::time::sleep(backoff).await;
backoff = (backoff * 2).min(std::time::Duration::from_secs(30));
}

§Detecting connection loss via >FATAL:
use tokio::net::TcpStream;
use tokio_util::codec::Framed;
use futures::StreamExt;
use openvpn_mgmt_codec::{OvpnCodec, OvpnMessage, Notification};
let stream = TcpStream::connect("127.0.0.1:7505").await?;
let mut framed = Framed::new(stream, OvpnCodec::new());
while let Some(msg) = framed.next().await {
match msg? {
OvpnMessage::Notification(Notification::Fatal { message }) => {
eprintln!("OpenVPN fatal: {message}");
// Trigger graceful shutdown / reconnection.
break;
}
other => println!("{other:?}"),
}
}
// Stream ended — either FATAL or the daemon closed the connection.
// In both cases, you should reconnect (see reconnection example above).