embedded_redis/network/
future.rs

1use crate::commands::Command;
2use crate::network::buffer::Network;
3use crate::network::client::CommandErrors;
4use crate::network::client::CommandErrors::CommandResponseViolation;
5use crate::network::protocol::Protocol;
6use crate::network::timeout::Timeout;
7use embedded_nal::TcpClientStack;
8use embedded_time::Clock;
9use nb;
10
/// Key that ties a future to its response message.
#[derive(Clone)]
pub(crate) struct Identity {
    /// Generation counter used for invalidating futures.
    ///
    /// It is incremented on fatal problems (e.g. timeouts or fault responses) after which the
    /// message<->future mapping can no longer be guaranteed.
    pub series: usize,

    /// Unique index that maps the future to its response message.
    pub index: usize,
}
21
22/// Non-blocking response management
23pub struct Future<'a, N: TcpClientStack, C: Clock, P: Protocol, Cmd: Command<P::FrameType>> {
24    id: Identity,
25    command: Cmd,
26    protocol: P,
27    network: &'a Network<'a, N, P>,
28    timeout: Timeout<'a, C>,
29
30    /// Cached error during work of ready(). Will be returned on wait() call.
31    error: Option<CommandErrors>,
32
33    /// Was wait called? Flag is used for destructor.
34    wait_called: bool,
35}
36
37impl<'a, N: TcpClientStack, C: Clock, P: Protocol, Cmd: Command<P::FrameType>> Future<'a, N, C, P, Cmd> {
38    pub(crate) fn new(
39        id: Identity,
40        command: Cmd,
41        protocol: P,
42        network: &'a Network<'a, N, P>,
43        timeout: Timeout<'a, C>,
44    ) -> Future<'a, N, C, P, Cmd> {
45        Self {
46            id,
47            command,
48            protocol,
49            network,
50            timeout,
51            error: None,
52            wait_called: false,
53        }
54    }
55
56    /// Blocks until response is received and returns the response
57    /// Throws an error on invalid response or timeout (if configured)
58    pub fn wait(mut self) -> Result<Cmd::Response, CommandErrors> {
59        self.wait_called = true;
60
61        if self.error.is_some() {
62            return Err(self.error.clone().unwrap());
63        }
64
65        self.process(true)?;
66
67        // Previous process call ensures that frame is existing
68        let frame = self.network.take_frame(&self.id).unwrap();
69        self.protocol.assert_error(&frame)?;
70
71        match self.command.eval_response(frame) {
72            Ok(response) => Ok(response),
73            Err(_) => Err(CommandResponseViolation),
74        }
75    }
76
77    /// Non blocking method for checking if data is ready
78    /// So if true is returned, wait() is non-blocking
79    /// Reads all pending data and returns true if response is ready
80    /// Errors are preserved and returned on wait() call
81    pub fn ready(&mut self) -> bool {
82        match self.process(false) {
83            Ok(_) => match self.network.is_complete(&self.id) {
84                Ok(result) => result,
85                Err(error) => {
86                    self.error = Some(error);
87                    true
88                }
89            },
90            Err(error) => {
91                self.error = Some(error);
92                true
93            }
94        }
95    }
96
97    /// Processes socket data
98    /// If block=false, only pending data is read without blocking
99    fn process(&mut self, block: bool) -> Result<(), CommandErrors> {
100        while !self.network.is_complete(&self.id)? {
101            let result = self.network.receive_chunk();
102
103            if self.network.is_buffer_full() {
104                return Err(CommandErrors::MemoryFull);
105            }
106
107            if let Err(error) = result {
108                match error {
109                    nb::Error::Other(_) => {
110                        return Err(CommandErrors::TcpError);
111                    }
112                    nb::Error::WouldBlock => {
113                        if self.timeout.expired()? {
114                            self.network.invalidate_futures();
115                            return Err(CommandErrors::Timeout);
116                        }
117
118                        if !block {
119                            return Ok(());
120                        }
121                    }
122                }
123            }
124        }
125
126        Ok(())
127    }
128}
129
130impl<N: TcpClientStack, C: Clock, P: Protocol, Cmd: Command<P::FrameType>> Drop for Future<'_, N, C, P, Cmd> {
131    fn drop(&mut self) {
132        if !self.wait_called {
133            self.network.drop_future(self.id.clone());
134        }
135    }
136}