opcua_core/comms/
tcp_codec.rs1use std::io;
15
16use bytes::{BufMut, BytesMut};
17use tokio_util::codec::{Decoder, Encoder};
18use tracing::error;
19
20use opcua_types::{
21 encoding::{DecodingOptions, SimpleBinaryDecodable, SimpleBinaryEncodable},
22 status_code::StatusCode,
23};
24
25use super::{
26 message_chunk::MessageChunk,
27 tcp_types::{
28 AcknowledgeMessage, ErrorMessage, HelloMessage, MessageHeader, MessageType,
29 ReverseHelloMessage, MESSAGE_HEADER_LEN,
30 },
31};
32
33#[derive(Debug)]
34pub enum Message {
36 Hello(HelloMessage),
38 Acknowledge(AcknowledgeMessage),
40 Error(ErrorMessage),
43 Chunk(MessageChunk),
45 ReverseHello(ReverseHelloMessage),
47}
48
49pub struct TcpCodec {
53 decoding_options: DecodingOptions,
54}
55
56impl Decoder for TcpCodec {
57 type Item = Message;
58 type Error = io::Error;
59
60 fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
61 if buf.len() > MESSAGE_HEADER_LEN {
62 let message_header = {
66 let mut buf = io::Cursor::new(&buf[0..MESSAGE_HEADER_LEN]);
67 MessageHeader::decode(&mut buf, &self.decoding_options)?
68 };
69
70 let message_size = message_header.message_size as usize;
74 if buf.len() >= message_size {
75 let mut buf = buf.split_to(message_size);
77 let message =
78 Self::decode_message(message_header, &mut buf, &self.decoding_options)
79 .map_err(|e| {
80 error!("Codec got an error {} while decoding a message", e);
81 io::Error::from(e)
82 })?;
83 Ok(Some(message))
84 } else {
85 Ok(None)
87 }
88 } else {
89 Ok(None)
90 }
91 }
92}
93
94impl Encoder<Message> for TcpCodec {
95 type Error = io::Error;
96
97 fn encode(&mut self, data: Message, buf: &mut BytesMut) -> Result<(), io::Error> {
98 match data {
99 Message::Hello(msg) => self.write(msg, buf),
100 Message::Acknowledge(msg) => self.write(msg, buf),
101 Message::Error(msg) => self.write(msg, buf),
102 Message::Chunk(msg) => self.write(msg, buf),
103 Message::ReverseHello(msg) => self.write(msg, buf),
104 }
105 }
106}
107
108impl TcpCodec {
109 pub fn new(decoding_options: DecodingOptions) -> TcpCodec {
112 TcpCodec { decoding_options }
113 }
114
115 fn write<T>(&self, msg: T, buf: &mut BytesMut) -> Result<(), io::Error>
117 where
118 T: SimpleBinaryEncodable + std::fmt::Debug,
119 {
120 buf.reserve(msg.byte_len());
121 msg.encode(&mut buf.writer()).map(|_| ()).map_err(|err| {
122 error!("Error writing message {:?}, err = {}", msg, err);
123 io::Error::other(format!("Error = {err}"))
124 })
125 }
126
127 fn decode_message(
129 message_header: MessageHeader,
130 buf: &mut BytesMut,
131 decoding_options: &DecodingOptions,
132 ) -> Result<Message, StatusCode> {
133 let mut buf = io::Cursor::new(&buf[..]);
134 match message_header.message_type {
135 MessageType::Acknowledge => Ok(Message::Acknowledge(AcknowledgeMessage::decode(
136 &mut buf,
137 decoding_options,
138 )?)),
139 MessageType::Hello => Ok(Message::Hello(HelloMessage::decode(
140 &mut buf,
141 decoding_options,
142 )?)),
143 MessageType::Error => Ok(Message::Error(ErrorMessage::decode(
144 &mut buf,
145 decoding_options,
146 )?)),
147 MessageType::Chunk => Ok(Message::Chunk(MessageChunk::decode(
148 &mut buf,
149 decoding_options,
150 )?)),
151 MessageType::ReverseHello => Ok(Message::ReverseHello(ReverseHelloMessage::decode(
152 &mut buf,
153 decoding_options,
154 )?)),
155 MessageType::Invalid => {
156 error!("Message type for chunk is invalid.");
157 Err(StatusCode::BadCommunicationError)
158 }
159 }
160 }
161}