1use std::cmp::{min, PartialEq};
13use std::collections::VecDeque;
14use std::ops::Add;
15use std::time::{Duration, Instant};
16use strum_macros::FromRepr;
17use crate::cemi::apdu::Apdu;
18use crate::dpt::DPT;
19use crate::cemi::l_data::LData;
20use crate::cemi::Message;
21use crate::group_event::{GroupEvent, GroupEventType};
22use crate::knxnet;
23use crate::knxnet::connectionstate::ConnectionstateRequest;
24use crate::knxnet::{cri, KnxNetIpError, Service};
25use crate::knxnet::disconnect::{DisconnectRequest, DisconnectResponse};
26use crate::knxnet::hpai::{HPAI, Protocol};
27use crate::knxnet::status::StatusCode;
28use crate::knxnet::tunnel::TunnelAck;
29
/// Timing parameters of a tunnel connection.
///
/// All fields are private; the values used by default are provided through the
/// [`Default`] implementation.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct TunnelConnectionConfig {
    /// Delay after handing out the message at the front of the outbound queue
    /// before it is flagged for transmission again.
    resent_interval: Duration,
    /// How long the message at the front of the outbound queue may stay
    /// unanswered before the connection gives up on it.
    response_timeout: Duration,
    /// Timeout reserved for heartbeat responses.
    // NOTE(review): this value is not read anywhere in the code visible in this file.
    heartbeat_response_timeout: Duration,
    /// Interval between two heartbeat (connection state) checks.
    heartbeat_interval: Duration,
}

