Skip to main content

classify

Function classify 

Source
pub fn classify(
    result: Result<OvpnMessage, Error>,
) -> Result<ManagementEvent, Error>
Expand description

Classify an OvpnMessage result into a ManagementEvent result.

This function is designed to be passed directly to a stream combinator:

use futures::StreamExt;
let events = raw_stream.map(classify);

§Extracting notifications with timeout

use tokio::net::TcpStream;
use tokio_util::codec::Framed;
use futures::{SinkExt, StreamExt};
use openvpn_mgmt_codec::{OvpnCodec, OvpnCommand};

let stream = TcpStream::connect("127.0.0.1:7505").await?;
let mut framed = Framed::new(stream, OvpnCodec::new());

// Send a command and read the response with a timeout.
framed.send(OvpnCommand::Pid).await?;
let response = tokio::time::timeout(
    std::time::Duration::from_secs(5),
    framed.next(),
).await?
 .ok_or("stream ended")?
 ?;
println!("got: {response:?}");

§Reconnection with backoff

use tokio::net::TcpStream;
use tokio_util::codec::Framed;
use futures::StreamExt;
use openvpn_mgmt_codec::{OvpnCodec, OvpnMessage};

let mut backoff = std::time::Duration::from_secs(1);
loop {
    match TcpStream::connect("127.0.0.1:7505").await {
        Ok(stream) => {
            backoff = std::time::Duration::from_secs(1); // reset
            let mut framed = Framed::new(stream, OvpnCodec::new());
            while let Some(msg) = framed.next().await {
                match msg {
                    Ok(m) => println!("{m:?}"),
                    Err(e) => { eprintln!("decode error: {e}"); break; }
                }
            }
            eprintln!("connection closed, reconnecting...");
        }
        Err(e) => {
            eprintln!("connect failed: {e}, retrying in {backoff:?}");
        }
    }
    tokio::time::sleep(backoff).await;
    backoff = (backoff * 2).min(std::time::Duration::from_secs(30));
}

§Detecting connection loss via >FATAL:

use tokio::net::TcpStream;
use tokio_util::codec::Framed;
use futures::StreamExt;
use openvpn_mgmt_codec::{OvpnCodec, OvpnMessage, Notification};

let stream = TcpStream::connect("127.0.0.1:7505").await?;
let mut framed = Framed::new(stream, OvpnCodec::new());

while let Some(msg) = framed.next().await {
    match msg? {
        OvpnMessage::Notification(Notification::Fatal { message }) => {
            eprintln!("OpenVPN fatal: {message}");
            // Trigger graceful shutdown / reconnection.
            break;
        }
        other => println!("{other:?}"),
    }
}
// Stream ended — either FATAL or the daemon closed the connection.
// In both cases, you should reconnect (see reconnection example above).