tacacs_plus/inner.rs
1//! The non-thread-safe internals of a client.
2
3use std::fmt;
4use std::future::Future;
5use std::io;
6use std::pin::Pin;
7use std::task::Poll;
8
9use byteorder::{ByteOrder, NetworkEndian};
10use futures::poll;
11use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
12use tacacs_plus_protocol::{Deserialize, PacketBody, Serialize};
13use tacacs_plus_protocol::{HeaderInfo, Packet, PacketFlags};
14
15use super::ClientError;
16
17#[cfg(test)]
18mod tests;
19
/// A (pinned, boxed) future that returns a client connection or an error, as returned from a [`ConnectionFactory`].
///
/// The future resolves to an [`io::Result`] wrapping the connection type `S`, and must be [`Send`].
///
/// This is roughly equivalent to the [`BoxFuture`](futures::future::BoxFuture) type in the `futures` crate, but without
/// the lifetime parameter.
pub type ConnectionFuture<S> = Pin<Box<dyn Future<Output = io::Result<S>> + Send>>;
25
/// An async factory that returns connections used by a [`Client`](super::Client).
///
/// The factory takes no arguments and is called through [`Fn`], so it can be invoked any number of times;
/// each call returns a fresh [`ConnectionFuture`]. The boxed factory itself must be [`Send`].
///
/// The `Box` allows both closures and function pointers.
///
/// [Async closures are currently unstable](https://github.com/rust-lang/rust/issues/62290),
/// but you can emulate them with normal functions or closures that return `Box::pin`ned async blocks.
///
/// Rust's closure type inference can also fail sometimes, so either explicitly annotating
/// the type of a closure or passing it directly to a function call (e.g., [`Client::new()`](super::Client::new))
/// can fix that.
///
/// # Examples
///
/// ```
/// use futures::io::{Cursor, Result};
///
/// use tacacs_plus::{ConnectionFactory, ConnectionFuture};
///
/// // function that returns a connection (in this case just a Cursor)
/// fn function_factory() -> ConnectionFuture<Cursor<Vec<u8>>> {
///     Box::pin(async {
///         let vec = Vec::new();
///         Ok(Cursor::new(vec))
///     })
/// }
///
/// // boxed function pointer
/// let _: ConnectionFactory<_> = Box::new(function_factory);
///
/// // closures work too
/// let _: ConnectionFactory<_> = Box::new(
///     || Box::pin(
///         async {
///             let vec: Vec<u8> = Vec::new();
///             Ok(Cursor::new(vec))
///         }
///     )
/// );
/// ```
pub type ConnectionFactory<S> = Box<dyn Fn() -> ConnectionFuture<S> + Send>;
66
/// The non-thread-safe state of a client: the current connection (if any), the factory
/// used to open new connections, and the single connection mode bookkeeping for the
/// current connection.
pub(super) struct ClientInner<S> {
    /// The underlying (TCP per RFC8907) connection for this client, if present.
    ///
    /// This is `None` until a connection is first requested, and again after the
    /// connection has been closed during post-session cleanup.
    connection: Option<S>,

    /// A factory for opening new connections internally, so the library consumer doesn't have to.
    ///
    /// The factory is invoked whenever a new connection needs to be established, including when an ERROR status
    /// is reported by the server as well as for each new session if the server doesn't support single connection mode.
    connection_factory: ConnectionFactory<S>,

    /// Whether a session has been completed on the contained connection.
    ///
    /// Reset to `false` whenever the connection is closed.
    first_session_completed: bool,

