Skip to main content

openvpn_mgmt_codec/
stream.rs

1//! Helpers for categorizing [`OvpnMessage`]s into responses and notifications.
2//!
3//! The raw codec yields [`OvpnMessage`] variants, and callers typically
4//! need to branch on "is this a response to a command I sent?" vs. "is
5//! this an asynchronous notification?". This module provides
6//! [`ManagementEvent`] (the two-variant enum) and [`classify`] (a mapping
7//! function suitable for use with stream combinators like
8//! [`StreamExt::map`](futures::StreamExt::map)).
9//!
10//! # Example
11//!
12//! ```no_run
13//! use tokio::net::TcpStream;
14//! use tokio_util::codec::Framed;
15//! use futures::{SinkExt, StreamExt};
16//! use openvpn_mgmt_codec::{OvpnCodec, OvpnCommand, StatusFormat};
17//! use openvpn_mgmt_codec::stream::{ManagementEvent, classify};
18//!
19//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
20//! let stream = TcpStream::connect("127.0.0.1:7505").await?;
21//! let framed = Framed::new(stream, OvpnCodec::new());
22//! let (mut sink, raw_stream) = framed.split();
23//!
24//! let mut mgmt = raw_stream.map(classify);
25//!
26//! sink.send(OvpnCommand::Status(StatusFormat::V3)).await?;
27//!
28//! while let Some(event) = mgmt.next().await {
29//!     match event? {
30//!         ManagementEvent::Notification(n) => {
31//!             println!("async notification: {n:?}");
32//!         }
33//!         ManagementEvent::Response(msg) => {
34//!             println!("command response: {msg:?}");
35//!         }
36//!     }
37//! }
38//! # Ok(())
39//! # }
40//! ```
41
42use std::io;
43
44use crate::message::{Notification, OvpnMessage};
45
46/// A management-interface event, categorized as either a command response
47/// or an asynchronous notification.
48#[derive(Debug, Clone, PartialEq, Eq)]
49pub enum ManagementEvent {
50    /// A command response: [`OvpnMessage::Success`], [`OvpnMessage::Error`],
51    /// [`OvpnMessage::MultiLine`], [`OvpnMessage::Pkcs11IdEntry`],
52    /// [`OvpnMessage::Info`], [`OvpnMessage::PasswordPrompt`], or
53    /// [`OvpnMessage::Unrecognized`].
54    Response(OvpnMessage),
55
56    /// A real-time notification from the daemon.
57    Notification(Notification),
58}
59
60impl From<OvpnMessage> for ManagementEvent {
61    fn from(msg: OvpnMessage) -> Self {
62        match msg {
63            OvpnMessage::Notification(n) => Self::Notification(n),
64            other => Self::Response(other),
65        }
66    }
67}
68
69/// Classify an [`OvpnMessage`] result into a [`ManagementEvent`] result.
70///
71/// This function is designed to be passed directly to a stream combinator:
72///
73/// ```ignore
74/// use futures::StreamExt;
75/// let events = raw_stream.map(classify);
76/// ```
77///
78/// # Extracting notifications with timeout
79///
80/// ```no_run
81/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
82/// use tokio::net::TcpStream;
83/// use tokio_util::codec::Framed;
84/// use futures::{SinkExt, StreamExt};
85/// use openvpn_mgmt_codec::{OvpnCodec, OvpnCommand};
86///
87/// let stream = TcpStream::connect("127.0.0.1:7505").await?;
88/// let mut framed = Framed::new(stream, OvpnCodec::new());
89///
90/// // Send a command and read the response with a timeout.
91/// framed.send(OvpnCommand::Pid).await?;
92/// let response = tokio::time::timeout(
93///     std::time::Duration::from_secs(5),
94///     framed.next(),
95/// ).await?
96///  .ok_or("stream ended")?
97///  ?;
98/// println!("got: {response:?}");
99/// # Ok(())
100/// # }
101/// ```
102///
103/// # Reconnection with backoff
104///
105/// ```no_run
106/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
107/// use tokio::net::TcpStream;
108/// use tokio_util::codec::Framed;
109/// use futures::StreamExt;
110/// use openvpn_mgmt_codec::{OvpnCodec, OvpnMessage};
111///
112/// let mut backoff = std::time::Duration::from_secs(1);
113/// loop {
114///     match TcpStream::connect("127.0.0.1:7505").await {
115///         Ok(stream) => {
116///             backoff = std::time::Duration::from_secs(1); // reset
117///             let mut framed = Framed::new(stream, OvpnCodec::new());
118///             while let Some(msg) = framed.next().await {
119///                 match msg {
120///                     Ok(m) => println!("{m:?}"),
121///                     Err(e) => { eprintln!("decode error: {e}"); break; }
122///                 }
123///             }
124///             eprintln!("connection closed, reconnecting...");
125///         }
126///         Err(e) => {
127///             eprintln!("connect failed: {e}, retrying in {backoff:?}");
128///         }
129///     }
130///     tokio::time::sleep(backoff).await;
131///     backoff = (backoff * 2).min(std::time::Duration::from_secs(30));
132/// }
133/// # }
134/// ```
135///
136/// # Detecting connection loss via `>FATAL:`
137///
138/// ```no_run
139/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
140/// use tokio::net::TcpStream;
141/// use tokio_util::codec::Framed;
142/// use futures::StreamExt;
143/// use openvpn_mgmt_codec::{OvpnCodec, OvpnMessage, Notification};
144///
145/// let stream = TcpStream::connect("127.0.0.1:7505").await?;
146/// let mut framed = Framed::new(stream, OvpnCodec::new());
147///
148/// while let Some(msg) = framed.next().await {
149///     match msg? {
150///         OvpnMessage::Notification(Notification::Fatal { message }) => {
151///             eprintln!("OpenVPN fatal: {message}");
152///             // Trigger graceful shutdown / reconnection.
153///             break;
154///         }
155///         other => println!("{other:?}"),
156///     }
157/// }
158/// // Stream ended — either FATAL or the daemon closed the connection.
159/// // In both cases, you should reconnect (see reconnection example above).
160/// # Ok(())
161/// # }
162/// ```
163pub fn classify(result: Result<OvpnMessage, io::Error>) -> Result<ManagementEvent, io::Error> {
164    result.map(ManagementEvent::from)
165}
166
167#[cfg(test)]
168mod tests {
169    use super::*;
170    use crate::message::Notification;
171
172    #[test]
173    fn success_maps_to_response() {
174        let msg = OvpnMessage::Success("pid=42".to_string());
175        let event: ManagementEvent = msg.into();
176        assert!(matches!(
177            event,
178            ManagementEvent::Response(OvpnMessage::Success(_))
179        ));
180    }
181
182    #[test]
183    fn notification_maps_to_notification() {
184        let msg = OvpnMessage::Notification(Notification::Hold {
185            text: "Waiting".to_string(),
186        });
187        let event: ManagementEvent = msg.into();
188        assert!(matches!(
189            event,
190            ManagementEvent::Notification(Notification::Hold { .. })
191        ));
192    }
193
194    #[test]
195    fn info_maps_to_response() {
196        let msg = OvpnMessage::Info("banner".to_string());
197        let event: ManagementEvent = msg.into();
198        assert!(matches!(
199            event,
200            ManagementEvent::Response(OvpnMessage::Info(_))
201        ));
202    }
203
204    #[test]
205    fn classify_maps_ok() {
206        let result: Result<OvpnMessage, io::Error> = Ok(OvpnMessage::Success("test".to_string()));
207        let event = classify(result).unwrap();
208        assert!(matches!(
209            event,
210            ManagementEvent::Response(OvpnMessage::Success(_))
211        ));
212    }
213
214    #[test]
215    fn classify_passes_through_error() {
216        let result: Result<OvpnMessage, io::Error> = Err(io::Error::other("fail"));
217        assert!(classify(result).is_err());
218    }
219}