knx_rust/
tunnel_connection.rs

// Runtime-facing functions:
//
// - get the data to be transmitted next
// - data was transmitted
// - get the next time event
// - handle the next time event
// - handle received data -> returns a cEMI message or none
// - send data with future<Result<>>
9
10
11
12use std::cmp::{min, PartialEq};
13use std::collections::VecDeque;
14use std::ops::Add;
15use std::time::{Duration, Instant};
16use strum_macros::FromRepr;
17use crate::cemi::apdu::Apdu;
18use crate::dpt::DPT;
19use crate::cemi::l_data::LData;
20use crate::cemi::Message;
21use crate::group_event::{GroupEvent, GroupEventType};
22use crate::knxnet;
23use crate::knxnet::connectionstate::ConnectionstateRequest;
24use crate::knxnet::{cri, KnxNetIpError, Service};
25use crate::knxnet::disconnect::{DisconnectRequest, DisconnectResponse};
26use crate::knxnet::hpai::{HPAI, Protocol};
27use crate::knxnet::status::StatusCode;
28use crate::knxnet::tunnel::TunnelAck;
29
/// Timing parameters of a [`TunnelConnection`].
///
/// The fields are public so that callers can override individual values, for example
/// `TunnelConnectionConfig { heartbeat_interval: Duration::from_secs(30), ..Default::default() }`.
#[derive(Debug, Copy, Clone, PartialEq)]
pub struct TunnelConnectionConfig {
    /// Delay after a queued request was handed out for transmission before it is
    /// offered for transmission again.
    pub resent_interval: Duration,
    /// How long the request at the front of the outbound queue may stay unanswered
    /// before the connection gives up on it.
    pub response_timeout: Duration,
    /// Timeout for heartbeat responses.
    /// NOTE(review): not read by any code visible in this file.
    pub heartbeat_response_timeout: Duration,
    /// Interval between heartbeat (connection state) requests while connected.
    pub heartbeat_interval: Duration,
}
37
38impl Default for TunnelConnectionConfig {
39    fn default() -> TunnelConnectionConfig {
40        TunnelConnectionConfig{
41            resent_interval: Duration::from_millis(1000),
42            heartbeat_interval: Duration::from_secs(60),
43            response_timeout: Duration::from_millis(1500),
44            heartbeat_response_timeout: Duration::from_secs(10),
45        }
46    }
47}
48
/// One encoded frame waiting to be handed to the transport.
#[derive(Default, Debug, Clone, PartialEq)]
struct OutMessage {
    /// Encoded bytes of the frame.
    data: Vec<u8>,
    /// `true` for requests that stay queued until a response arrives or they time out;
    /// `false` for frames that are sent once (acks and responses).
    need_ack: bool,
    /// Number of times the frame has been handed out for transmission.
    retried: u8,
}
55
56#[derive(FromRepr, Debug, Copy, Clone, PartialEq, Default)]
57enum TunnelConnectionState {
58    #[default]
59    Disconnected,
60    Connecting,
61    Connected,
62    Disconnecting,
63}
64
65#[derive(Debug)]
66pub struct TunnelConnection {
67    state: TunnelConnectionState,
68    awaiting_heartbeat_response: bool,
69    channel: u8,
70    host_info: HPAI,
71    outbound_seq: u8,
72    inbound_seq: u8,
73    out_queue: VecDeque<OutMessage>,
74    ack_queue: VecDeque<OutMessage>,
75    current_ack: Vec<u8>,
76    message_pending: bool,
77    next_resent: Instant,
78    next_timeout: Instant,
79    next_heartbeat: Instant,
80    config: TunnelConnectionConfig,
81}
82
83impl TunnelConnection {
84    /// Create an TunnelConnReq
85    pub fn new(ipv4: [u8;4], port: u16, config: TunnelConnectionConfig) -> TunnelConnection {
86        let host_info = HPAI::new(Protocol::Udp4Protocol, ipv4, port);
87
88
89        let mut con = TunnelConnection{
90            state: TunnelConnectionState::Disconnected,
91            awaiting_heartbeat_response: false,
92            channel: 0,
93            next_resent: Instant::now().add(config.resent_interval),
94            next_timeout: Instant::now().add(config.response_timeout),
95            next_heartbeat: Instant::now().add(config.heartbeat_interval),
96            config,
97            outbound_seq: 0,
98            inbound_seq: 0,
99            out_queue: VecDeque::from(vec![]),
100            ack_queue: VecDeque::from(vec![]),
101            host_info,
102            message_pending: true,
103            current_ack: vec![],
104        };
105        con.send_connect_request();
106        con
107    }
108
109    pub fn send<T: DPT+Default>(&mut self, ev: GroupEvent<T>) ->() {
110        let msg = match ev.event_type {
111            GroupEventType::GroupValueRead => Message::<T>::LDataReq(vec![], LData::<T>{data: Apdu::GroupValueRead, destination: ev.address , ..LData::<T>::default()}),
112            GroupEventType::GroupValueWrite => Message::<T>::LDataReq(vec![], LData::<T>{data: Apdu::GroupValueWrite(ev.data), destination: ev.address , ..LData::<T>::default()}),
113            GroupEventType::GroupValueResponse => Message::<T>::LDataReq(vec![], LData::<T>{data: Apdu::GroupValueResponse(ev.data), destination: ev.address , ..LData::<T>::default()}),
114        };
115        let req = Service::TunnelRequest(knxnet::tunnel::TunnelRequest{
116            channel: self.channel,
117            seq: self.outbound_seq,
118            data: msg,
119        });
120        self.outbound_seq += 1;
121        self.push_out_message(OutMessage{data: req.encoded(), need_ack: true, retried:0});
122    }
123
124    pub fn get_outbound_data(&mut self) -> Option<&[u8]> {
125        if !self.ack_queue.is_empty(){
126            self.current_ack = self.ack_queue.pop_front().unwrap().data;
127            return Some(&self.current_ack)
128        }
129        if self.message_pending && !self.out_queue.is_empty() {
130            self.message_pending = false;
131            self.next_resent = Instant::now().add(self.config.resent_interval);
132            self.out_queue[0].retried += 1;
133            //println!("Data {:?} to be send {}", &self.out_queue[0].data, self.out_queue[0].retried);
134            return Some(&self.out_queue[0].data)
135        }
136
137        return None
138    }
139
140    fn remove_first_message(&mut self){
141        self.out_queue.pop_front();
142        if !self.out_queue.is_empty(){
143            self.message_pending = true;
144            self.next_timeout = Instant::now().add(self.config.response_timeout);
145        } else {
146            // effectively disable resend and timeout by setting it bigger than heartbeat
147            self.next_resent = Instant::now().add(self.config.heartbeat_interval);
148            self.next_timeout = Instant::now().add(self.config.heartbeat_interval);
149        }
150    }
151
152    fn handle_outbount_send(&mut self){
153        self.remove_first_message();
154    }
155
156    pub fn get_next_time_event(&self) -> Instant{
157        return min(self.next_heartbeat, min(self.next_resent, self.next_timeout))
158    }
159
160    pub fn connected(&self) -> bool {return self.state == TunnelConnectionState::Connected}
161
162    pub fn handle_time_events(&mut self) -> () {
163        if self.next_timeout < Instant::now() && !self.out_queue.is_empty() {
164            match self.state {
165                TunnelConnectionState::Connecting => {
166                    // we might want to handle the connect Future here to return an error
167                    panic!("Failed to initialize tunnel connection")
168                }
169                TunnelConnectionState::Disconnected | TunnelConnectionState::Connected => {
170                    // outbound message timed out so skip sending it
171                    self.remove_first_message();
172                }
173                // in case we don't get a response for disconnection connection is probably already lost
174                TunnelConnectionState::Disconnecting => {
175                    self.remove_first_message();
176                    self.state = TunnelConnectionState::Disconnected;
177                    self.send_connect_request();
178                }
179            }
180        }
181        if self.next_heartbeat < Instant::now() && self.state == TunnelConnectionState::Connected{
182            if self.awaiting_heartbeat_response {
183                //we did not receive a heartbeat response for 2 periods (120s) so disconnect
184                self.send_disconnect_request();
185            } else {
186                self.send_connection_state_request();
187                self.awaiting_heartbeat_response = true;
188            }
189            self.next_heartbeat += self.config.heartbeat_interval;
190        }
191        if self.next_resent < Instant::now() && !self.out_queue.is_empty(){
192            // set message back to due to send
193            self.message_pending = true
194        }
195    }
196
197    pub fn handle_inbound_message(&mut self, data: &[u8]) -> Option<GroupEvent::<Vec<u8>>> {
198        let service = Service::<Vec<u8>>::decoded(data);
199        //println!("inbound {:?}", service);
200        let service = match service {
201            Ok(s) => s,
202            Err(e) => return None
203        };
204        return match service {
205            Service::DisconnectRequest(req) => {
206                if req.channel == self.channel {
207                    self.push_out_message(OutMessage{
208                        data: Service::<()>::DisconnectResponse(
209                            DisconnectResponse{
210                                channel: self.channel,
211                                status: StatusCode::NoError,
212                            }).encoded(),
213                        need_ack: false,
214                        retried: 0,
215                    });
216                    self.state = TunnelConnectionState::Disconnected;
217                    self.send_connect_request();
218                }
219                None
220            },
221            Service::DisconnectResponse(resp) => {
222                if resp.channel == self.channel && resp.status == StatusCode::NoError{
223                    self.handle_outbount_send();
224                    self.state = TunnelConnectionState::Disconnected;
225                    self.send_connect_request();
226                }
227                None
228            }
229            Service::ConnectResponse(connect) => {
230                if connect.status == StatusCode::NoError {
231                    self.inbound_seq = 0;
232                    self.outbound_seq = 0;
233                    self.channel = connect.channel;
234                    self.handle_outbount_send();
235                    self.state = TunnelConnectionState::Connected;
236                }
237                None
238            }
239            Service::ConnectionstateResponse(con_res) => {
240                if con_res.status == StatusCode::NoError {
241                    self.awaiting_heartbeat_response = false;
242                    self.handle_outbount_send()
243                } else {
244                    self.awaiting_heartbeat_response = false;
245                    self.handle_outbount_send();
246                    self.state = TunnelConnectionState::Disconnected;
247                    self.send_connect_request();
248                }
249                None
250            }
251            Service::TunnelAck(tack) => {
252                if tack.status == StatusCode::NoError {
253                    self.handle_outbount_send()
254                }
255                None
256            },
257            Service::TunnelRequest(treq) => {
258                //only messages with the expected seq or one less should be accepted (and thereby acked). See 03/08/04 Tunneling 2.6
259                if !(self.inbound_seq == treq.seq || self.inbound_seq == treq.seq.wrapping_add(1)) || self.channel != treq.channel{
260                    println!("Discarding due to not matching seq {}, channel {}", self.inbound_seq, self.channel);
261                    return None
262                }
263                self.push_out_message(OutMessage{
264                    data: Service::<()>::TunnelAck(
265                        TunnelAck{
266                            seq: treq.seq,
267                            channel: treq.channel,
268                            status: StatusCode::NoError,
269                        }).encoded(),
270                    need_ack: false,
271                    retried: 0,
272                });
273                //println!("Outqueue size {}, pending {}", self.out_queue.len(), self.message_pending);
274                self.inbound_seq = treq.seq.wrapping_add(1);
275
276                match treq.data {
277                    Message::LDataInd(i, d) => {
278                        match d.data {
279                            Apdu::GroupValueRead => {
280                                Some(GroupEvent::<Vec<u8>> {
281                                    data: vec![],
282                                    address: d.destination,
283                                    event_type: GroupEventType::GroupValueRead,
284                                })
285                            }
286                            Apdu::GroupValueResponse(data) => {
287                                Some(GroupEvent::<Vec<u8>> {
288                                    data,
289                                    address: d.destination,
290                                    event_type: GroupEventType::GroupValueResponse,
291                                })
292                            }
293                            Apdu::GroupValueWrite(data) => {
294                                Some(GroupEvent::<Vec<u8>> {
295                                    data,
296                                    address: d.destination,
297                                    event_type: GroupEventType::GroupValueWrite,
298                                })
299                            }
300                            _ => None
301                        }
302                    }
303                    _ => None
304                }
305            },
306            _ => None,
307        }
308
309    }
310
311    fn push_out_message(&mut self, msg: OutMessage)  {
312        if !msg.need_ack {
313            return self.ack_queue.push_back(msg)
314        }
315        if self.out_queue.is_empty() {
316            self.message_pending = true;
317            self.next_timeout = Instant::now().add(self.config.response_timeout)
318        }
319
320        // take ack messages first
321        self.out_queue.push_back(msg)
322    }
323
324    fn send_connection_state_request(&mut self){
325        let req: Service<()> = Service::ConnectionstateRequest(ConnectionstateRequest{
326            channel: self.channel,
327            control: self.host_info
328        });
329
330        self.push_out_message(OutMessage {
331            data: req.encoded(),
332            need_ack: true,
333            retried: 0,
334        });
335    }
336
337    fn send_connect_request(&mut self){
338        let tunnel_request: Service<()> = Service::ConnectRequest(crate::knxnet::connect::ConnectRequest{
339            data: self.host_info,
340            control: self.host_info,
341            connection_type: cri::ConnectionReqType::TunnelConnection {layer: cri::TunnelingLayer::TunnelLinkLayer}
342        });
343
344
345        let buf = tunnel_request.encoded();
346        self.out_queue.clear();
347        self.ack_queue.clear();
348        self.inbound_seq = 0;
349        self.outbound_seq = 0;
350        self.state = TunnelConnectionState::Connecting;
351        self.push_out_message(OutMessage{
352            data: buf,
353            need_ack: true,
354            retried: 0,
355        });
356    }
357
358    fn send_disconnect_request(&mut self){
359        let disconnect_request: Service<()> = Service::DisconnectRequest(crate::knxnet::disconnect::DisconnectRequest{
360            channel: self.channel,
361            control: self.host_info,
362        });
363
364
365        let buf = disconnect_request.encoded();
366        self.state = TunnelConnectionState::Disconnecting;
367        self.push_out_message(OutMessage{
368            data: buf,
369            need_ack: true,
370            retried: 0,
371        });
372    }
373}