impl Default for TunnelConnectionConfig {
    /// Returns the default timings: resend after 1 s, response timeout 1.5 s,
    /// heartbeat response timeout 10 s, heartbeat every 60 s.
    fn default() -> Self {
        Self {
            resent_interval: Duration::from_millis(1000),
            response_timeout: Duration::from_millis(1500),
            heartbeat_response_timeout: Duration::from_secs(10),
            heartbeat_interval: Duration::from_secs(60),
        }
    }
}
48
/// An encoded frame waiting to be handed to the transport.
#[derive(Debug, Default, Clone, PartialEq)]
struct OutMessage {
    /// The encoded bytes of the frame.
    data: Vec<u8>,
    /// `true` if the frame has to stay queued until it is answered or times out;
    /// `false` for frames that are handed out exactly once.
    need_ack: bool,
    /// Number of times the frame has been handed out for transmission.
    retried: u8,
}
55
56#[derive(FromRepr, Debug, Copy, Clone, PartialEq, Default)]
57enum TunnelConnectionState {
58 #[default]
59 Disconnected,
60 Connecting,
61 Connected,
62 Disconnecting,
63}
64
65#[derive(Debug)]
66pub struct TunnelConnection {
67 state: TunnelConnectionState,
68 awaiting_heartbeat_response: bool,
69 channel: u8,
70 host_info: HPAI,
71 outbound_seq: u8,
72 inbound_seq: u8,
73 out_queue: VecDeque<OutMessage>,
74 ack_queue: VecDeque<OutMessage>,
75 current_ack: Vec<u8>,
76 message_pending: bool,
77 next_resent: Instant,
78 next_timeout: Instant,
79 next_heartbeat: Instant,
80 config: TunnelConnectionConfig,
81}
82
83impl TunnelConnection {
84 pub fn new(ipv4: [u8;4], port: u16, config: TunnelConnectionConfig) -> TunnelConnection {
86 let host_info = HPAI::new(Protocol::Udp4Protocol, ipv4, port);
87
88
89 let mut con = TunnelConnection{
90 state: TunnelConnectionState::Disconnected,
91 awaiting_heartbeat_response: false,
92 channel: 0,
93 next_resent: Instant::now().add(config.resent_interval),
94 next_timeout: Instant::now().add(config.response_timeout),
95 next_heartbeat: Instant::now().add(config.heartbeat_interval),
96 config,
97 outbound_seq: 0,
98 inbound_seq: 0,
99 out_queue: VecDeque::from(vec![]),
100 ack_queue: VecDeque::from(vec![]),
101 host_info,
102 message_pending: true,
103 current_ack: vec![],
104 };
105 con.send_connect_request();
106 con
107 }
108
109 pub fn send<T: DPT+Default>(&mut self, ev: GroupEvent<T>) ->() {
110 let msg = match ev.event_type {
111 GroupEventType::GroupValueRead => Message::<T>::LDataReq(vec![], LData::<T>{data: Apdu::GroupValueRead, destination: ev.address , ..LData::<T>::default()}),
112 GroupEventType::GroupValueWrite => Message::<T>::LDataReq(vec![], LData::<T>{data: Apdu::GroupValueWrite(ev.data), destination: ev.address , ..LData::<T>::default()}),
113 GroupEventType::GroupValueResponse => Message::<T>::LDataReq(vec![], LData::<T>{data: Apdu::GroupValueResponse(ev.data), destination: ev.address , ..LData::<T>::default()}),
114 };
115 let req = Service::TunnelRequest(knxnet::tunnel::TunnelRequest{
116 channel: self.channel,
117 seq: self.outbound_seq,
118 data: msg,
119 });
120 self.outbound_seq += 1;
121 self.push_out_message(OutMessage{data: req.encoded(), need_ack: true, retried:0});
122 }
123
124 pub fn get_outbound_data(&mut self) -> Option<&[u8]> {
125 if !self.ack_queue.is_empty(){
126 self.current_ack = self.ack_queue.pop_front().unwrap().data;
127 return Some(&self.current_ack)
128 }
129 if self.message_pending && !self.out_queue.is_empty() {
130 self.message_pending = false;
131 self.next_resent = Instant::now().add(self.config.resent_interval);
132 self.out_queue[0].retried += 1;
133 return Some(&self.out_queue[0].data)
135 }
136
137 return None
138 }
139
140 fn remove_first_message(&mut self){
141 self.out_queue.pop_front();
142 if !self.out_queue.is_empty(){
143 self.message_pending = true;
144 self.next_timeout = Instant::now().add(self.config.response_timeout);
145 } else {
146 self.next_resent = Instant::now().add(self.config.heartbeat_interval);
148 self.next_timeout = Instant::now().add(self.config.heartbeat_interval);
149 }
150 }
151
152 fn handle_outbount_send(&mut self){
153 self.remove_first_message();
154 }
155
156 pub fn get_next_time_event(&self) -> Instant{
157 return min(self.next_heartbeat, min(self.next_resent, self.next_timeout))
158 }
159
160 pub fn connected(&self) -> bool {return self.state == TunnelConnectionState::Connected}
161
162 pub fn handle_time_events(&mut self) -> () {
163 if self.next_timeout < Instant::now() && !self.out_queue.is_empty() {
164 match self.state {
165 TunnelConnectionState::Connecting => {
166 panic!("Failed to initialize tunnel connection")
168 }
169 TunnelConnectionState::Disconnected | TunnelConnectionState::Connected => {
170 self.remove_first_message();
172 }
173 TunnelConnectionState::Disconnecting => {
175 self.remove_first_message();
176 self.state = TunnelConnectionState::Disconnected;
177 self.send_connect_request();
178 }
179 }
180 }
181 if self.next_heartbeat < Instant::now() && self.state == TunnelConnectionState::Connected{
182 if self.awaiting_heartbeat_response {
183 self.send_disconnect_request();
185 } else {
186 self.send_connection_state_request();
187 self.awaiting_heartbeat_response = true;
188 }
189 self.next_heartbeat += self.config.heartbeat_interval;
190 }
191 if self.next_resent < Instant::now() && !self.out_queue.is_empty(){
192 self.message_pending = true
194 }
195 }
196
197 pub fn handle_inbound_message(&mut self, data: &[u8]) -> Option<GroupEvent::<Vec<u8>>> {
198 let service = Service::<Vec<u8>>::decoded(data);
199 let service = match service {
201 Ok(s) => s,
202 Err(e) => return None
203 };
204 return match service {
205 Service::DisconnectRequest(req) => {
206 if req.channel == self.channel {
207 self.push_out_message(OutMessage{
208 data: Service::<()>::DisconnectResponse(
209 DisconnectResponse{
210 channel: self.channel,
211 status: StatusCode::NoError,
212 }).encoded(),
213 need_ack: false,
214 retried: 0,
215 });
216 self.state = TunnelConnectionState::Disconnected;
217 self.send_connect_request();
218 }
219 None
220 },
221 Service::DisconnectResponse(resp) => {
222 if resp.channel == self.channel && resp.status == StatusCode::NoError{
223 self.handle_outbount_send();
224 self.state = TunnelConnectionState::Disconnected;
225 self.send_connect_request();
226 }
227 None
228 }
229 Service::ConnectResponse(connect) => {
230 if connect.status == StatusCode::NoError {
231 self.inbound_seq = 0;
232 self.outbound_seq = 0;
233 self.channel = connect.channel;
234 self.handle_outbount_send();
235 self.state = TunnelConnectionState::Connected;
236 }
237 None
238 }
239 Service::ConnectionstateResponse(con_res) => {
240 if con_res.status == StatusCode::NoError {
241 self.awaiting_heartbeat_response = false;
242 self.handle_outbount_send()
243 } else {
244 self.awaiting_heartbeat_response = false;
245 self.handle_outbount_send();
246 self.state = TunnelConnectionState::Disconnected;
247 self.send_connect_request();
248 }
249 None
250 }
251 Service::TunnelAck(tack) => {
252 if tack.status == StatusCode::NoError {
253 self.handle_outbount_send()
254 }
255 None
256 },
257 Service::TunnelRequest(treq) => {
258 if !(self.inbound_seq == treq.seq || self.inbound_seq == treq.seq.wrapping_add(1)) || self.channel != treq.channel{
260 println!("Discarding due to not matching seq {}, channel {}", self.inbound_seq, self.channel);
261 return None
262 }
263 self.push_out_message(OutMessage{
264 data: Service::<()>::TunnelAck(
265 TunnelAck{
266 seq: treq.seq,
267 channel: treq.channel,
268 status: StatusCode::NoError,
269 }).encoded(),
270 need_ack: false,
271 retried: 0,
272 });
273 self.inbound_seq = treq.seq.wrapping_add(1);
275
276 match treq.data {
277 Message::LDataInd(i, d) => {
278 match d.data {
279 Apdu::GroupValueRead => {
280 Some(GroupEvent::<Vec<u8>> {
281 data: vec![],
282 address: d.destination,
283 event_type: GroupEventType::GroupValueRead,
284 })
285 }
286 Apdu::GroupValueResponse(data) => {
287 Some(GroupEvent::<Vec<u8>> {
288 data,
289 address: d.destination,
290 event_type: GroupEventType::GroupValueResponse,
291 })
292 }
293 Apdu::GroupValueWrite(data) => {
294 Some(GroupEvent::<Vec<u8>> {
295 data,
296 address: d.destination,
297 event_type: GroupEventType::GroupValueWrite,
298 })
299 }
300 _ => None
301 }
302 }
303 _ => None
304 }
305 },
306 _ => None,
307 }
308
309 }
310
311 fn push_out_message(&mut self, msg: OutMessage) {
312 if !msg.need_ack {
313 return self.ack_queue.push_back(msg)
314 }
315 if self.out_queue.is_empty() {
316 self.message_pending = true;
317 self.next_timeout = Instant::now().add(self.config.response_timeout)
318 }
319
320 self.out_queue.push_back(msg)
322 }
323
324 fn send_connection_state_request(&mut self){
325 let req: Service<()> = Service::ConnectionstateRequest(ConnectionstateRequest{
326 channel: self.channel,
327 control: self.host_info
328 });
329
330 self.push_out_message(OutMessage {
331 data: req.encoded(),
332 need_ack: true,
333 retried: 0,
334 });
335 }
336
337 fn send_connect_request(&mut self){
338 let tunnel_request: Service<()> = Service::ConnectRequest(crate::knxnet::connect::ConnectRequest{
339 data: self.host_info,
340 control: self.host_info,
341 connection_type: cri::ConnectionReqType::TunnelConnection {layer: cri::TunnelingLayer::TunnelLinkLayer}
342 });
343
344
345 let buf = tunnel_request.encoded();
346 self.out_queue.clear();
347 self.ack_queue.clear();
348 self.inbound_seq = 0;
349 self.outbound_seq = 0;
350 self.state = TunnelConnectionState::Connecting;
351 self.push_out_message(OutMessage{
352 data: buf,
353 need_ack: true,
354 retried: 0,
355 });
356 }
357
358 fn send_disconnect_request(&mut self){
359 let disconnect_request: Service<()> = Service::DisconnectRequest(crate::knxnet::disconnect::DisconnectRequest{
360 channel: self.channel,
361 control: self.host_info,
362 });
363
364
365 let buf = disconnect_request.encoded();
366 self.state = TunnelConnectionState::Disconnecting;
367 self.push_out_message(OutMessage{
368 data: buf,
369 need_ack: true,
370 retried: 0,
371 });
372 }
373}