embedded_redis/network/
future.rs1use crate::commands::Command;
2use crate::network::buffer::Network;
3use crate::network::client::CommandErrors;
4use crate::network::client::CommandErrors::CommandResponseViolation;
5use crate::network::protocol::Protocol;
6use crate::network::timeout::Timeout;
7use embedded_nal::TcpClientStack;
8use embedded_time::Clock;
9use nb;
10
11#[derive(Clone)]
12pub(crate) struct Identity {
13 pub series: usize,
17
18 pub index: usize,
20}
21
22pub struct Future<'a, N: TcpClientStack, C: Clock, P: Protocol, Cmd: Command<P::FrameType>> {
24 id: Identity,
25 command: Cmd,
26 protocol: P,
27 network: &'a Network<'a, N, P>,
28 timeout: Timeout<'a, C>,
29
30 error: Option<CommandErrors>,
32
33 wait_called: bool,
35}
36
37impl<'a, N: TcpClientStack, C: Clock, P: Protocol, Cmd: Command<P::FrameType>> Future<'a, N, C, P, Cmd> {
38 pub(crate) fn new(
39 id: Identity,
40 command: Cmd,
41 protocol: P,
42 network: &'a Network<'a, N, P>,
43 timeout: Timeout<'a, C>,
44 ) -> Future<'a, N, C, P, Cmd> {
45 Self {
46 id,
47 command,
48 protocol,
49 network,
50 timeout,
51 error: None,
52 wait_called: false,
53 }
54 }
55
56 pub fn wait(mut self) -> Result<Cmd::Response, CommandErrors> {
59 self.wait_called = true;
60
61 if self.error.is_some() {
62 return Err(self.error.clone().unwrap());
63 }
64
65 self.process(true)?;
66
67 let frame = self.network.take_frame(&self.id).unwrap();
69 self.protocol.assert_error(&frame)?;
70
71 match self.command.eval_response(frame) {
72 Ok(response) => Ok(response),
73 Err(_) => Err(CommandResponseViolation),
74 }
75 }
76
77 pub fn ready(&mut self) -> bool {
82 match self.process(false) {
83 Ok(_) => match self.network.is_complete(&self.id) {
84 Ok(result) => result,
85 Err(error) => {
86 self.error = Some(error);
87 true
88 }
89 },
90 Err(error) => {
91 self.error = Some(error);
92 true
93 }
94 }
95 }
96
97 fn process(&mut self, block: bool) -> Result<(), CommandErrors> {
100 while !self.network.is_complete(&self.id)? {
101 let result = self.network.receive_chunk();
102
103 if self.network.is_buffer_full() {
104 return Err(CommandErrors::MemoryFull);
105 }
106
107 if let Err(error) = result {
108 match error {
109 nb::Error::Other(_) => {
110 return Err(CommandErrors::TcpError);
111 }
112 nb::Error::WouldBlock => {
113 if self.timeout.expired()? {
114 self.network.invalidate_futures();
115 return Err(CommandErrors::Timeout);
116 }
117
118 if !block {
119 return Ok(());
120 }
121 }
122 }
123 }
124 }
125
126 Ok(())
127 }
128}
129
130impl<N: TcpClientStack, C: Clock, P: Protocol, Cmd: Command<P::FrameType>> Drop for Future<'_, N, C, P, Cmd> {
131 fn drop(&mut self) {
132 if !self.wait_called {
133 self.network.drop_future(self.id.clone());
134 }
135 }
136}