1use bytes::{Buf, BufMut, Bytes, BytesMut};
56use doip::*;
57use futures::Stream;
58use std::io::Cursor;
59use std::num::TryFromIntError;
60use std::{io, pin::Pin, task::Poll};
61use thiserror::Error;
62use tokio_util::{
63 codec::{Decoder, Encoder},
64 udp::UdpFramed,
65};
66
67mod client;
68mod server;
69
70pub use client::{DoIpClient, DoIpClientOptions};
71pub use server::{ClientContext, DoIpServer, DoIpServerHandler, ServerError};
72
/// UDP port used for DoIP discovery.
pub const UDP_DISCOVERY_PORT: u16 = 13_400;
/// TCP port used for DoIP data connections.
pub const TCP_DATA_PORT: u16 = 13_400;
/// TCP port used for DoIP data connections over TLS.
pub const TCP_DATA_TLS_PORT: u16 = 3_496;
76
77#[derive(Error, Debug)]
78pub enum DoIpTokioError {
79 #[error(transparent)]
80 Io(#[from] io::Error),
81 #[error(transparent)]
82 TryFromInt(#[from] TryFromIntError),
83 #[error("The client logical address: {0:X} is not within the valid range 0x0E00 - 0x0FFF")]
84 InvalidClientLogicalAddr(u16),
85 #[error(transparent)]
86 Anyhow(#[from] anyhow::Error),
87 #[error(transparent)]
88 Parse(#[from] DoIpError),
89 #[error("Diagnostic message negative acknowledgement code: {0:?}")]
90 DiagnosticMessageNegativeAck(DiagnosticMessageNegativeAck),
91}
92
93pub struct VehicleIdentificationStream {
95 udp_framed: UdpFramed<DoIpCodec>,
96}
97
98impl Stream for VehicleIdentificationStream {
99 type Item = Result<VehicleIdentificationResponse, DoIpTokioError>;
100
101 fn poll_next(
102 mut self: std::pin::Pin<&mut Self>,
103 cx: &mut std::task::Context<'_>,
104 ) -> std::task::Poll<Option<Self::Item>> {
105 match Pin::new(&mut self.udp_framed).poll_next(cx) {
106 Poll::Ready(Some(Ok(((header, payload), _socket_addr)))) => {
107 if header.payload_type
108 == PayloadType::VehicleAnnouncementMessageVehicleIdentificationResponse
109 {
110 let announcement =
111 VehicleIdentificationResponse::read(&mut Cursor::new(payload))
112 .map_err(DoIpTokioError::Parse);
113 Poll::Ready(Some(announcement))
114 } else {
115 Poll::Pending
116 }
117 }
118 Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
119 Poll::Ready(None) => Poll::Ready(None),
120 Poll::Pending => Poll::Pending,
121 }
122 }
123}
124
125pub struct DoIpMessageStream {
126 udp_framed: UdpFramed<DoIpCodec>,
127}
128
129impl Stream for DoIpMessageStream {
130 type Item = Result<DoIpMessage, DoIpTokioError>;
131
132 fn poll_next(
133 mut self: std::pin::Pin<&mut Self>,
134 cx: &mut std::task::Context<'_>,
135 ) -> std::task::Poll<Option<Self::Item>> {
136 match Pin::new(&mut self.udp_framed).poll_next(cx) {
137 Poll::Ready(Some(Ok(((header, payload), _socket_addr)))) => {
138 Poll::Ready(Some(Ok(DoIpMessage {
139 header,
140 payload: payload.to_vec(),
141 })))
142 }
143 Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
144 Poll::Ready(None) => Poll::Ready(None),
145 Poll::Pending => Poll::Pending,
146 }
147 }
148}
149
150#[derive(Default)]
151pub struct DoIpCodec {}
152
153impl Decoder for DoIpCodec {
154 type Item = (DoIpHeader, Bytes);
155 type Error = DoIpTokioError;
156
157 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
158 let header = DoIpHeader::read(&mut src.reader())?;
159 let payload = src.copy_to_bytes(header.payload_length as usize);
160
161 Ok(Some((header, payload)))
162 }
163}
164
165impl Encoder<(&DoIpHeader, &[u8])> for DoIpCodec {
166 type Error = DoIpTokioError;
167
168 fn encode(
169 &mut self,
170 message: (&DoIpHeader, &[u8]),
171 dst: &mut BytesMut,
172 ) -> Result<(), Self::Error> {
173 let (header, payload) = message;
174 header.write(&mut dst.writer())?;
175 dst.put(payload);
176 Ok(())
177 }
178}