opcua/core/comms/
message_writer.rs1use std::io::{Cursor, Write};
6
7use crate::types::{status_code::StatusCode, BinaryEncoder, EncodingResult};
8
9use super::{chunker::Chunker, secure_channel::SecureChannel, tcp_types::AcknowledgeMessage};
10
11use crate::core::supported_message::SupportedMessage;
12
13const DEFAULT_REQUEST_ID: u32 = 1000;
14const DEFAULT_SENT_SEQUENCE_NUMBER: u32 = 0;
15
16pub struct MessageWriter {
19 buffer: Cursor<Vec<u8>>,
21 last_request_id: u32,
23 last_sent_sequence_number: u32,
25 max_message_size: usize,
27 max_chunk_count: usize,
29}
30
31impl MessageWriter {
32 pub fn new(
33 buffer_size: usize,
34 max_message_size: usize,
35 max_chunk_count: usize,
36 ) -> MessageWriter {
37 MessageWriter {
38 buffer: Cursor::new(vec![0u8; buffer_size]),
39 last_request_id: DEFAULT_REQUEST_ID,
40 last_sent_sequence_number: DEFAULT_SENT_SEQUENCE_NUMBER,
41 max_message_size,
42 max_chunk_count,
43 }
44 }
45
46 pub fn write_ack(&mut self, ack: &AcknowledgeMessage) -> EncodingResult<usize> {
47 ack.encode(&mut self.buffer)
48 }
49
50 pub fn write(
53 &mut self,
54 request_id: u32,
55 message: SupportedMessage,
56 secure_channel: &SecureChannel,
57 ) -> Result<u32, StatusCode> {
58 trace!("Writing request to buffer");
59 let chunks = Chunker::encode(
61 self.last_sent_sequence_number + 1,
62 request_id,
63 self.max_message_size,
64 0,
65 secure_channel,
66 &message,
67 )?;
68
69 if self.max_chunk_count > 0 && chunks.len() > self.max_chunk_count {
70 error!(
71 "Cannot write message since {} chunks exceeds {} chunk limit",
72 chunks.len(),
73 self.max_chunk_count
74 );
75 Err(StatusCode::BadCommunicationError)
76 } else {
77 self.last_sent_sequence_number += chunks.len() as u32;
79
80 let data_buffer_size = self.buffer.get_ref().len() + 1024;
85 let mut data = vec![0u8; data_buffer_size];
86 for chunk in chunks {
87 trace!("Sending chunk {:?}", chunk);
88 let size = secure_channel.apply_security(&chunk, &mut data)?;
89 self.buffer.write(&data[..size]).map_err(|error| {
90 error!(
91 "Error while writing bytes to stream, connection broken, check error {:?}",
92 error
93 );
94 StatusCode::BadCommunicationError
95 })?;
96 }
97 trace!("Message written");
98 Ok(request_id)
99 }
100 }
101
102 pub fn next_request_id(&mut self) -> u32 {
103 self.last_request_id += 1;
104 self.last_request_id
105 }
106
107 fn clear(&mut self) {
109 self.buffer.set_position(0);
110 }
111
112 pub fn bytes_to_write(&mut self) -> Vec<u8> {
114 let pos = self.buffer.position() as usize;
115 let result = (self.buffer.get_ref())[0..pos].to_vec();
116 self.clear();
118 result
119 }
120}