openvpn_mgmt_codec/stream.rs
1//! Helpers for categorizing [`OvpnMessage`]s into responses and notifications.
2//!
3//! The raw codec yields [`OvpnMessage`] variants, and callers typically
4//! need to branch on "is this a response to a command I sent?" vs. "is
5//! this an asynchronous notification?". This module provides
6//! [`ManagementEvent`] (the two-variant enum) and [`classify`] (a mapping
7//! function suitable for use with stream combinators like
8//! [`StreamExt::map`](futures::StreamExt::map)).
9//!
10//! # Example
11//!
12//! ```no_run
13//! use tokio::net::TcpStream;
14//! use tokio_util::codec::Framed;
15//! use futures::{SinkExt, StreamExt};
16//! use openvpn_mgmt_codec::{OvpnCodec, OvpnCommand, StatusFormat};
17//! use openvpn_mgmt_codec::stream::{ManagementEvent, classify};
18//!
19//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
20//! let stream = TcpStream::connect("127.0.0.1:7505").await?;
21//! let framed = Framed::new(stream, OvpnCodec::new());
22//! let (mut sink, raw_stream) = framed.split();
23//!
24//! let mut mgmt = raw_stream.map(classify);
25//!
26//! sink.send(OvpnCommand::Status(StatusFormat::V3)).await?;
27//!
28//! while let Some(event) = mgmt.next().await {
29//! match event? {
30//! ManagementEvent::Notification(n) => {
31//! println!("async notification: {n:?}");
32//! }
33//! ManagementEvent::Response(msg) => {
34//! println!("command response: {msg:?}");
35//! }
36//! }
37//! }
38//! # Ok(())
39//! # }
40//! ```
41
42use std::io;
43
44use crate::message::{Notification, OvpnMessage};
45
46/// A management-interface event, categorized as either a command response
47/// or an asynchronous notification.
48#[derive(Debug, Clone, PartialEq, Eq)]
49pub enum ManagementEvent {
50 /// A command response: [`OvpnMessage::Success`], [`OvpnMessage::Error`],
51 /// [`OvpnMessage::MultiLine`], [`OvpnMessage::Pkcs11IdEntry`],
52 /// [`OvpnMessage::Info`], [`OvpnMessage::PasswordPrompt`], or
53 /// [`OvpnMessage::Unrecognized`].
54 Response(OvpnMessage),
55
56 /// A real-time notification from the daemon.
57 Notification(Notification),
58}
59
60impl From<OvpnMessage> for ManagementEvent {
61 fn from(msg: OvpnMessage) -> Self {
62 match msg {
63 OvpnMessage::Notification(n) => Self::Notification(n),
64 other => Self::Response(other),
65 }
66 }
67}
68
69/// Classify an [`OvpnMessage`] result into a [`ManagementEvent`] result.
70///
71/// This function is designed to be passed directly to a stream combinator:
72///
73/// ```ignore
74/// use futures::StreamExt;
75/// let events = raw_stream.map(classify);
76/// ```
77///
78/// # Extracting notifications with timeout
79///
80/// ```no_run
81/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
82/// use tokio::net::TcpStream;
83/// use tokio_util::codec::Framed;
84/// use futures::{SinkExt, StreamExt};
85/// use openvpn_mgmt_codec::{OvpnCodec, OvpnCommand};
86///
87/// let stream = TcpStream::connect("127.0.0.1:7505").await?;
88/// let mut framed = Framed::new(stream, OvpnCodec::new());
89///
90/// // Send a command and read the response with a timeout.
91/// framed.send(OvpnCommand::Pid).await?;
92/// let response = tokio::time::timeout(
93/// std::time::Duration::from_secs(5),
94/// framed.next(),
95/// ).await?
96/// .ok_or("stream ended")?
97/// ?;
98/// println!("got: {response:?}");
99/// # Ok(())
100/// # }
101/// ```
102///
103/// # Reconnection with backoff
104///
105/// ```no_run
106/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
107/// use tokio::net::TcpStream;
108/// use tokio_util::codec::Framed;
109/// use futures::StreamExt;
110/// use openvpn_mgmt_codec::{OvpnCodec, OvpnMessage};
111///
112/// let mut backoff = std::time::Duration::from_secs(1);
113/// loop {
114/// match TcpStream::connect("127.0.0.1:7505").await {
115/// Ok(stream) => {
116/// backoff = std::time::Duration::from_secs(1); // reset
117/// let mut framed = Framed::new(stream, OvpnCodec::new());
118/// while let Some(msg) = framed.next().await {
119/// match msg {
120/// Ok(m) => println!("{m:?}"),
121/// Err(e) => { eprintln!("decode error: {e}"); break; }
122/// }
123/// }
124/// eprintln!("connection closed, reconnecting...");
125/// }
126/// Err(e) => {
127/// eprintln!("connect failed: {e}, retrying in {backoff:?}");
128/// }
129/// }
130/// tokio::time::sleep(backoff).await;
131/// backoff = (backoff * 2).min(std::time::Duration::from_secs(30));
132/// }
133/// # }
134/// ```
135///
136/// # Detecting connection loss via `>FATAL:`
137///
138/// ```no_run
139/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
140/// use tokio::net::TcpStream;
141/// use tokio_util::codec::Framed;
142/// use futures::StreamExt;
143/// use openvpn_mgmt_codec::{OvpnCodec, OvpnMessage, Notification};
144///
145/// let stream = TcpStream::connect("127.0.0.1:7505").await?;
146/// let mut framed = Framed::new(stream, OvpnCodec::new());
147///
148/// while let Some(msg) = framed.next().await {
149/// match msg? {
150/// OvpnMessage::Notification(Notification::Fatal { message }) => {
151/// eprintln!("OpenVPN fatal: {message}");
152/// // Trigger graceful shutdown / reconnection.
153/// break;
154/// }
155/// other => println!("{other:?}"),
156/// }
157/// }
158/// // Stream ended — either FATAL or the daemon closed the connection.
159/// // In both cases, you should reconnect (see reconnection example above).
160/// # Ok(())
161/// # }
162/// ```
163pub fn classify(result: Result<OvpnMessage, io::Error>) -> Result<ManagementEvent, io::Error> {
164 result.map(ManagementEvent::from)
165}
166
/// Unit tests for the message-to-event conversion and for [`classify`].
///
/// Assertions compare whole values rather than only the variant shape, so a
/// conversion that altered or dropped the payload would be caught.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::message::Notification;

    /// A success reply becomes a `Response` carrying the same message.
    #[test]
    fn success_maps_to_response() {
        let event = ManagementEvent::from(OvpnMessage::Success("pid=42".to_string()));
        assert_eq!(
            event,
            ManagementEvent::Response(OvpnMessage::Success("pid=42".to_string()))
        );
    }

    /// A notification message is unwrapped into the `Notification` variant.
    #[test]
    fn notification_maps_to_notification() {
        let msg = OvpnMessage::Notification(Notification::Hold {
            text: "Waiting".to_string(),
        });
        let event: ManagementEvent = msg.into();
        assert_eq!(
            event,
            ManagementEvent::Notification(Notification::Hold {
                text: "Waiting".to_string(),
            })
        );
    }

    /// An info line is not a notification, so it is kept as a `Response`.
    #[test]
    fn info_maps_to_response() {
        let event = ManagementEvent::from(OvpnMessage::Info("banner".to_string()));
        assert_eq!(
            event,
            ManagementEvent::Response(OvpnMessage::Info("banner".to_string()))
        );
    }

    /// `classify` converts the `Ok` payload and preserves its contents.
    #[test]
    fn classify_maps_ok() {
        let result: Result<OvpnMessage, io::Error> = Ok(OvpnMessage::Success("test".to_string()));
        let event = classify(result).unwrap();
        assert_eq!(
            event,
            ManagementEvent::Response(OvpnMessage::Success("test".to_string()))
        );
    }

    /// `classify` also routes notifications arriving through the `Ok` path.
    #[test]
    fn classify_maps_notification() {
        let result: Result<OvpnMessage, io::Error> =
            Ok(OvpnMessage::Notification(Notification::Hold {
                text: "Waiting".to_string(),
            }));
        let event = classify(result).unwrap();
        assert_eq!(
            event,
            ManagementEvent::Notification(Notification::Hold {
                text: "Waiting".to_string(),
            })
        );
    }

    /// The error must come back unchanged, not merely as "some error".
    #[test]
    fn classify_passes_through_error() {
        let result: Result<OvpnMessage, io::Error> = Err(io::Error::other("fail"));
        let err = classify(result).unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::Other);
        assert_eq!(err.to_string(), "fail");
    }
}