someip/
server.rs

1/*
2    Copyright 2021 Sojan James
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6        http://www.apache.org/licenses/LICENSE-2.0
7    Unless required by applicable law or agreed to in writing, software
8    distributed under the License is distributed on an "AS IS" BASIS,
9    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10    See the License for the specific language governing permissions and
11    limitations under the License.
12*/
13
14// A SOME/IP server
15
16use futures::future::BoxFuture;
17use std::{io, net::SocketAddr, sync::Arc};
18use tokio::sync::mpsc::channel;
19use tokio::sync::mpsc::{Receiver, Sender};
20
21use crate::config::Configuration;
22use crate::someip_codec::SomeIpPacket;
23use crate::tasks::{tcp_server_task, udp_task, uds_task};
24use crate::{ConnectionInfo, DispatcherCommand, DispatcherReply};
25
26#[derive(Default)]
27pub struct Server {}
28
29impl Server {
30    pub fn new() -> Self {
31        Server::default()
32    }
33
34    pub fn create_notify_channel(
35        size: usize,
36    ) -> (Sender<ConnectionInfo>, Receiver<ConnectionInfo>) {
37        channel::<ConnectionInfo>(size)
38    }
39}
40
41pub struct ServerRequestHandlerEntry {
42    pub name: &'static str,
43    pub instance_id: u16,
44    pub major_version: u8,
45    pub minor_version: u32,
46    pub handler: std::sync::Arc<dyn ServerRequestHandler>,
47}
48
49pub trait CreateServerRequestHandler {
50    type Item;
51    fn create_server_request_handler(
52        server: std::sync::Arc<Self::Item>,
53    ) -> Vec<ServerRequestHandlerEntry>;
54}
55pub trait ServerRequestHandler: Send + Sync {
56    /// Return a boxed future that can be used to dispatch this message
57    fn get_handler(&self, message: SomeIpPacket) -> BoxFuture<'static, Option<SomeIpPacket>>;
58}
59pub trait ServiceIdentifier {
60    /// The service name for this service
61    fn service_name() -> &'static str;
62}
63
64pub trait ServiceVersion {
65    fn __major_version__() -> u8 {
66        0
67    }
68    fn __minor_version__() -> u32 {
69        0
70    }
71}
72
73pub trait ServiceInstance {
74    fn __instance_id__() -> u16 {
75        0
76    }
77}
78
79#[allow(clippy::type_complexity)]
80impl Server {
81    /// Serve services on a UDS connection.
82    ///
83    /// Since UDS doesn't have the concept of ports, we allow multiple service IDs on a single connection.
84    /// the handlers variable is a slice of (service_id, handler, major_version, minor_version)
85    /// This call does not return as long as the connection is active.
86    pub async fn serve_uds(
87        uds: std::os::unix::net::UnixStream,
88        handlers: &[(
89            u16,                           // service_id
90            Arc<dyn ServerRequestHandler>, // handler
91            u8,                            // major number
92            u32,                           // minor number
93        )],
94    ) -> Result<(), io::Error> {
95        let (dx_tx, mut dx_rx) = channel::<DispatcherCommand>(10);
96        let _uds_task = tokio::spawn(async move { uds_task(dx_tx, uds).await });
97
98        // Servers are also async and should not block
99        while let Some(command) = dx_rx.recv().await {
100            let (response, tx) = match command {
101                DispatcherCommand::DispatchUds(packet, tx) => {
102                    if let Some(handler) = handlers
103                        .iter()
104                        .find(|e| packet.header().service_id() == e.0)
105                    {
106                        (Self::server_dispatch(handler.1.clone(), packet).await, tx)
107                    } else {
108                        panic!("{}", "unhandled service id");
109                    }
110                }
111                DispatcherCommand::DispatchUdp(_, _) => {
112                    panic!("{}", "UDP is not expected here");
113                }
114                DispatcherCommand::DispatchTcp(_, _) => {
115                    panic!("{}", "TCP is not expected here");
116                }
117                DispatcherCommand::Terminate => {
118                    log::debug!("Dispatcher terminating");
119                    break;
120                }
121            };
122            if let Err(_e) = tx.send(DispatcherReply::ResponsePacket(response)).await {
123                log::error!("Error sending response to UDS task");
124                break;
125            }
126        }
127        Ok(())
128    }
129
130    /// start serving.  This function doesn't return unless there is an
131    /// unrecoverable error.
132    pub async fn serve<'a>(
133        at: SocketAddr,
134        handler: Arc<dyn ServerRequestHandler>,
135        config: Arc<Configuration>,
136        service_id: u16,
137        _major_version: u8,
138        _minor_version: u32,
139        notify_tcp_tx: Sender<ConnectionInfo>,
140    ) -> Result<(), io::Error> {
141        let (dx_tx, mut dx_rx) = channel::<DispatcherCommand>(10);
142
143        // cloning dispatcher handle as it needs to move into the TCP task
144        let udp_config = config.clone();
145        let tcp_dx_tx = dx_tx.clone();
146        let tcp_notifier = notify_tcp_tx.clone();
147        let tcp_task = tokio::spawn(async move {
148            tcp_server_task(tcp_dx_tx, &at, udp_config, service_id, tcp_notifier).await
149        });
150
151        //cloning dx_tx tp move into the UDP task
152        let dx_tx = dx_tx.clone();
153        let udp_task =
154            tokio::spawn(
155                async move { udp_task(dx_tx, &at, config, service_id, notify_tcp_tx).await },
156            );
157
158        loop {
159            if let Some(command) = dx_rx.recv().await {
160                let (response, tx) = match command {
161                    DispatcherCommand::DispatchUdp(packet, tx) => {
162                        (Self::server_dispatch(handler.clone(), packet).await, tx)
163                    }
164                    DispatcherCommand::DispatchTcp(packet, tx) => {
165                        (Self::server_dispatch(handler.clone(), packet).await, tx)
166                    }
167                    DispatcherCommand::Terminate => {
168                        log::debug!("Dispatcher terminating");
169                        break;
170                    }
171                    DispatcherCommand::DispatchUds(_, _) => {
172                        panic!("{}", "UDS is not expected here");
173                    }
174                };
175                if let Err(_e) = tx.send(DispatcherReply::ResponsePacket(response)).await {
176                    log::error!("Error sending response to UDP task");
177                    break;
178                }
179            } else {
180                log::error!("Dispatcher task error");
181                break;
182            }
183        }
184
185        udp_task.abort();
186        tcp_task.abort();
187
188        Ok(())
189    }
190
191    async fn server_dispatch<'a>(
192        handler: Arc<dyn ServerRequestHandler>,
193        packet: SomeIpPacket,
194    ) -> Option<SomeIpPacket> {
195        match packet.header().message_type {
196            someip_parse::MessageType::Request => {
197                //let handler = if let Ok(handler) = handler.lock() {
198                //    handler.get_handler(packet)
199                //} else {
200                //    log::error!("Mutex poisoned?");
201                //    panic!("getting lock for handler");
202                //};
203                handler.get_handler(packet).await
204            }
205            someip_parse::MessageType::RequestNoReturn => {
206                //let handler = if let Ok(handler) = handler.lock() {
207                //    handler.get_handler(packet)
208                //} else {
209                //    log::error!("Mutex poisoned?");
210                //    panic!("getting lock for handler");
211                //};
212                handler.get_handler(packet).await;
213                None
214            }
215            someip_parse::MessageType::Notification => {
216                log::error!("Server received Notification packet, dropped");
217                None
218            }
219            someip_parse::MessageType::Response => {
220                log::error!("Server received Response packet, dropped");
221                None
222            }
223            someip_parse::MessageType::Error => {
224                log::error!("Server received Error packet, dropped");
225                None
226            }
227        }
228    }
229}
230
231#[cfg(test)]
232mod tests {
233    use super::*;
234    use crate::ConnectionMessage;
235    use crate::{connection::SomeIPCodec, someip_codec::SomeIpPacket};
236    use async_trait::async_trait;
237    use bytes::{Bytes, BytesMut};
238    use futures::SinkExt;
239    use someip_parse::{MessageType, SomeIpHeader};
240    use std::{net::SocketAddr, time::Duration};
241    use tokio::runtime::Runtime;
242
243    #[test]
244    fn test_basic() {
245        struct TestService;
246
247        #[async_trait]
248        impl ServerRequestHandler for TestService {
249            /*async fn handle(&mut self, message: SomeIpPacket) -> Option<SomeIpPacket> {
250                println!("Packet received: {:?}", message);
251                assert_eq!(message.header().service_id(), 0x45);
252                assert_eq!(message.header().event_or_method_id(), 0x01);
253                Some(message)
254            }*/
255            fn get_handler(
256                &self,
257                message: SomeIpPacket,
258            ) -> BoxFuture<'static, Option<SomeIpPacket>> {
259                Box::pin(async move {
260                    println!("Packet received: {:?}", message);
261                    assert_eq!(message.header().service_id(), 0x45);
262                    assert_eq!(message.header().event_or_method_id(), 0x01);
263                    Some(message)
264                })
265            }
266        }
267
268        let rt = Runtime::new().unwrap();
269        let config = Arc::new(Configuration::default());
270
271        let at = "127.0.0.1:8091".parse::<SocketAddr>().unwrap();
272        println!("Test");
273        rt.block_on(async {
274            let (tx, mut rx) = Server::create_notify_channel(1);
275
276            tokio::spawn(async move {
277                loop {
278                    if let Some(msg) = rx.recv().await {
279                        match msg {
280                            ConnectionInfo::NewTcpConnection((_sender, i)) => {
281                                println!("New connection from {}", i);
282                            }
283                            ConnectionInfo::ConnectionDropped(_i) => {}
284                            ConnectionInfo::NewUdpConnection((sender, _i)) => {
285                                //test notification packet
286                                let header = SomeIpHeader {
287                                    message_type: MessageType::Notification,
288                                    ..Default::default()
289                                };
290                                let pkt = SomeIpPacket::new(header, Bytes::new());
291                                let _res = sender
292                                    .send(ConnectionMessage::SendUdpNotification((
293                                        pkt,
294                                        "127.0.0.1:9001".parse::<SocketAddr>().unwrap(),
295                                    )))
296                                    .await;
297                            }
298                            ConnectionInfo::UdpServerSocket(s) => {
299                                assert_eq!(s, at);
300                            }
301                            ConnectionInfo::TcpServerSocket(s) => {
302                                assert_eq!(s, at);
303                            }
304                        }
305                    }
306                }
307            });
308
309            tokio::spawn(async move {
310                //let test_service: Box<dyn ServerRequestHandler> = Box::new(TestService {});
311                let service = Arc::new(TestService {});
312                println!("Going to run server");
313                let res = Server::serve(at, service, config, 45, 1, 0, tx).await;
314                println!("Server terminated");
315                if let Err(e) = res {
316                    println!("Server error:{}", e);
317                }
318            });
319
320            tokio::time::sleep(Duration::from_millis(20)).await;
321
322            let addr = "127.0.0.1:8091".parse::<SocketAddr>().unwrap();
323            let mut tx_connection = SomeIPCodec::default().connect(&addr).await.unwrap();
324
325            let mut header = SomeIpHeader::default();
326            header.set_service_id(0x45);
327            header.set_method_id(0x01);
328
329            let payload = BytesMut::new().freeze();
330            let packet = SomeIpPacket::new(header, payload);
331
332            tx_connection.send(packet).await.unwrap();
333
334            println!("Sending terminate");
335            //let res = &mut handle.terminate().await;
336        });
337    }
338}