Skip to main content

opcua/core/comms/
tcp_codec.rs

1// OPCUA for Rust
2// SPDX-License-Identifier: MPL-2.0
3// Copyright (C) 2017-2022 Adam Lock
4
5//! The codec is an implementation of a tokio Encoder/Decoder which can be used to read
6//! data from the socket in terms of frames which in our case are any of the following:
7//!
8//! * HEL - Hello message
9//! * ACK - Acknowledge message
10//! * ERR - Error message
11//! * MSG - Message chunk
12//! * OPN - Open Secure Channel message
13//! * CLO - Close Secure Channel message
14use std::io;
15
16use bytes::{BufMut, BytesMut};
17use tokio_util::codec::{Decoder, Encoder};
18
19use crate::types::{
20    encoding::{BinaryEncoder, DecodingOptions},
21    status_code::StatusCode,
22};
23
24use super::{
25    message_chunk::MessageChunk,
26    tcp_types::{
27        AcknowledgeMessage, ErrorMessage, HelloMessage, MessageHeader, MessageType,
28        MESSAGE_HEADER_LEN,
29    },
30};
31
32#[derive(Debug)]
33pub enum Message {
34    Hello(HelloMessage),
35    Acknowledge(AcknowledgeMessage),
36    Error(ErrorMessage),
37    Chunk(MessageChunk),
38}
39
40/// Implements a tokio codec that as close as possible, allows incoming data to be transformed into
41/// OPC UA message chunks with no intermediate buffers. Chunks are subsequently transformed into
42/// messages so there is still some buffers within message chunks, but not at the raw socket level.
43pub struct TcpCodec {
44    decoding_options: DecodingOptions,
45}
46
47impl Decoder for TcpCodec {
48    type Item = Message;
49    type Error = io::Error;
50
51    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
52        if buf.len() > MESSAGE_HEADER_LEN {
53            // Every OPC UA message has at least 8 bytes of header to be read to see what follows
54
55            // Get the message header
56            let message_header = {
57                let mut buf = io::Cursor::new(&buf[0..MESSAGE_HEADER_LEN]);
58                MessageHeader::decode(&mut buf, &self.decoding_options)?
59            };
60
61            // Once we have the header we can infer the message size required to read the rest of
62            // the message. The buffer needs to have at least that amount of bytes in it for the
63            // whole message to be extracted.
64            let message_size = message_header.message_size as usize;
65            if buf.len() >= message_size {
66                // Extract the message bytes from the buffer & decode them into a message
67                let mut buf = buf.split_to(message_size);
68                let message =
69                    Self::decode_message(message_header, &mut buf, &self.decoding_options)
70                        .map_err(|e| {
71                            error!("Codec got an error {} while decoding a message", e);
72                            io::Error::from(e)
73                        })?;
74                Ok(Some(message))
75            } else {
76                // Not enough bytes
77                Ok(None)
78            }
79        } else {
80            Ok(None)
81        }
82    }
83}
84
85impl Encoder<Message> for TcpCodec {
86    type Error = io::Error;
87
88    fn encode(&mut self, data: Message, buf: &mut BytesMut) -> Result<(), io::Error> {
89        match data {
90            Message::Hello(msg) => self.write(msg, buf),
91            Message::Acknowledge(msg) => self.write(msg, buf),
92            Message::Error(msg) => self.write(msg, buf),
93            Message::Chunk(msg) => self.write(msg, buf),
94        }
95    }
96}
97
98impl TcpCodec {
99    /// Constructs a new TcpCodec. The abort flag is set to terminate the codec even while it is
100    /// waiting for a frame to arrive.
101    pub fn new(decoding_options: DecodingOptions) -> TcpCodec {
102        TcpCodec { decoding_options }
103    }
104
105    // Writes the encodable thing into the buffer.
106    fn write<T>(&self, msg: T, buf: &mut BytesMut) -> Result<(), io::Error>
107    where
108        T: BinaryEncoder<T> + std::fmt::Debug,
109    {
110        buf.reserve(msg.byte_len());
111        msg.encode(&mut buf.writer()).map(|_| ()).map_err(|err| {
112            error!("Error writing message {:?}, err = {}", msg, err);
113            io::Error::new(io::ErrorKind::Other, format!("Error = {}", err))
114        })
115    }
116
117    /// Reads a message out of the buffer, which is assumed by now to be the proper length
118    fn decode_message(
119        message_header: MessageHeader,
120        buf: &mut BytesMut,
121        decoding_options: &DecodingOptions,
122    ) -> Result<Message, StatusCode> {
123        let mut buf = io::Cursor::new(&buf[..]);
124        match message_header.message_type {
125            MessageType::Acknowledge => Ok(Message::Acknowledge(AcknowledgeMessage::decode(
126                &mut buf,
127                decoding_options,
128            )?)),
129            MessageType::Hello => Ok(Message::Hello(HelloMessage::decode(
130                &mut buf,
131                decoding_options,
132            )?)),
133            MessageType::Error => Ok(Message::Error(ErrorMessage::decode(
134                &mut buf,
135                decoding_options,
136            )?)),
137            MessageType::Chunk => Ok(Message::Chunk(MessageChunk::decode(
138                &mut buf,
139                decoding_options,
140            )?)),
141            MessageType::Invalid => {
142                error!("Message type for chunk is invalid.");
143                Err(StatusCode::BadCommunicationError)
144            }
145        }
146    }
147}