doip_sockets/tcp/
tcp_split.rs

1use doip_codec::{DecodeError, DoipCodec};
2use doip_definitions::{header::DoipPayload, message::DoipMessage};
3use futures::{SinkExt, StreamExt};
4use tokio::{
5    io::{ReadHalf, WriteHalf},
6    net::TcpStream as TokioTcpStream,
7};
8use tokio_util::codec::{FramedRead, FramedWrite};
9
10use crate::error::SocketSendError;
11
12use super::{DoipTcpPayload, SocketConfig};
13
14/// Simple implementation of a TCP Stream Read Half
15///
16/// Allows for the passing of the read half being passed into a different thread
17/// seperate to the write half. Will be dropped if the Write Half is dropped.
18pub struct TcpStreamReadHalf {
19    io: FramedRead<ReadHalf<TokioTcpStream>, DoipCodec>,
20    #[allow(dead_code)]
21    config: SocketConfig,
22}
23
24impl TcpStreamReadHalf {
25    /// Creates a new TCP Stream Read Half from an existing Tokio TCP Stream and
26    /// config
27    pub fn new(
28        io: FramedRead<ReadHalf<TokioTcpStream>, DoipCodec>,
29        config: Option<SocketConfig>,
30    ) -> Self {
31        TcpStreamReadHalf {
32            io,
33            config: config.unwrap_or_default(),
34        }
35    }
36
37    /// Read from the stream
38    pub async fn read(&mut self) -> Option<Result<DoipMessage, DecodeError>> {
39        self.io.next().await
40    }
41}
42
43/// Simple implementation of a TCP Stream Write Half
44///
45/// Can be used to write messages to the sink. If dropped this will close the
46/// connection on the TcpStreamReadHalf.
47pub struct TcpStreamWriteHalf {
48    io: FramedWrite<WriteHalf<TokioTcpStream>, DoipCodec>,
49    config: SocketConfig,
50}
51
52impl TcpStreamWriteHalf {
53    /// Creates a new TCP Stream Read Half from an existing Tokio TCP Stream and
54    /// config
55    pub fn new(
56        io: FramedWrite<WriteHalf<TokioTcpStream>, DoipCodec>,
57        config: Option<SocketConfig>,
58    ) -> Self {
59        TcpStreamWriteHalf {
60            io,
61            config: config.unwrap_or_default(),
62        }
63    }
64
65    /// Send a message to the sink
66    pub async fn send<A: DoipTcpPayload + DoipPayload + 'static>(
67        &mut self,
68        payload: A,
69    ) -> Result<(), SocketSendError> {
70        let msg = DoipMessage::new(self.config.protocol_version, Box::new(payload));
71
72        match self.io.send(msg).await {
73            Ok(_) => Ok(()),
74            Err(err) => Err(SocketSendError::EncodeError(err)),
75        }
76    }
77}