1use futures::future::BoxFuture;
17use std::{io, net::SocketAddr, sync::Arc};
18use tokio::sync::mpsc::channel;
19use tokio::sync::mpsc::{Receiver, Sender};
20
21use crate::config::Configuration;
22use crate::someip_codec::SomeIpPacket;
23use crate::tasks::{tcp_server_task, udp_task, uds_task};
24use crate::{ConnectionInfo, DispatcherCommand, DispatcherReply};
25
26#[derive(Default)]
27pub struct Server {}
28
29impl Server {
30 pub fn new() -> Self {
31 Server::default()
32 }
33
34 pub fn create_notify_channel(
35 size: usize,
36 ) -> (Sender<ConnectionInfo>, Receiver<ConnectionInfo>) {
37 channel::<ConnectionInfo>(size)
38 }
39}
40
41pub struct ServerRequestHandlerEntry {
42 pub name: &'static str,
43 pub instance_id: u16,
44 pub major_version: u8,
45 pub minor_version: u32,
46 pub handler: std::sync::Arc<dyn ServerRequestHandler>,
47}
48
49pub trait CreateServerRequestHandler {
50 type Item;
51 fn create_server_request_handler(
52 server: std::sync::Arc<Self::Item>,
53 ) -> Vec<ServerRequestHandlerEntry>;
54}
55pub trait ServerRequestHandler: Send + Sync {
56 fn get_handler(&self, message: SomeIpPacket) -> BoxFuture<'static, Option<SomeIpPacket>>;
58}
59pub trait ServiceIdentifier {
60 fn service_name() -> &'static str;
62}
63
64pub trait ServiceVersion {
65 fn __major_version__() -> u8 {
66 0
67 }
68 fn __minor_version__() -> u32 {
69 0
70 }
71}
72
73pub trait ServiceInstance {
74 fn __instance_id__() -> u16 {
75 0
76 }
77}
78
79#[allow(clippy::type_complexity)]
80impl Server {
81 pub async fn serve_uds(
87 uds: std::os::unix::net::UnixStream,
88 handlers: &[(
89 u16, Arc<dyn ServerRequestHandler>, u8, u32, )],
94 ) -> Result<(), io::Error> {
95 let (dx_tx, mut dx_rx) = channel::<DispatcherCommand>(10);
96 let _uds_task = tokio::spawn(async move { uds_task(dx_tx, uds).await });
97
98 while let Some(command) = dx_rx.recv().await {
100 let (response, tx) = match command {
101 DispatcherCommand::DispatchUds(packet, tx) => {
102 if let Some(handler) = handlers
103 .iter()
104 .find(|e| packet.header().service_id() == e.0)
105 {
106 (Self::server_dispatch(handler.1.clone(), packet).await, tx)
107 } else {
108 panic!("{}", "unhandled service id");
109 }
110 }
111 DispatcherCommand::DispatchUdp(_, _) => {
112 panic!("{}", "UDP is not expected here");
113 }
114 DispatcherCommand::DispatchTcp(_, _) => {
115 panic!("{}", "TCP is not expected here");
116 }
117 DispatcherCommand::Terminate => {
118 log::debug!("Dispatcher terminating");
119 break;
120 }
121 };
122 if let Err(_e) = tx.send(DispatcherReply::ResponsePacket(response)).await {
123 log::error!("Error sending response to UDS task");
124 break;
125 }
126 }
127 Ok(())
128 }
129
130 pub async fn serve<'a>(
133 at: SocketAddr,
134 handler: Arc<dyn ServerRequestHandler>,
135 config: Arc<Configuration>,
136 service_id: u16,
137 _major_version: u8,
138 _minor_version: u32,
139 notify_tcp_tx: Sender<ConnectionInfo>,
140 ) -> Result<(), io::Error> {
141 let (dx_tx, mut dx_rx) = channel::<DispatcherCommand>(10);
142
143 let udp_config = config.clone();
145 let tcp_dx_tx = dx_tx.clone();
146 let tcp_notifier = notify_tcp_tx.clone();
147 let tcp_task = tokio::spawn(async move {
148 tcp_server_task(tcp_dx_tx, &at, udp_config, service_id, tcp_notifier).await
149 });
150
151 let dx_tx = dx_tx.clone();
153 let udp_task =
154 tokio::spawn(
155 async move { udp_task(dx_tx, &at, config, service_id, notify_tcp_tx).await },
156 );
157
158 loop {
159 if let Some(command) = dx_rx.recv().await {
160 let (response, tx) = match command {
161 DispatcherCommand::DispatchUdp(packet, tx) => {
162 (Self::server_dispatch(handler.clone(), packet).await, tx)
163 }
164 DispatcherCommand::DispatchTcp(packet, tx) => {
165 (Self::server_dispatch(handler.clone(), packet).await, tx)
166 }
167 DispatcherCommand::Terminate => {
168 log::debug!("Dispatcher terminating");
169 break;
170 }
171 DispatcherCommand::DispatchUds(_, _) => {
172 panic!("{}", "UDS is not expected here");
173 }
174 };
175 if let Err(_e) = tx.send(DispatcherReply::ResponsePacket(response)).await {
176 log::error!("Error sending response to UDP task");
177 break;
178 }
179 } else {
180 log::error!("Dispatcher task error");
181 break;
182 }
183 }
184
185 udp_task.abort();
186 tcp_task.abort();
187
188 Ok(())
189 }
190
191 async fn server_dispatch<'a>(
192 handler: Arc<dyn ServerRequestHandler>,
193 packet: SomeIpPacket,
194 ) -> Option<SomeIpPacket> {
195 match packet.header().message_type {
196 someip_parse::MessageType::Request => {
197 handler.get_handler(packet).await
204 }
205 someip_parse::MessageType::RequestNoReturn => {
206 handler.get_handler(packet).await;
213 None
214 }
215 someip_parse::MessageType::Notification => {
216 log::error!("Server received Notification packet, dropped");
217 None
218 }
219 someip_parse::MessageType::Response => {
220 log::error!("Server received Response packet, dropped");
221 None
222 }
223 someip_parse::MessageType::Error => {
224 log::error!("Server received Error packet, dropped");
225 None
226 }
227 }
228 }
229}
230
231#[cfg(test)]
232mod tests {
233 use super::*;
234 use crate::ConnectionMessage;
235 use crate::{connection::SomeIPCodec, someip_codec::SomeIpPacket};
236 use async_trait::async_trait;
237 use bytes::{Bytes, BytesMut};
238 use futures::SinkExt;
239 use someip_parse::{MessageType, SomeIpHeader};
240 use std::{net::SocketAddr, time::Duration};
241 use tokio::runtime::Runtime;
242
243 #[test]
244 fn test_basic() {
245 struct TestService;
246
247 #[async_trait]
248 impl ServerRequestHandler for TestService {
249 fn get_handler(
256 &self,
257 message: SomeIpPacket,
258 ) -> BoxFuture<'static, Option<SomeIpPacket>> {
259 Box::pin(async move {
260 println!("Packet received: {:?}", message);
261 assert_eq!(message.header().service_id(), 0x45);
262 assert_eq!(message.header().event_or_method_id(), 0x01);
263 Some(message)
264 })
265 }
266 }
267
268 let rt = Runtime::new().unwrap();
269 let config = Arc::new(Configuration::default());
270
271 let at = "127.0.0.1:8091".parse::<SocketAddr>().unwrap();
272 println!("Test");
273 rt.block_on(async {
274 let (tx, mut rx) = Server::create_notify_channel(1);
275
276 tokio::spawn(async move {
277 loop {
278 if let Some(msg) = rx.recv().await {
279 match msg {
280 ConnectionInfo::NewTcpConnection((_sender, i)) => {
281 println!("New connection from {}", i);
282 }
283 ConnectionInfo::ConnectionDropped(_i) => {}
284 ConnectionInfo::NewUdpConnection((sender, _i)) => {
285 let header = SomeIpHeader {
287 message_type: MessageType::Notification,
288 ..Default::default()
289 };
290 let pkt = SomeIpPacket::new(header, Bytes::new());
291 let _res = sender
292 .send(ConnectionMessage::SendUdpNotification((
293 pkt,
294 "127.0.0.1:9001".parse::<SocketAddr>().unwrap(),
295 )))
296 .await;
297 }
298 ConnectionInfo::UdpServerSocket(s) => {
299 assert_eq!(s, at);
300 }
301 ConnectionInfo::TcpServerSocket(s) => {
302 assert_eq!(s, at);
303 }
304 }
305 }
306 }
307 });
308
309 tokio::spawn(async move {
310 let service = Arc::new(TestService {});
312 println!("Going to run server");
313 let res = Server::serve(at, service, config, 45, 1, 0, tx).await;
314 println!("Server terminated");
315 if let Err(e) = res {
316 println!("Server error:{}", e);
317 }
318 });
319
320 tokio::time::sleep(Duration::from_millis(20)).await;
321
322 let addr = "127.0.0.1:8091".parse::<SocketAddr>().unwrap();
323 let mut tx_connection = SomeIPCodec::default().connect(&addr).await.unwrap();
324
325 let mut header = SomeIpHeader::default();
326 header.set_service_id(0x45);
327 header.set_method_id(0x01);
328
329 let payload = BytesMut::new().freeze();
330 let packet = SomeIpPacket::new(header, payload);
331
332 tx_connection.send(packet).await.unwrap();
333
334 println!("Sending terminate");
335 });
337 }
338}