1use tracing::Instrument;
2
3use crate::client::{Channel, ClientState, HostAddr, Listener};
4use crate::common::phys::PhysLayer;
5use crate::decode::DecodeLevel;
6
7use crate::client::message::Command;
8use crate::client::task::{ClientLoop, SessionError, StateChange};
9use crate::common::frame::{FrameWriter, FramedReader};
10use crate::error::Shutdown;
11use crate::retry::RetryStrategy;
12
13use tokio::net::TcpStream;
14
15pub(crate) fn spawn_tcp_channel(
16 host: HostAddr,
17 max_queued_requests: usize,
18 connect_retry: Box<dyn RetryStrategy>,
19 decode: DecodeLevel,
20 listener: Box<dyn Listener<ClientState>>,
21) -> Channel {
22 let (handle, task) =
23 create_tcp_channel(host, max_queued_requests, connect_retry, decode, listener);
24 tokio::spawn(task);
25 handle
26}
27
28pub(crate) fn create_tcp_channel(
29 host: HostAddr,
30 max_queued_requests: usize,
31 connect_retry: Box<dyn RetryStrategy>,
32 decode: DecodeLevel,
33 listener: Box<dyn Listener<ClientState>>,
34) -> (Channel, impl std::future::Future<Output = ()>) {
35 let (tx, rx) = tokio::sync::mpsc::channel(max_queued_requests);
36 let task = async move {
37 TcpChannelTask::new(
38 host.clone(),
39 rx.into(),
40 TcpTaskConnectionHandler::Tcp,
41 connect_retry,
42 decode,
43 listener,
44 )
45 .run()
46 .instrument(tracing::info_span!("Modbus-Client-TCP", endpoint = ?host))
47 .await;
48 };
49 (Channel { tx }, task)
50}
51
52pub(crate) enum TcpTaskConnectionHandler {
53 Tcp,
54 #[cfg(feature = "tls")]
55 Tls(crate::tcp::tls::TlsClientConfig),
56}
57
58impl TcpTaskConnectionHandler {
59 async fn handle(
60 &mut self,
61 socket: TcpStream,
62 _endpoint: &HostAddr,
63 ) -> Result<PhysLayer, String> {
64 match self {
65 Self::Tcp => Ok(PhysLayer::new_tcp(socket)),
66 #[cfg(feature = "tls")]
67 Self::Tls(config) => config.handle_connection(socket, _endpoint).await,
68 }
69 }
70}
71
72pub(crate) struct TcpChannelTask {
73 host: HostAddr,
74 connect_retry: Box<dyn RetryStrategy>,
75 connection_handler: TcpTaskConnectionHandler,
76 client_loop: ClientLoop,
77 listener: Box<dyn Listener<ClientState>>,
78}
79
80impl TcpChannelTask {
81 pub(crate) fn new(
82 host: HostAddr,
83 rx: crate::channel::Receiver<Command>,
84 connection_handler: TcpTaskConnectionHandler,
85 connect_retry: Box<dyn RetryStrategy>,
86 decode: DecodeLevel,
87 listener: Box<dyn Listener<ClientState>>,
88 ) -> Self {
89 Self {
90 host,
91 connect_retry,
92 connection_handler,
93 client_loop: ClientLoop::new(rx, FrameWriter::tcp(), FramedReader::tcp(), decode),
94 listener,
95 }
96 }
97
98 pub(crate) async fn run(&mut self) -> Shutdown {
100 self.listener.update(ClientState::Disabled).get().await;
101 let ret = self.run_inner().await;
102 self.listener.update(ClientState::Shutdown).get().await;
103 ret
104 }
105
106 async fn run_inner(&mut self) -> Shutdown {
107 loop {
108 if let Err(Shutdown) = self.client_loop.wait_for_enabled().await {
109 return Shutdown;
110 }
111
112 if let Err(StateChange::Shutdown) = self.try_connect_and_run().await {
113 return Shutdown;
114 }
115
116 if !self.client_loop.is_enabled() {
117 self.listener.update(ClientState::Disabled).get().await;
118 }
119 }
120 }
121
122 async fn connect(&mut self) -> Result<Result<TcpStream, std::io::Error>, StateChange> {
123 tokio::select! {
124 res = self.host.connect() => {
125 Ok(res)
126 }
127 res = self.client_loop.fail_requests() => {
128 Err(res)
129 }
130 }
131 }
132
133 async fn try_connect_and_run(&mut self) -> Result<(), StateChange> {
134 self.listener.update(ClientState::Connecting).get().await;
135 match self.connect().await? {
136 Err(err) => {
137 let delay = self.connect_retry.after_failed_connect();
138 tracing::warn!(
139 "failed to connect to {}: {} - waiting {} ms before next attempt",
140 self.host,
141 err,
142 delay.as_millis()
143 );
144 self.listener
145 .update(ClientState::WaitAfterFailedConnect(delay))
146 .get()
147 .await;
148 self.client_loop.fail_requests_for(delay).await
149 }
150 Ok(socket) => {
151 if let Ok(addr) = socket.peer_addr() {
152 tracing::info!("connected to: {}", addr);
153 }
154 if let Err(err) = socket.set_nodelay(true) {
155 tracing::warn!("unable to enable TCP_NODELAY: {}", err);
156 }
157 match self.connection_handler.handle(socket, &self.host).await {
158 Err(err) => {
159 let delay = self.connect_retry.after_failed_connect();
160 tracing::warn!(
161 "{} - waiting {} ms before next attempt",
162 err,
163 delay.as_millis()
164 );
165 self.listener
166 .update(ClientState::WaitAfterFailedConnect(delay))
167 .get()
168 .await;
169 self.client_loop.fail_requests_for(delay).await
170 }
171 Ok(mut phys) => {
172 self.listener.update(ClientState::Connected).get().await;
173 self.connect_retry.reset();
176 match self.client_loop.run(&mut phys).await {
178 SessionError::Shutdown => Err(StateChange::Shutdown),
180 SessionError::Disabled
182 | SessionError::IoError(_)
183 | SessionError::BadFrame => {
184 let delay = self.connect_retry.after_disconnect();
185 tracing::warn!("waiting {:?} to reconnect", delay);
186 self.listener
187 .update(ClientState::WaitAfterDisconnect(delay))
188 .get()
189 .await;
190 self.client_loop.fail_requests_for(delay).await
191 }
192 }
193 }
194 }
195 }
196 }
197 }
198}