doip_sockets/tcp/
tcp_split.rs1use doip_codec::{DecodeError, DoipCodec};
2use doip_definitions::{header::DoipPayload, message::DoipMessage};
3use futures::{SinkExt, StreamExt};
4use tokio::{
5 io::{ReadHalf, WriteHalf},
6 net::TcpStream as TokioTcpStream,
7};
8use tokio_util::codec::{FramedRead, FramedWrite};
9
10use crate::error::SocketSendError;
11
12use super::{DoipTcpPayload, SocketConfig};
13
14pub struct TcpStreamReadHalf {
19 io: FramedRead<ReadHalf<TokioTcpStream>, DoipCodec>,
20 #[allow(dead_code)]
21 config: SocketConfig,
22}
23
24impl TcpStreamReadHalf {
25 pub fn new(
28 io: FramedRead<ReadHalf<TokioTcpStream>, DoipCodec>,
29 config: Option<SocketConfig>,
30 ) -> Self {
31 TcpStreamReadHalf {
32 io,
33 config: config.unwrap_or_default(),
34 }
35 }
36
37 pub async fn read(&mut self) -> Option<Result<DoipMessage, DecodeError>> {
39 self.io.next().await
40 }
41}
42
43pub struct TcpStreamWriteHalf {
48 io: FramedWrite<WriteHalf<TokioTcpStream>, DoipCodec>,
49 config: SocketConfig,
50}
51
52impl TcpStreamWriteHalf {
53 pub fn new(
56 io: FramedWrite<WriteHalf<TokioTcpStream>, DoipCodec>,
57 config: Option<SocketConfig>,
58 ) -> Self {
59 TcpStreamWriteHalf {
60 io,
61 config: config.unwrap_or_default(),
62 }
63 }
64
65 pub async fn send<A: DoipTcpPayload + DoipPayload + 'static>(
67 &mut self,
68 payload: A,
69 ) -> Result<(), SocketSendError> {
70 let msg = DoipMessage::new(self.config.protocol_version, Box::new(payload));
71
72 match self.io.send(msg).await {
73 Ok(_) => Ok(()),
74 Err(err) => Err(SocketSendError::EncodeError(err)),
75 }
76 }
77}