tacacs_plus/
inner.rs

1//! The non-thread-safe internals of a client.
2
3use std::fmt;
4use std::future::Future;
5use std::io;
6use std::pin::Pin;
7use std::task::Poll;
8
9use byteorder::{ByteOrder, NetworkEndian};
10use futures::poll;
11use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
12use tacacs_plus_protocol::{Deserialize, PacketBody, Serialize};
13use tacacs_plus_protocol::{HeaderInfo, Packet, PacketFlags};
14
15use super::ClientError;
16
17#[cfg(test)]
18mod tests;
19
20/// A (pinned, boxed) future that returns a client connection or an error, as returned from a [`ConnectionFactory`].
21///
22/// This is roughly equivalent to the [`BoxFuture`](futures::future::BoxFuture) type in the `futures` crate, but without
23/// the lifetime parameter.
24pub type ConnectionFuture<S> = Pin<Box<dyn Future<Output = io::Result<S>> + Send>>;
25
26/// An async factory that returns connections used by a [`Client`](super::Client).
27///
28/// The `Box` allows both closures and function pointers.
29///
30/// [Async closures are currently unstable](https://github.com/rust-lang/rust/issues/62290),
31/// but you can emulate them with normal functions or closures that return `Box::pin`ned async blocks.
32///
33/// Rust's closure type inference can also fail sometimes, so either explicitly annotating
34/// the type of a closure or passing it directly to a function call (e.g., [`Client::new()`](super::Client::new))
35/// can fix that.
36///
37/// # Examples
38///
39/// ```
40/// use futures::io::{Cursor, Result};
41///
42/// use tacacs_plus::{ConnectionFactory, ConnectionFuture};
43///
44/// // function that returns a connection (in this case just a Cursor)
45/// fn function_factory() -> ConnectionFuture<Cursor<Vec<u8>>> {
46///     Box::pin(async {
47///         let vec = Vec::new();
48///         Ok(Cursor::new(vec))
49///     })
50/// }
51///
52/// // boxed function pointer
53/// let _: ConnectionFactory<_> = Box::new(function_factory);
54///
55/// // closures work too
56/// let _: ConnectionFactory<_> = Box::new(
57///     || Box::pin(
58///         async {
59///             let vec: Vec<u8> = Vec::new();
60///             Ok(Cursor::new(vec))
61///         }
62///     )
63/// );
64/// ```
65pub type ConnectionFactory<S> = Box<dyn Fn() -> ConnectionFuture<S> + Send>;
66
67pub(super) struct ClientInner<S> {
68    /// The underlying (TCP per RFC8907) connection for this client, if present.
69    connection: Option<S>,
70
71    /// A factory for opening new connections internally, so the library consumer doesn't have to.
72    ///
73    /// The factory is invoked whenever a new connection needs to be established, including when an ERROR status
74    /// is reported by the server as well as for each new session if the server doesn't support single connection mode.
75    connection_factory: ConnectionFactory<S>,
76
77    /// Whether a session has been completed on the contained connection.
78    first_session_completed: bool,
79
80    /// Whether single connection mode has been established for this connection.
81    ///
82    /// The single connection flag is meant to be ignored after the first two packets
83    /// in a session according to [RFC8907 section 4.3], so we have to keep track of
84    /// that internally.
85    ///
86    /// [RFC8907 section 4.3]: https://www.rfc-editor.org/rfc/rfc8907.html#section-4.3-5
87    single_connection_established: bool,
88}
89
90impl<S: fmt::Debug> fmt::Debug for ClientInner<S> {
91    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
92        f.debug_struct("ClientInner")
93            .field("connection", &self.connection)
94            .field("first_session_completed", &self.first_session_completed)
95            .field(
96                "single_connection_established",
97                &self.single_connection_established,
98            )
99            .finish_non_exhaustive()
100    }
101}
102
103impl<S: AsyncRead + AsyncWrite + Unpin> ClientInner<S> {
104    pub(super) fn new(factory: ConnectionFactory<S>) -> Self {
105        Self {
106            connection: None,
107            connection_factory: factory,
108            first_session_completed: false,
109            single_connection_established: false,
110        }
111    }
112
113    /// NOTE: This function will open a new connection with the stored factory as needed.
114    async fn connection(&mut self) -> io::Result<&mut S> {
115        // obtain new connection from factory
116        if self.connection.is_none() {
117            let new_conn = (self.connection_factory)().await?;
118            self.connection = Some(new_conn);
119        }
120
121        // SAFETY: self.connection is guaranteed to be non-None by the above check
122        let conn = self.connection.as_mut().unwrap();
123
124        Ok(conn)
125    }
126
127    /// Writes a packet to the underlying connection, reconnecting if necessary.
128    pub(super) async fn send_packet<B: PacketBody + Serialize>(
129        &mut self,
130        packet: Packet<B>,
131        secret_key: Option<&[u8]>,
132    ) -> Result<(), ClientError> {
133        // check if other end closed our connection, and reopen it accordingly
134        let connection = self.connection().await?;
135        if !is_connection_open(connection).await? {
136            self.post_session_cleanup(true).await?;
137        }
138
139        // send the packet after ensuring the connection is valid (or dropping
140        // it if it's invalid)
141        self._send_packet(packet, secret_key).await
142    }
143
144    /// Writes a packet to the underlying connection.
145    async fn _send_packet<B: PacketBody + Serialize>(
146        &mut self,
147        packet: Packet<B>,
148        secret_key: Option<&[u8]>,
149    ) -> Result<(), ClientError> {
150        // allocate zero-filled buffer large enough to hold packet
151        let mut packet_buffer = vec![0; packet.wire_size()];
152
153        // obfuscate packet if we have a secret key
154        if let Some(key) = secret_key {
155            packet.serialize(key, &mut packet_buffer)?;
156        } else {
157            packet.serialize_unobfuscated(&mut packet_buffer)?;
158        }
159
160        let connection = self.connection().await?;
161        connection.write_all(&packet_buffer).await?;
162        connection.flush().await.map_err(Into::into)
163    }
164
165    /// Receives a packet from the underlying connection.
166    pub(super) async fn receive_packet<B>(
167        &mut self,
168        secret_key: Option<&[u8]>,
169        expected_sequence_number: u8,
170    ) -> Result<Packet<B>, ClientError>
171    where
172        B: PacketBody + for<'a> Deserialize<'a>,
173    {
174        let mut buffer = vec![0; HeaderInfo::HEADER_SIZE_BYTES];
175        let buffer = &mut buffer;
176
177        let connection = self.connection().await?;
178        connection.read_exact(buffer).await?;
179
180        // read rest of body based on length reported in header
181        let body_length = NetworkEndian::read_u32(&buffer[8..12]);
182        buffer.resize(HeaderInfo::HEADER_SIZE_BYTES + body_length as usize, 0);
183        connection
184            .read_exact(&mut buffer[HeaderInfo::HEADER_SIZE_BYTES..])
185            .await?;
186
187        // unobfuscate packet as necessary
188        let deserialize_result: Packet<B> = if let Some(key) = secret_key {
189            Packet::deserialize(key, buffer)?
190        } else {
191            Packet::deserialize_unobfuscated(buffer)?
192        };
193
194        let actual_sequence_number = deserialize_result.header().sequence_number();
195        if actual_sequence_number == expected_sequence_number {
196            Ok(deserialize_result)
197        } else {
198            Err(ClientError::SequenceNumberMismatch {
199                expected: expected_sequence_number,
200                actual: actual_sequence_number,
201            })
202        }
203    }
204
205    /// NOTE: This function is separate from post_session_cleanup since it has to be done after the first reply/second packet
206    /// in a session, but ASCII authentication can span more packets.
207    pub(super) fn set_internal_single_connect_status(&mut self, header: &HeaderInfo) {
208        // only update single connection status if this is the first reply of the first session of this connection
209        if !self.first_session_completed
210            && header.sequence_number() == 2
211            && header.flags().contains(PacketFlags::SINGLE_CONNECTION)
212        {
213            self.single_connection_established = true;
214        }
215    }
216
217    pub(super) async fn post_session_cleanup(&mut self, status_is_error: bool) -> io::Result<()> {
218        // close session if server doesn't agree to SINGLE_CONNECTION negotiation, or if an error occurred (since a mutex guarantees only one session is going at a time)
219        if !self.single_connection_established || status_is_error {
220            // SAFETY: connection() should be called before this function, and guarantees inner.connection is non-None
221            let mut connection = self.connection.take().unwrap();
222            connection.close().await?;
223
224            // reset connection status "flags", as a new one will be opened for the next session
225            self.single_connection_established = false;
226            self.first_session_completed = false;
227        } else if !self.first_session_completed {
228            // connection was not closed, so we indicate that a session was completed on this connection to ignore
229            // the single connection mode flag for future sessions on this connection, as required by RFC 8907.
230            // (see section 4.3: https://www.rfc-editor.org/rfc/rfc8907.html#section-4.3-5)
231            self.first_session_completed = true;
232        }
233
234        Ok(())
235    }
236}
237
238/// Checks if the provided connection is still open on both sides.
239///
240/// This is accomplished by attempting to read a single byte from the connection
241/// and checking for an EOF condition or specific errors (broken pipe/connection reset).
242///
243/// This might be overkill, but during testing I encountered a case where a write succeeded
244/// and a subsequent read hung due to the connection being closed on the other side, so
245/// avoiding that is preferable.
246async fn is_connection_open<C>(connection: &mut C) -> io::Result<bool>
247where
248    C: AsyncRead + Unpin,
249{
250    // read into a 1-byte buffer, since a 0-byte buffer might return 0 besides just on EOF
251    let mut buffer = [0];
252
253    // poll the read future exactly once to see if anything is ready immediately
254    match poll!(connection.read(&mut buffer)) {
255        // something ready on first poll likely indicates something wrong, since we aren't
256        // expecting any data to actually be ready
257        Poll::Ready(ready) => match ready {
258            // read of length 0 indicates an EOF, which happens when the other side closes a TCP connection
259            Ok(0) => Ok(false),
260
261            Err(e) => match e.kind() {
262                // these errors indicate that the connection is closed, which is the exact
263                // situation we're trying to recover from
264                //
265                // BrokenPipe seems to be Linux-specific (?), ConnectionReset is more general though
266                // (checked TCP & read(2) man pages for MacOS/FreeBSD/Linux)
267                io::ErrorKind::BrokenPipe | io::ErrorKind::ConnectionReset => Ok(false),
268
269                // bubble up any other errors to the caller
270                _ => Err(e),
271            },
272
273            // if there's data still available, the connection is still open, although
274            // this shouldn't happen in the context of TACACS+
275            Ok(_) => Ok(true),
276        },
277
278        // nothing ready to read -> connection is still open
279        Poll::Pending => Ok(true),
280    }
281}