opcua_core/comms/
tcp_codec.rs1use std::io;
15use std::sync::{Arc, RwLock};
16
17use bytes::{BufMut, BytesMut};
18use tokio_util::codec::{Decoder, Encoder};
19
20use opcua_types::{
21 encoding::{BinaryEncoder, DecodingOptions},
22 status_code::StatusCode,
23};
24
25use crate::comms::{
26 message_chunk::MessageChunk,
27 tcp_types::{
28 AcknowledgeMessage, ErrorMessage, HelloMessage, MessageHeader, MessageType,
29 MESSAGE_HEADER_LEN,
30 },
31};
32
33#[derive(Debug)]
34pub enum Message {
35 Hello(HelloMessage),
36 Acknowledge(AcknowledgeMessage),
37 Error(ErrorMessage),
38 Chunk(MessageChunk),
39}
40
41pub struct TcpCodec {
45 decoding_options: DecodingOptions,
46 abort: Arc<RwLock<bool>>,
47}
48
49impl Decoder for TcpCodec {
50 type Item = Message;
51 type Error = io::Error;
52
53 fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
54 if self.is_abort() {
55 debug!("TcpCodec decode abort flag has been set and is terminating");
57 Err(io::Error::from(StatusCode::BadOperationAbandoned))
58 } else if buf.len() > MESSAGE_HEADER_LEN {
59 let message_header = {
63 let mut buf = io::Cursor::new(&buf[0..MESSAGE_HEADER_LEN]);
64 MessageHeader::decode(&mut buf, &self.decoding_options)?
65 };
66
67 let message_size = message_header.message_size as usize;
71 if buf.len() >= message_size {
72 let mut buf = buf.split_to(message_size);
74 let message =
75 Self::decode_message(message_header, &mut buf, &self.decoding_options)
76 .map_err(|e| {
77 error!("Codec got an error {} while decoding a message", e);
78 io::Error::from(e)
79 })?;
80 Ok(Some(message))
81 } else {
82 Ok(None)
84 }
85 } else {
86 Ok(None)
87 }
88 }
89}
90
91impl Encoder<Message> for TcpCodec {
92 type Error = io::Error;
93
94 fn encode(&mut self, data: Message, buf: &mut BytesMut) -> Result<(), io::Error> {
95 match data {
96 Message::Hello(msg) => self.write(msg, buf),
97 Message::Acknowledge(msg) => self.write(msg, buf),
98 Message::Error(msg) => self.write(msg, buf),
99 Message::Chunk(msg) => self.write(msg, buf),
100 }
101 }
102}
103
104impl TcpCodec {
105 pub fn new(abort: Arc<RwLock<bool>>, decoding_options: DecodingOptions) -> TcpCodec {
108 TcpCodec {
109 abort,
110 decoding_options,
111 }
112 }
113
114 fn write<T>(&self, msg: T, buf: &mut BytesMut) -> Result<(), io::Error>
116 where
117 T: BinaryEncoder<T> + std::fmt::Debug,
118 {
119 buf.reserve(msg.byte_len());
120 msg.encode(&mut buf.writer()).map(|_| ()).map_err(|err| {
121 error!("Error writing message {:?}, err = {}", msg, err);
122 io::Error::new(io::ErrorKind::Other, format!("Error = {}", err))
123 })
124 }
125
126 fn is_abort(&self) -> bool {
127 let abort = self.abort.read().unwrap();
128 *abort
129 }
130
131 fn decode_message(
133 message_header: MessageHeader,
134 buf: &mut BytesMut,
135 decoding_options: &DecodingOptions,
136 ) -> Result<Message, StatusCode> {
137 let mut buf = io::Cursor::new(&buf[..]);
138 match message_header.message_type {
139 MessageType::Acknowledge => Ok(Message::Acknowledge(AcknowledgeMessage::decode(
140 &mut buf,
141 decoding_options,
142 )?)),
143 MessageType::Hello => Ok(Message::Hello(HelloMessage::decode(
144 &mut buf,
145 decoding_options,
146 )?)),
147 MessageType::Error => Ok(Message::Error(ErrorMessage::decode(
148 &mut buf,
149 decoding_options,
150 )?)),
151 MessageType::Chunk => Ok(Message::Chunk(MessageChunk::decode(
152 &mut buf,
153 decoding_options,
154 )?)),
155 MessageType::Invalid => {
156 error!("Message type for chunk is invalid.");
157 Err(StatusCode::BadCommunicationError)
158 }
159 }
160 }
161}