opcua/core/comms/
tcp_codec.rs1use std::io;
15
16use bytes::{BufMut, BytesMut};
17use tokio_util::codec::{Decoder, Encoder};
18
19use crate::types::{
20 encoding::{BinaryEncoder, DecodingOptions},
21 status_code::StatusCode,
22};
23
24use super::{
25 message_chunk::MessageChunk,
26 tcp_types::{
27 AcknowledgeMessage, ErrorMessage, HelloMessage, MessageHeader, MessageType,
28 MESSAGE_HEADER_LEN,
29 },
30};
31
32#[derive(Debug)]
33pub enum Message {
34 Hello(HelloMessage),
35 Acknowledge(AcknowledgeMessage),
36 Error(ErrorMessage),
37 Chunk(MessageChunk),
38}
39
40pub struct TcpCodec {
44 decoding_options: DecodingOptions,
45}
46
47impl Decoder for TcpCodec {
48 type Item = Message;
49 type Error = io::Error;
50
51 fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
52 if buf.len() > MESSAGE_HEADER_LEN {
53 let message_header = {
57 let mut buf = io::Cursor::new(&buf[0..MESSAGE_HEADER_LEN]);
58 MessageHeader::decode(&mut buf, &self.decoding_options)?
59 };
60
61 let message_size = message_header.message_size as usize;
65 if buf.len() >= message_size {
66 let mut buf = buf.split_to(message_size);
68 let message =
69 Self::decode_message(message_header, &mut buf, &self.decoding_options)
70 .map_err(|e| {
71 error!("Codec got an error {} while decoding a message", e);
72 io::Error::from(e)
73 })?;
74 Ok(Some(message))
75 } else {
76 Ok(None)
78 }
79 } else {
80 Ok(None)
81 }
82 }
83}
84
85impl Encoder<Message> for TcpCodec {
86 type Error = io::Error;
87
88 fn encode(&mut self, data: Message, buf: &mut BytesMut) -> Result<(), io::Error> {
89 match data {
90 Message::Hello(msg) => self.write(msg, buf),
91 Message::Acknowledge(msg) => self.write(msg, buf),
92 Message::Error(msg) => self.write(msg, buf),
93 Message::Chunk(msg) => self.write(msg, buf),
94 }
95 }
96}
97
98impl TcpCodec {
99 pub fn new(decoding_options: DecodingOptions) -> TcpCodec {
102 TcpCodec { decoding_options }
103 }
104
105 fn write<T>(&self, msg: T, buf: &mut BytesMut) -> Result<(), io::Error>
107 where
108 T: BinaryEncoder<T> + std::fmt::Debug,
109 {
110 buf.reserve(msg.byte_len());
111 msg.encode(&mut buf.writer()).map(|_| ()).map_err(|err| {
112 error!("Error writing message {:?}, err = {}", msg, err);
113 io::Error::new(io::ErrorKind::Other, format!("Error = {}", err))
114 })
115 }
116
117 fn decode_message(
119 message_header: MessageHeader,
120 buf: &mut BytesMut,
121 decoding_options: &DecodingOptions,
122 ) -> Result<Message, StatusCode> {
123 let mut buf = io::Cursor::new(&buf[..]);
124 match message_header.message_type {
125 MessageType::Acknowledge => Ok(Message::Acknowledge(AcknowledgeMessage::decode(
126 &mut buf,
127 decoding_options,
128 )?)),
129 MessageType::Hello => Ok(Message::Hello(HelloMessage::decode(
130 &mut buf,
131 decoding_options,
132 )?)),
133 MessageType::Error => Ok(Message::Error(ErrorMessage::decode(
134 &mut buf,
135 decoding_options,
136 )?)),
137 MessageType::Chunk => Ok(Message::Chunk(MessageChunk::decode(
138 &mut buf,
139 decoding_options,
140 )?)),
141 MessageType::Invalid => {
142 error!("Message type for chunk is invalid.");
143 Err(StatusCode::BadCommunicationError)
144 }
145 }
146 }
147}