Skip to main content

opcua_core/comms/
tcp_codec.rs

1// OPCUA for Rust
2// SPDX-License-Identifier: MPL-2.0
3// Copyright (C) 2017-2024 Adam Lock
4
//! The codec is an implementation of a tokio Encoder/Decoder which can be used to read
//! data from the socket in terms of frames, which in our case are any of the following:
//!
//! * HEL - Hello message
//! * ACK - Acknowledge message
//! * ERR - Error message
//! * RHE - Reverse Hello message
//! * MSG - Message chunk
//! * OPN - Open Secure Channel message
//! * CLO - Close Secure Channel message
14use std::io;
15
16use bytes::{BufMut, BytesMut};
17use tokio_util::codec::{Decoder, Encoder};
18use tracing::error;
19
20use opcua_types::{
21    encoding::{DecodingOptions, SimpleBinaryDecodable, SimpleBinaryEncodable},
22    status_code::StatusCode,
23};
24
25use super::{
26    message_chunk::MessageChunk,
27    tcp_types::{
28        AcknowledgeMessage, ErrorMessage, HelloMessage, MessageHeader, MessageType,
29        ReverseHelloMessage, MESSAGE_HEADER_LEN,
30    },
31};
32
33#[derive(Debug)]
34/// Message type sent over OPC-UA streams.
35pub enum Message {
36    /// Hello message, the first part of a connection negotiation.
37    Hello(HelloMessage),
38    /// Acknowledge message, acceptance of negotiation.
39    Acknowledge(AcknowledgeMessage),
40    /// Error message, final fatal message describing reason for
41    /// why the channel will be closed.
42    Error(ErrorMessage),
43    /// Part of a general OPC-UA message.
44    Chunk(MessageChunk),
45    /// Reverse Hello message, sent by the server to the client.
46    ReverseHello(ReverseHelloMessage),
47}
48
49/// Implements a tokio codec that as close as possible, allows incoming data to be transformed into
50/// OPC UA message chunks with no intermediate buffers. Chunks are subsequently transformed into
51/// messages so there is still some buffers within message chunks, but not at the raw socket level.
52pub struct TcpCodec {
53    decoding_options: DecodingOptions,
54}
55
56impl Decoder for TcpCodec {
57    type Item = Message;
58    type Error = io::Error;
59
60    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
61        if buf.len() > MESSAGE_HEADER_LEN {
62            // Every OPC UA message has at least 8 bytes of header to be read to see what follows
63
64            // Get the message header
65            let message_header = {
66                let mut buf = io::Cursor::new(&buf[0..MESSAGE_HEADER_LEN]);
67                MessageHeader::decode(&mut buf, &self.decoding_options)?
68            };
69
70            // Once we have the header we can infer the message size required to read the rest of
71            // the message. The buffer needs to have at least that amount of bytes in it for the
72            // whole message to be extracted.
73            let message_size = message_header.message_size as usize;
74            if buf.len() >= message_size {
75                // Extract the message bytes from the buffer & decode them into a message
76                let mut buf = buf.split_to(message_size);
77                let message =
78                    Self::decode_message(message_header, &mut buf, &self.decoding_options)
79                        .map_err(|e| {
80                            error!("Codec got an error {} while decoding a message", e);
81                            io::Error::from(e)
82                        })?;
83                Ok(Some(message))
84            } else {
85                // Not enough bytes
86                Ok(None)
87            }
88        } else {
89            Ok(None)
90        }
91    }
92}
93
94impl Encoder<Message> for TcpCodec {
95    type Error = io::Error;
96
97    fn encode(&mut self, data: Message, buf: &mut BytesMut) -> Result<(), io::Error> {
98        match data {
99            Message::Hello(msg) => self.write(msg, buf),
100            Message::Acknowledge(msg) => self.write(msg, buf),
101            Message::Error(msg) => self.write(msg, buf),
102            Message::Chunk(msg) => self.write(msg, buf),
103            Message::ReverseHello(msg) => self.write(msg, buf),
104        }
105    }
106}
107
108impl TcpCodec {
109    /// Constructs a new TcpCodec. The abort flag is set to terminate the codec even while it is
110    /// waiting for a frame to arrive.
111    pub fn new(decoding_options: DecodingOptions) -> TcpCodec {
112        TcpCodec { decoding_options }
113    }
114
115    // Writes the encodable thing into the buffer.
116    fn write<T>(&self, msg: T, buf: &mut BytesMut) -> Result<(), io::Error>
117    where
118        T: SimpleBinaryEncodable + std::fmt::Debug,
119    {
120        buf.reserve(msg.byte_len());
121        msg.encode(&mut buf.writer()).map(|_| ()).map_err(|err| {
122            error!("Error writing message {:?}, err = {}", msg, err);
123            io::Error::other(format!("Error = {err}"))
124        })
125    }
126
127    /// Reads a message out of the buffer, which is assumed by now to be the proper length
128    fn decode_message(
129        message_header: MessageHeader,
130        buf: &mut BytesMut,
131        decoding_options: &DecodingOptions,
132    ) -> Result<Message, StatusCode> {
133        let mut buf = io::Cursor::new(&buf[..]);
134        match message_header.message_type {
135            MessageType::Acknowledge => Ok(Message::Acknowledge(AcknowledgeMessage::decode(
136                &mut buf,
137                decoding_options,
138            )?)),
139            MessageType::Hello => Ok(Message::Hello(HelloMessage::decode(
140                &mut buf,
141                decoding_options,
142            )?)),
143            MessageType::Error => Ok(Message::Error(ErrorMessage::decode(
144                &mut buf,
145                decoding_options,
146            )?)),
147            MessageType::Chunk => Ok(Message::Chunk(MessageChunk::decode(
148                &mut buf,
149                decoding_options,
150            )?)),
151            MessageType::ReverseHello => Ok(Message::ReverseHello(ReverseHelloMessage::decode(
152                &mut buf,
153                decoding_options,
154            )?)),
155            MessageType::Invalid => {
156                error!("Message type for chunk is invalid.");
157                Err(StatusCode::BadCommunicationError)
158            }
159        }
160    }
161}