embedded_redis/network/
handler.rs

1use crate::commands::auth::AuthCommand;
2use crate::commands::builder::{CommandBuilder, ToStringOption};
3use crate::commands::hello::HelloCommand;
4use crate::commands::ping::PingCommand;
5use crate::commands::Command;
6use crate::network::buffer::Network;
7use crate::network::client::{Client, CommandErrors};
8use crate::network::handler::ConnectionError::{TcpConnectionFailed, TcpSocketError};
9use crate::network::protocol::{Protocol, Resp2, Resp3};
10use crate::network::response::MemoryParameters;
11use alloc::string::{String, ToString};
12use core::cell::RefCell;
13use core::net::SocketAddr;
14use embedded_nal::TcpClientStack;
15use embedded_time::duration::Extensions;
16use embedded_time::duration::Microseconds;
17use embedded_time::Clock;
18
19/// Error handling for connection management
20#[derive(Debug, Eq, PartialEq)]
21pub enum ConnectionError {
22    /// Unable to get a socket from network layer
23    TcpSocketError,
24
25    /// TCP Connect failed
26    TcpConnectionFailed,
27
28    /// Authentication failed with the given sub error
29    AuthenticationError(CommandErrors),
30
31    /// Protocol switch (switch to RESP3) failed with the given sub error
32    ProtocolSwitchError(CommandErrors),
33}
34
35/// Authentication credentials
36#[derive(Clone)]
37pub struct Credentials {
38    pub(crate) username: Option<String>,
39    pub(crate) password: String,
40}
41
42impl Credentials {
43    /// Uses ACL based authentication
44    /// Required Redis version >= 6 + ACL enabled
45    pub fn acl(username: &str, password: &str) -> Self {
46        Credentials {
47            username: Some(username.to_string()),
48            password: password.to_string(),
49        }
50    }
51
52    /// Uses password-only authentication.
53    /// This form just authenticates against the password set with requirepass (Redis server conf)
54    pub fn password_only(password: &str) -> Self {
55        Self {
56            username: None,
57            password: password.to_string(),
58        }
59    }
60}
61
62/// Connection handler for Redis client
63///
64/// While the Client is not Send, the connection handler is.
65/// The handler is designed with the approach that the creation of new clients is cheap.
66/// Thus, the use of short-lived clients in concurrent applications is not a problem.
67pub struct ConnectionHandler<N: TcpClientStack, P: Protocol>
68where
69    HelloCommand: Command<<P as Protocol>::FrameType>,
70{
71    /// Network details of Redis server
72    remote: SocketAddr,
73
74    /// Authentication credentials. None in case of no authentication.
75    authentication: Option<Credentials>,
76
77    /// Cached socket
78    socket: Option<N::TcpSocket>,
79
80    /// Previous authentication try failed, so socket gets closed on next connect()
81    auth_failed: bool,
82
83    /// Optional timeout
84    /// Max. duration waiting for Redis responses
85    timeout: Microseconds,
86
87    /// Parameters for memory allocation
88    memory: MemoryParameters,
89
90    /// Redis protocol
91    /// RESP3 requires Redis version >= 6.0
92    protocol: P,
93
94    /// Use PING command for testing connection
95    use_ping: bool,
96
97    /// Response to HELLO command, only used for RESP3
98    pub(crate) hello_response: Option<<HelloCommand as Command<<P as Protocol>::FrameType>>::Response>,
99}
100
101impl<N: TcpClientStack> ConnectionHandler<N, Resp2> {
102    /// Creates a new connection handler using RESP2 protocol
103    pub fn resp2(remote: SocketAddr) -> ConnectionHandler<N, Resp2> {
104        ConnectionHandler::new(remote, Resp2 {})
105    }
106}
107
108impl<N: TcpClientStack> ConnectionHandler<N, Resp3> {
109    /// Creates a new connection handler using RESP3 protocol
110    pub fn resp3(remote: SocketAddr) -> ConnectionHandler<N, Resp3> {
111        ConnectionHandler::new(remote, Resp3 {})
112    }
113}
114
115impl<N: TcpClientStack, P: Protocol> ConnectionHandler<N, P>
116where
117    AuthCommand: Command<<P as Protocol>::FrameType>,
118    HelloCommand: Command<<P as Protocol>::FrameType>,
119    PingCommand: Command<<P as Protocol>::FrameType>,
120    <P as Protocol>::FrameType: ToStringOption,
121    <P as Protocol>::FrameType: From<CommandBuilder>,
122{
123    fn new(remote: SocketAddr, protocol: P) -> Self {
124        ConnectionHandler {
125            remote,
126            authentication: None,
127            socket: None,
128            auth_failed: false,
129            timeout: 0.microseconds(),
130            memory: MemoryParameters::default(),
131            protocol,
132            use_ping: false,
133            hello_response: None,
134        }
135    }
136
137    /// Returns a Redis client. Caches the connection for future reuse.
138    /// The client has the same lifetime as the network reference.
139    ///
140    /// As the connection is cached, later calls are cheap.
141    /// So a new client may be created when switching threads, RISC tasks, etc.
142    ///
143    /// *Authentication*
144    /// Authentication is done automatically when creating a new connection. So the caller can
145    /// expect a already authenticated and read2use client
146    ///
147    /// # Arguments
148    ///
149    /// * `network`: Mutable borrow of embedded-nal network stack
150    /// * `clock`: Borrow of embedded-time clock
151    ///
152    /// returns: Result<Client<N, C, P>, ConnectionError>
153    pub fn connect<'a, C: Clock>(
154        &'a mut self,
155        network: &'a mut N,
156        clock: Option<&'a C>,
157    ) -> Result<Client<'a, N, C, P>, ConnectionError> {
158        // Previous socket is maybe faulty, so we are closing it here
159        if self.auth_failed {
160            self.disconnect(network);
161        }
162
163        // Check if cached socket is still connected
164        self.test_socket(network, clock);
165
166        // Reuse existing connection
167        if self.socket.is_some() {
168            return Ok(self.create_client(network, clock));
169        }
170
171        self.new_client(network, clock)
172    }
173
174    /// Creates and authenticates a new client
175    fn new_client<'a, C: Clock>(
176        &'a mut self,
177        network: &'a mut N,
178        clock: Option<&'a C>,
179    ) -> Result<Client<'a, N, C, P>, ConnectionError> {
180        self.connect_socket(network)?;
181        let credentials = self.authentication.clone();
182        let client = self.create_client(network, clock);
183
184        match client.init(credentials) {
185            Ok(response) => {
186                self.hello_response = response;
187                Ok(self.create_client(network, clock))
188            }
189            Err(error) => {
190                self.auth_failed = true;
191                Err(error)
192            }
193        }
194    }
195
196    /// Tests if the cached socket is still connected, if not it's closed
197    fn test_socket<'a, C: Clock>(&'a mut self, network: &'a mut N, clock: Option<&'a C>) {
198        if self.socket.is_none() {
199            return;
200        }
201
202        if self.use_ping && self.ping(network, clock).is_err() {
203            self.disconnect(network);
204        }
205    }
206
207    /// Sends ping command for testing the socket
208    fn ping<'a, C: Clock>(
209        &'a mut self,
210        network: &'a mut N,
211        clock: Option<&'a C>,
212    ) -> Result<(), CommandErrors> {
213        self.create_client(network, clock).ping()?.wait()?;
214        Ok(())
215    }
216
217    /// Disconnects the connection
218    pub fn disconnect(&mut self, network: &mut N) {
219        if self.socket.is_none() {
220            return;
221        }
222
223        let _ = network.close(self.socket.take().unwrap());
224        self.auth_failed = false;
225    }
226
227    /// Creates a new TCP connection
228    fn connect_socket(&mut self, network: &mut N) -> Result<(), ConnectionError> {
229        let socket_result = network.socket();
230        if socket_result.is_err() {
231            return Err(TcpSocketError);
232        }
233
234        let mut socket = socket_result.unwrap();
235        if network.connect(&mut socket, self.remote).is_err() {
236            let _ = network.close(socket);
237            return Err(TcpConnectionFailed);
238        };
239
240        self.socket = Some(socket);
241        Ok(())
242    }
243
244    /// Creates a new client instance
245    fn create_client<'a, C: Clock>(
246        &'a mut self,
247        stack: &'a mut N,
248        clock: Option<&'a C>,
249    ) -> Client<'a, N, C, P> {
250        Client {
251            network: Network::new(
252                RefCell::new(stack),
253                RefCell::new(self.socket.as_mut().unwrap()),
254                self.protocol.clone(),
255                self.memory.clone(),
256            ),
257            timeout_duration: self.timeout,
258            clock,
259            hello_response: self.hello_response.as_ref(),
260        }
261    }
262}
263
264impl<N: TcpClientStack, P: Protocol> ConnectionHandler<N, P>
265where
266    HelloCommand: Command<<P as Protocol>::FrameType>,
267{
268    /// Sets the max. duration waiting for Redis responses
269    pub fn timeout(&mut self, timeout: Microseconds) -> &mut Self {
270        self.timeout = timeout;
271        self
272    }
273
274    /// Sets the authentication credentials
275    pub fn auth(&mut self, credentials: Credentials) -> &mut Self {
276        self.authentication = Some(credentials);
277        self
278    }
279
280    /// Using PING command for testing connections
281    pub fn use_ping(&mut self) -> &mut Self {
282        self.use_ping = true;
283        self
284    }
285
286    /// Sets memory allocation parameters
287    pub fn memory(&mut self, parameters: MemoryParameters) -> &mut Self {
288        self.memory = parameters;
289        self
290    }
291}