1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
use crate::commands::Command;
use crate::network::buffer::Network;
use crate::network::client::CommandErrors;
use crate::network::client::CommandErrors::CommandResponseViolation;
use crate::network::protocol::Protocol;
use crate::network::timeout::Timeout;
use embedded_nal::TcpClientStack;
use embedded_time::Clock;
use nb;

#[derive(Clone)]
pub(crate) struct Identity {
    /// Used for invalidating futures
    /// Gets incremented on fatal problems like timeouts or fault responses, on which message<->future
    /// mapping can no longer be guaranteed
    pub series: usize,

    /// Unique index of mapping future to response message
    pub index: usize,
}

/// Non-blocking response management
pub struct Future<'a, N: TcpClientStack, C: Clock, P: Protocol, Cmd: Command<P::FrameType>> {
    id: Identity,
    command: Cmd,
    protocol: P,
    network: &'a Network<'a, N, P>,
    timeout: Timeout<'a, C>,

    /// Cached error during work of ready(). Will be returned on wait() call.
    error: Option<CommandErrors>,

    /// Was wait called? Flag is used for destructor.
    wait_called: bool,
}

impl<'a, N: TcpClientStack, C: Clock, P: Protocol, Cmd: Command<P::FrameType>> Future<'a, N, C, P, Cmd> {
    pub(crate) fn new(
        id: Identity,
        command: Cmd,
        protocol: P,
        network: &'a Network<'a, N, P>,
        timeout: Timeout<'a, C>,
    ) -> Future<'a, N, C, P, Cmd> {
        Self {
            id,
            command,
            protocol,
            network,
            timeout,
            error: None,
            wait_called: false,
        }
    }

    /// Blocks until response is received and returns the response
    /// Throws an error on invalid response or timeout (if configured)
    pub fn wait(mut self) -> Result<Cmd::Response, CommandErrors> {
        self.wait_called = true;

        if self.error.is_some() {
            return Err(self.error.clone().unwrap());
        }

        self.process(true)?;

        // Previous process call ensures that frame is existing
        let frame = self.network.take_frame(&self.id).unwrap();
        self.protocol.assert_error(&frame)?;

        match self.command.eval_response(frame) {
            Ok(response) => Ok(response),
            Err(_) => Err(CommandResponseViolation),
        }
    }

    /// Non blocking method for checking if data is ready
    /// So if true is returned, wait() is non-blocking
    /// Reads all pending data and returns true if response is ready
    /// Errors are preserved and returned on wait() call
    pub fn ready(&mut self) -> bool {
        match self.process(false) {
            Ok(_) => match self.network.is_complete(&self.id) {
                Ok(result) => result,
                Err(error) => {
                    self.error = Some(error);
                    true
                }
            },
            Err(error) => {
                self.error = Some(error);
                true
            }
        }
    }

    /// Processes socket data
    /// If block=false, only pending data is read without blocking
    fn process(&mut self, block: bool) -> Result<(), CommandErrors> {
        while !self.network.is_complete(&self.id)? {
            let result = self.network.receive_chunk();

            if let Err(error) = result {
                match error {
                    nb::Error::Other(_) => {
                        return Err(CommandErrors::TcpError);
                    }
                    nb::Error::WouldBlock => {
                        if self.timeout.expired()? {
                            self.network.invalidate_futures();
                            return Err(CommandErrors::Timeout);
                        }

                        if !block {
                            return Ok(());
                        }
                    }
                }
            }
        }

        Ok(())
    }
}

impl<'a, N: TcpClientStack, C: Clock, P: Protocol, Cmd: Command<P::FrameType>> Drop
    for Future<'a, N, C, P, Cmd>
{
    fn drop(&mut self) {
        if !self.wait_called {
            self.network.drop_future(self.id.clone());
        }
    }
}