opcua_core/comms/
tcp_codec.rs

1// OPCUA for Rust
2// SPDX-License-Identifier: MPL-2.0
3// Copyright (C) 2017-2022 Adam Lock
4
5//! The codec is an implementation of a tokio Encoder/Decoder which can be used to read
6//! data from the socket in terms of frames which in our case are any of the following:
7//!
8//! * HEL - Hello message
9//! * ACK - Acknowledge message
10//! * ERR - Error message
11//! * MSG - Message chunk
12//! * OPN - Open Secure Channel message
13//! * CLO - Close Secure Channel message
14use std::io;
15use std::sync::{Arc, RwLock};
16
17use bytes::{BufMut, BytesMut};
18use tokio_util::codec::{Decoder, Encoder};
19
20use opcua_types::{
21    encoding::{BinaryEncoder, DecodingOptions},
22    status_code::StatusCode,
23};
24
25use crate::comms::{
26    message_chunk::MessageChunk,
27    tcp_types::{
28        AcknowledgeMessage, ErrorMessage, HelloMessage, MessageHeader, MessageType,
29        MESSAGE_HEADER_LEN,
30    },
31};
32
33#[derive(Debug)]
34pub enum Message {
35    Hello(HelloMessage),
36    Acknowledge(AcknowledgeMessage),
37    Error(ErrorMessage),
38    Chunk(MessageChunk),
39}
40
41/// Implements a tokio codec that as close as possible, allows incoming data to be transformed into
42/// OPC UA message chunks with no intermediate buffers. Chunks are subsequently transformed into
43/// messages so there is still some buffers within message chunks, but not at the raw socket level.
44pub struct TcpCodec {
45    decoding_options: DecodingOptions,
46    abort: Arc<RwLock<bool>>,
47}
48
49impl Decoder for TcpCodec {
50    type Item = Message;
51    type Error = io::Error;
52
53    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
54        if self.is_abort() {
55            // Abort immediately
56            debug!("TcpCodec decode abort flag has been set and is terminating");
57            Err(io::Error::from(StatusCode::BadOperationAbandoned))
58        } else if buf.len() > MESSAGE_HEADER_LEN {
59            // Every OPC UA message has at least 8 bytes of header to be read to see what follows
60
61            // Get the message header
62            let message_header = {
63                let mut buf = io::Cursor::new(&buf[0..MESSAGE_HEADER_LEN]);
64                MessageHeader::decode(&mut buf, &self.decoding_options)?
65            };
66
67            // Once we have the header we can infer the message size required to read the rest of
68            // the message. The buffer needs to have at least that amount of bytes in it for the
69            // whole message to be extracted.
70            let message_size = message_header.message_size as usize;
71            if buf.len() >= message_size {
72                // Extract the message bytes from the buffer & decode them into a message
73                let mut buf = buf.split_to(message_size);
74                let message =
75                    Self::decode_message(message_header, &mut buf, &self.decoding_options)
76                        .map_err(|e| {
77                            error!("Codec got an error {} while decoding a message", e);
78                            io::Error::from(e)
79                        })?;
80                Ok(Some(message))
81            } else {
82                // Not enough bytes
83                Ok(None)
84            }
85        } else {
86            Ok(None)
87        }
88    }
89}
90
91impl Encoder<Message> for TcpCodec {
92    type Error = io::Error;
93
94    fn encode(&mut self, data: Message, buf: &mut BytesMut) -> Result<(), io::Error> {
95        match data {
96            Message::Hello(msg) => self.write(msg, buf),
97            Message::Acknowledge(msg) => self.write(msg, buf),
98            Message::Error(msg) => self.write(msg, buf),
99            Message::Chunk(msg) => self.write(msg, buf),
100        }
101    }
102}
103
104impl TcpCodec {
105    /// Constructs a new TcpCodec. The abort flag is set to terminate the codec even while it is
106    /// waiting for a frame to arrive.
107    pub fn new(abort: Arc<RwLock<bool>>, decoding_options: DecodingOptions) -> TcpCodec {
108        TcpCodec {
109            abort,
110            decoding_options,
111        }
112    }
113
114    // Writes the encodable thing into the buffer.
115    fn write<T>(&self, msg: T, buf: &mut BytesMut) -> Result<(), io::Error>
116    where
117        T: BinaryEncoder<T> + std::fmt::Debug,
118    {
119        buf.reserve(msg.byte_len());
120        msg.encode(&mut buf.writer()).map(|_| ()).map_err(|err| {
121            error!("Error writing message {:?}, err = {}", msg, err);
122            io::Error::new(io::ErrorKind::Other, format!("Error = {}", err))
123        })
124    }
125
126    fn is_abort(&self) -> bool {
127        let abort = self.abort.read().unwrap();
128        *abort
129    }
130
131    /// Reads a message out of the buffer, which is assumed by now to be the proper length
132    fn decode_message(
133        message_header: MessageHeader,
134        buf: &mut BytesMut,
135        decoding_options: &DecodingOptions,
136    ) -> Result<Message, StatusCode> {
137        let mut buf = io::Cursor::new(&buf[..]);
138        match message_header.message_type {
139            MessageType::Acknowledge => Ok(Message::Acknowledge(AcknowledgeMessage::decode(
140                &mut buf,
141                decoding_options,
142            )?)),
143            MessageType::Hello => Ok(Message::Hello(HelloMessage::decode(
144                &mut buf,
145                decoding_options,
146            )?)),
147            MessageType::Error => Ok(Message::Error(ErrorMessage::decode(
148                &mut buf,
149                decoding_options,
150            )?)),
151            MessageType::Chunk => Ok(Message::Chunk(MessageChunk::decode(
152                &mut buf,
153                decoding_options,
154            )?)),
155            MessageType::Invalid => {
156                error!("Message type for chunk is invalid.");
157                Err(StatusCode::BadCommunicationError)
158            }
159        }
160    }
161}