Skip to main content

opcua/core/comms/
message_writer.rs

1// OPCUA for Rust
2// SPDX-License-Identifier: MPL-2.0
3// Copyright (C) 2017-2022 Adam Lock
4
5use std::io::{Cursor, Write};
6
7use crate::types::{status_code::StatusCode, BinaryEncoder, EncodingResult};
8
9use super::{chunker::Chunker, secure_channel::SecureChannel, tcp_types::AcknowledgeMessage};
10
11use crate::core::supported_message::SupportedMessage;
12
13const DEFAULT_REQUEST_ID: u32 = 1000;
14const DEFAULT_SENT_SEQUENCE_NUMBER: u32 = 0;
15
16/// SocketWriter is a wrapper around the writable half of a tokio stream and a buffer which
17/// will be dumped into that stream.
18pub struct MessageWriter {
19    /// The send buffer
20    buffer: Cursor<Vec<u8>>,
21    /// The last request id
22    last_request_id: u32,
23    /// Last sent sequence number
24    last_sent_sequence_number: u32,
25    /// Maximum size of a message, total. Use 0 for no limit
26    max_message_size: usize,
27    /// Maximum size of a chunk. Use 0 for no limit
28    max_chunk_count: usize,
29}
30
31impl MessageWriter {
32    pub fn new(
33        buffer_size: usize,
34        max_message_size: usize,
35        max_chunk_count: usize,
36    ) -> MessageWriter {
37        MessageWriter {
38            buffer: Cursor::new(vec![0u8; buffer_size]),
39            last_request_id: DEFAULT_REQUEST_ID,
40            last_sent_sequence_number: DEFAULT_SENT_SEQUENCE_NUMBER,
41            max_message_size,
42            max_chunk_count,
43        }
44    }
45
46    pub fn write_ack(&mut self, ack: &AcknowledgeMessage) -> EncodingResult<usize> {
47        ack.encode(&mut self.buffer)
48    }
49
50    /// Encodes the message into a series of chunks, encrypts those chunks and writes the
51    /// result into the buffer ready to be sent.
52    pub fn write(
53        &mut self,
54        request_id: u32,
55        message: SupportedMessage,
56        secure_channel: &SecureChannel,
57    ) -> Result<u32, StatusCode> {
58        trace!("Writing request to buffer");
59        // Turn message to chunk(s)
60        let chunks = Chunker::encode(
61            self.last_sent_sequence_number + 1,
62            request_id,
63            self.max_message_size,
64            0,
65            secure_channel,
66            &message,
67        )?;
68
69        if self.max_chunk_count > 0 && chunks.len() > self.max_chunk_count {
70            error!(
71                "Cannot write message since {} chunks exceeds {} chunk limit",
72                chunks.len(),
73                self.max_chunk_count
74            );
75            Err(StatusCode::BadCommunicationError)
76        } else {
77            // Sequence number monotonically increases per chunk
78            self.last_sent_sequence_number += chunks.len() as u32;
79
80            // Send chunks
81
82            // This max chunk size allows the message to be encoded to a chunk with header + encoding
83            // which is just slightly larger in size (up to 1024 bytes).
84            let data_buffer_size = self.buffer.get_ref().len() + 1024;
85            let mut data = vec![0u8; data_buffer_size];
86            for chunk in chunks {
87                trace!("Sending chunk {:?}", chunk);
88                let size = secure_channel.apply_security(&chunk, &mut data)?;
89                self.buffer.write(&data[..size]).map_err(|error| {
90                    error!(
91                        "Error while writing bytes to stream, connection broken, check error {:?}",
92                        error
93                    );
94                    StatusCode::BadCommunicationError
95                })?;
96            }
97            trace!("Message written");
98            Ok(request_id)
99        }
100    }
101
102    pub fn next_request_id(&mut self) -> u32 {
103        self.last_request_id += 1;
104        self.last_request_id
105    }
106
107    /// Clears the buffer
108    fn clear(&mut self) {
109        self.buffer.set_position(0);
110    }
111
112    /// Yields any results to write, resetting the buffer back afterwards
113    pub fn bytes_to_write(&mut self) -> Vec<u8> {
114        let pos = self.buffer.position() as usize;
115        let result = (self.buffer.get_ref())[0..pos].to_vec();
116        // Buffer MUST be cleared here, otherwise races are possible
117        self.clear();
118        result
119    }
120}