    /// Whether single connection mode has been established for this connection.
    ///
    /// The single connection flag is meant to be ignored after the first two packets
    /// in a session according to [RFC8907 section 4.3], so we have to keep track of
    /// that internally.
    ///
    /// [RFC8907 section 4.3]: https://www.rfc-editor.org/rfc/rfc8907.html#section-4.3-5
    single_connection_established: bool,
}
89
90impl<S: fmt::Debug> fmt::Debug for ClientInner<S> {
91 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
92 f.debug_struct("ClientInner")
93 .field("connection", &self.connection)
94 .field("first_session_completed", &self.first_session_completed)
95 .field(
96 "single_connection_established",
97 &self.single_connection_established,
98 )
99 .finish_non_exhaustive()
100 }
101}
102
103impl<S: AsyncRead + AsyncWrite + Unpin> ClientInner<S> {
104 pub(super) fn new(factory: ConnectionFactory<S>) -> Self {
105 Self {
106 connection: None,
107 connection_factory: factory,
108 first_session_completed: false,
109 single_connection_established: false,
110 }
111 }
112
113 /// NOTE: This function will open a new connection with the stored factory as needed.
114 async fn connection(&mut self) -> io::Result<&mut S> {
115 // obtain new connection from factory
116 if self.connection.is_none() {
117 let new_conn = (self.connection_factory)().await?;
118 self.connection = Some(new_conn);
119 }
120
121 // SAFETY: self.connection is guaranteed to be non-None by the above check
122 let conn = self.connection.as_mut().unwrap();
123
124 Ok(conn)
125 }
126
127 /// Writes a packet to the underlying connection, reconnecting if necessary.
128 pub(super) async fn send_packet<B: PacketBody + Serialize>(
129 &mut self,
130 packet: Packet<B>,
131 secret_key: Option<&[u8]>,
132 ) -> Result<(), ClientError> {
133 // check if other end closed our connection, and reopen it accordingly
134 let connection = self.connection().await?;
135 if !is_connection_open(connection).await? {
136 self.post_session_cleanup(true).await?;
137 }
138
139 // send the packet after ensuring the connection is valid (or dropping
140 // it if it's invalid)
141 self._send_packet(packet, secret_key).await
142 }
143
144 /// Writes a packet to the underlying connection.
145 async fn _send_packet<B: PacketBody + Serialize>(
146 &mut self,
147 packet: Packet<B>,
148 secret_key: Option<&[u8]>,
149 ) -> Result<(), ClientError> {
150 // allocate zero-filled buffer large enough to hold packet
151 let mut packet_buffer = vec![0; packet.wire_size()];
152
153 // obfuscate packet if we have a secret key
154 if let Some(key) = secret_key {
155 packet.serialize(key, &mut packet_buffer)?;
156 } else {
157 packet.serialize_unobfuscated(&mut packet_buffer)?;
158 }
159
160 let connection = self.connection().await?;
161 connection.write_all(&packet_buffer).await?;
162 connection.flush().await.map_err(Into::into)
163 }
164
165 /// Receives a packet from the underlying connection.
166 pub(super) async fn receive_packet<B>(
167 &mut self,
168 secret_key: Option<&[u8]>,
169 expected_sequence_number: u8,
170 ) -> Result<Packet<B>, ClientError>
171 where
172 B: PacketBody + for<'a> Deserialize<'a>,
173 {
174 let mut buffer = vec![0; HeaderInfo::HEADER_SIZE_BYTES];
175 let buffer = &mut buffer;
176
177 let connection = self.connection().await?;
178 connection.read_exact(buffer).await?;
179
180 // read rest of body based on length reported in header
181 let body_length = NetworkEndian::read_u32(&buffer[8..12]);
182 buffer.resize(HeaderInfo::HEADER_SIZE_BYTES + body_length as usize, 0);
183 connection
184 .read_exact(&mut buffer[HeaderInfo::HEADER_SIZE_BYTES..])
185 .await?;
186
187 // unobfuscate packet as necessary
188 let deserialize_result: Packet<B> = if let Some(key) = secret_key {
189 Packet::deserialize(key, buffer)?
190 } else {
191 Packet::deserialize_unobfuscated(buffer)?
192 };
193
194 let actual_sequence_number = deserialize_result.header().sequence_number();
195 if actual_sequence_number == expected_sequence_number {
196 Ok(deserialize_result)
197 } else {
198 Err(ClientError::SequenceNumberMismatch {
199 expected: expected_sequence_number,
200 actual: actual_sequence_number,
201 })
202 }
203 }
204
205 /// NOTE: This function is separate from post_session_cleanup since it has to be done after the first reply/second packet
206 /// in a session, but ASCII authentication can span more packets.
207 pub(super) fn set_internal_single_connect_status(&mut self, header: &HeaderInfo) {
208 // only update single connection status if this is the first reply of the first session of this connection
209 if !self.first_session_completed
210 && header.sequence_number() == 2
211 && header.flags().contains(PacketFlags::SINGLE_CONNECTION)
212 {
213 self.single_connection_established = true;
214 }
215 }
216
217 pub(super) async fn post_session_cleanup(&mut self, status_is_error: bool) -> io::Result<()> {
218 // close session if server doesn't agree to SINGLE_CONNECTION negotiation, or if an error occurred (since a mutex guarantees only one session is going at a time)
219 if !self.single_connection_established || status_is_error {
220 // SAFETY: connection() should be called before this function, and guarantees inner.connection is non-None
221 let mut connection = self.connection.take().unwrap();
222 connection.close().await?;
223
224 // reset connection status "flags", as a new one will be opened for the next session
225 self.single_connection_established = false;
226 self.first_session_completed = false;
227 } else if !self.first_session_completed {
228 // connection was not closed, so we indicate that a session was completed on this connection to ignore
229 // the single connection mode flag for future sessions on this connection, as required by RFC 8907.
230 // (see section 4.3: https://www.rfc-editor.org/rfc/rfc8907.html#section-4.3-5)
231 self.first_session_completed = true;
232 }
233
234 Ok(())
235 }
236}
237
238/// Checks if the provided connection is still open on both sides.
239///
240/// This is accomplished by attempting to read a single byte from the connection
241/// and checking for an EOF condition or specific errors (broken pipe/connection reset).
242///
243/// This might be overkill, but during testing I encountered a case where a write succeeded
244/// and a subsequent read hung due to the connection being closed on the other side, so
245/// avoiding that is preferable.
246async fn is_connection_open<C>(connection: &mut C) -> io::Result<bool>
247where
248 C: AsyncRead + Unpin,
249{
250 // read into a 1-byte buffer, since a 0-byte buffer might return 0 besides just on EOF
251 let mut buffer = [0];
252
253 // poll the read future exactly once to see if anything is ready immediately
254 match poll!(connection.read(&mut buffer)) {
255 // something ready on first poll likely indicates something wrong, since we aren't
256 // expecting any data to actually be ready
257 Poll::Ready(ready) => match ready {
258 // read of length 0 indicates an EOF, which happens when the other side closes a TCP connection
259 Ok(0) => Ok(false),
260
261 Err(e) => match e.kind() {
262 // these errors indicate that the connection is closed, which is the exact
263 // situation we're trying to recover from
264 //
265 // BrokenPipe seems to be Linux-specific (?), ConnectionReset is more general though
266 // (checked TCP & read(2) man pages for MacOS/FreeBSD/Linux)
267 io::ErrorKind::BrokenPipe | io::ErrorKind::ConnectionReset => Ok(false),
268
269 // bubble up any other errors to the caller
270 _ => Err(e),
271 },
272
273 // if there's data still available, the connection is still open, although
274 // this shouldn't happen in the context of TACACS+
275 Ok(_) => Ok(true),
276 },
277
278 // nothing ready to read -> connection is still open
279 Poll::Pending => Ok(true),
280 }
281}