1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
use crate::commands::Command;
use crate::network::buffer::Network;
use crate::network::client::CommandErrors;
use crate::network::client::CommandErrors::CommandResponseViolation;
use crate::network::protocol::Protocol;
use crate::network::timeout::Timeout;
use embedded_nal::TcpClientStack;
use embedded_time::Clock;
use nb;
#[derive(Clone)]
pub(crate) struct Identity {
/// Used for invalidating futures
/// Gets incremented on fatal problems like timeouts or fault responses, on which message<->future
/// mapping can no longer be guaranteed
pub series: usize,
/// Unique index of mapping future to response message
pub index: usize,
}
/// Non-blocking response management
pub struct Future<'a, N: TcpClientStack, C: Clock, P: Protocol, Cmd: Command<P::FrameType>> {
id: Identity,
command: Cmd,
protocol: P,
network: &'a Network<'a, N, P>,
timeout: Timeout<'a, C>,
/// Cached error during work of ready(). Will be returned on wait() call.
error: Option<CommandErrors>,
/// Was wait called? Flag is used for destructor.
wait_called: bool,
}
impl<'a, N: TcpClientStack, C: Clock, P: Protocol, Cmd: Command<P::FrameType>> Future<'a, N, C, P, Cmd> {
pub(crate) fn new(
id: Identity,
command: Cmd,
protocol: P,
network: &'a Network<'a, N, P>,
timeout: Timeout<'a, C>,
) -> Future<'a, N, C, P, Cmd> {
Self {
id,
command,
protocol,
network,
timeout,
error: None,
wait_called: false,
}
}
/// Blocks until response is received and returns the response
/// Throws an error on invalid response or timeout (if configured)
pub fn wait(mut self) -> Result<Cmd::Response, CommandErrors> {
self.wait_called = true;
if self.error.is_some() {
return Err(self.error.clone().unwrap());
}
self.process(true)?;
// Previous process call ensures that frame is existing
let frame = self.network.take_frame(&self.id).unwrap();
self.protocol.assert_error(&frame)?;
match self.command.eval_response(frame) {
Ok(response) => Ok(response),
Err(_) => Err(CommandResponseViolation),
}
}
/// Non blocking method for checking if data is ready
/// So if true is returned, wait() is non-blocking
/// Reads all pending data and returns true if response is ready
/// Errors are preserved and returned on wait() call
pub fn ready(&mut self) -> bool {
match self.process(false) {
Ok(_) => match self.network.is_complete(&self.id) {
Ok(result) => result,
Err(error) => {
self.error = Some(error);
true
}
},
Err(error) => {
self.error = Some(error);
true
}
}
}
/// Processes socket data
/// If block=false, only pending data is read without blocking
fn process(&mut self, block: bool) -> Result<(), CommandErrors> {
while !self.network.is_complete(&self.id)? {
let result = self.network.receive_chunk();
if self.network.is_buffer_full() {
return Err(CommandErrors::MemoryFull);
}
if let Err(error) = result {
match error {
nb::Error::Other(_) => {
return Err(CommandErrors::TcpError);
}
nb::Error::WouldBlock => {
if self.timeout.expired()? {
self.network.invalidate_futures();
return Err(CommandErrors::Timeout);
}
if !block {
return Ok(());
}
}
}
}
}
Ok(())
}
}
impl<'a, N: TcpClientStack, C: Clock, P: Protocol, Cmd: Command<P::FrameType>> Drop
for Future<'a, N, C, P, Cmd>
{
fn drop(&mut self) {
if !self.wait_called {
self.network.drop_future(self.id.clone());
}
}
}