rodbus/tcp/
client.rs

1use tracing::Instrument;
2
3use crate::client::{Channel, ClientState, HostAddr, Listener};
4use crate::common::phys::PhysLayer;
5use crate::decode::DecodeLevel;
6
7use crate::client::message::Command;
8use crate::client::task::{ClientLoop, SessionError, StateChange};
9use crate::common::frame::{FrameWriter, FramedReader};
10use crate::error::Shutdown;
11use crate::retry::RetryStrategy;
12
13use tokio::net::TcpStream;
14
15pub(crate) fn spawn_tcp_channel(
16    host: HostAddr,
17    max_queued_requests: usize,
18    connect_retry: Box<dyn RetryStrategy>,
19    decode: DecodeLevel,
20    listener: Box<dyn Listener<ClientState>>,
21) -> Channel {
22    let (handle, task) =
23        create_tcp_channel(host, max_queued_requests, connect_retry, decode, listener);
24    tokio::spawn(task);
25    handle
26}
27
28pub(crate) fn create_tcp_channel(
29    host: HostAddr,
30    max_queued_requests: usize,
31    connect_retry: Box<dyn RetryStrategy>,
32    decode: DecodeLevel,
33    listener: Box<dyn Listener<ClientState>>,
34) -> (Channel, impl std::future::Future<Output = ()>) {
35    let (tx, rx) = tokio::sync::mpsc::channel(max_queued_requests);
36    let task = async move {
37        TcpChannelTask::new(
38            host.clone(),
39            rx.into(),
40            TcpTaskConnectionHandler::Tcp,
41            connect_retry,
42            decode,
43            listener,
44        )
45        .run()
46        .instrument(tracing::info_span!("Modbus-Client-TCP", endpoint = ?host))
47        .await;
48    };
49    (Channel { tx }, task)
50}
51
52pub(crate) enum TcpTaskConnectionHandler {
53    Tcp,
54    #[cfg(feature = "tls")]
55    Tls(crate::tcp::tls::TlsClientConfig),
56}
57
58impl TcpTaskConnectionHandler {
59    async fn handle(
60        &mut self,
61        socket: TcpStream,
62        _endpoint: &HostAddr,
63    ) -> Result<PhysLayer, String> {
64        match self {
65            Self::Tcp => Ok(PhysLayer::new_tcp(socket)),
66            #[cfg(feature = "tls")]
67            Self::Tls(config) => config.handle_connection(socket, _endpoint).await,
68        }
69    }
70}
71
72pub(crate) struct TcpChannelTask {
73    host: HostAddr,
74    connect_retry: Box<dyn RetryStrategy>,
75    connection_handler: TcpTaskConnectionHandler,
76    client_loop: ClientLoop,
77    listener: Box<dyn Listener<ClientState>>,
78}
79
80impl TcpChannelTask {
81    pub(crate) fn new(
82        host: HostAddr,
83        rx: crate::channel::Receiver<Command>,
84        connection_handler: TcpTaskConnectionHandler,
85        connect_retry: Box<dyn RetryStrategy>,
86        decode: DecodeLevel,
87        listener: Box<dyn Listener<ClientState>>,
88    ) -> Self {
89        Self {
90            host,
91            connect_retry,
92            connection_handler,
93            client_loop: ClientLoop::new(rx, FrameWriter::tcp(), FramedReader::tcp(), decode),
94            listener,
95        }
96    }
97
98    // runs until it is shut down
99    pub(crate) async fn run(&mut self) -> Shutdown {
100        self.listener.update(ClientState::Disabled).get().await;
101        let ret = self.run_inner().await;
102        self.listener.update(ClientState::Shutdown).get().await;
103        ret
104    }
105
106    async fn run_inner(&mut self) -> Shutdown {
107        loop {
108            if let Err(Shutdown) = self.client_loop.wait_for_enabled().await {
109                return Shutdown;
110            }
111
112            if let Err(StateChange::Shutdown) = self.try_connect_and_run().await {
113                return Shutdown;
114            }
115
116            if !self.client_loop.is_enabled() {
117                self.listener.update(ClientState::Disabled).get().await;
118            }
119        }
120    }
121
122    async fn connect(&mut self) -> Result<Result<TcpStream, std::io::Error>, StateChange> {
123        tokio::select! {
124            res = self.host.connect() => {
125                Ok(res)
126            }
127            res = self.client_loop.fail_requests() => {
128                Err(res)
129            }
130        }
131    }
132
133    async fn try_connect_and_run(&mut self) -> Result<(), StateChange> {
134        self.listener.update(ClientState::Connecting).get().await;
135        match self.connect().await? {
136            Err(err) => {
137                let delay = self.connect_retry.after_failed_connect();
138                tracing::warn!(
139                    "failed to connect to {}: {} - waiting {} ms before next attempt",
140                    self.host,
141                    err,
142                    delay.as_millis()
143                );
144                self.listener
145                    .update(ClientState::WaitAfterFailedConnect(delay))
146                    .get()
147                    .await;
148                self.client_loop.fail_requests_for(delay).await
149            }
150            Ok(socket) => {
151                if let Ok(addr) = socket.peer_addr() {
152                    tracing::info!("connected to: {}", addr);
153                }
154                if let Err(err) = socket.set_nodelay(true) {
155                    tracing::warn!("unable to enable TCP_NODELAY: {}", err);
156                }
157                match self.connection_handler.handle(socket, &self.host).await {
158                    Err(err) => {
159                        let delay = self.connect_retry.after_failed_connect();
160                        tracing::warn!(
161                            "{} - waiting {} ms before next attempt",
162                            err,
163                            delay.as_millis()
164                        );
165                        self.listener
166                            .update(ClientState::WaitAfterFailedConnect(delay))
167                            .get()
168                            .await;
169                        self.client_loop.fail_requests_for(delay).await
170                    }
171                    Ok(mut phys) => {
172                        self.listener.update(ClientState::Connected).get().await;
173                        // reset the retry strategy now that we have a successful connection
174                        // we do this here so that the reset happens after a TLS handshake
175                        self.connect_retry.reset();
176                        // run the physical layer independent processing loop
177                        match self.client_loop.run(&mut phys).await {
178                            // the mpsc was closed, end the task
179                            SessionError::Shutdown => Err(StateChange::Shutdown),
180                            // re-establish the connection
181                            SessionError::Disabled
182                            | SessionError::IoError(_)
183                            | SessionError::BadFrame => {
184                                let delay = self.connect_retry.after_disconnect();
185                                tracing::warn!("waiting {:?} to reconnect", delay);
186                                self.listener
187                                    .update(ClientState::WaitAfterDisconnect(delay))
188                                    .get()
189                                    .await;
190                                self.client_loop.fail_requests_for(delay).await
191                            }
192                        }
193                    }
194                }
195            }
196        }
197    }
198}