Skip to main content

simple_someip/server/
mod.rs

1//! SOME/IP Server/Provider functionality
2//!
3//! This module provides server-side SOME/IP functionality including:
4//! - Service offering/announcement via Service Discovery
5//! - Event publishing to subscribers
6//! - Event group management
7//! - Request/Response handling
8
9mod event_publisher;
10mod service_info;
11mod subscription_manager;
12
13pub use event_publisher::EventPublisher;
14pub use service_info::{EventGroupInfo, ServiceInfo};
15pub use subscription_manager::SubscriptionManager;
16
17use crate::Error;
18use crate::protocol::sd::{
19    self, Entry, Flags, OptionsCount, SdEntries, SdOptions, ServiceEntry, TransportProtocol,
20};
21use std::net::{IpAddr, Ipv4Addr, SocketAddrV4};
22use std::sync::Arc;
23use std::sync::atomic::{AtomicU16, Ordering};
24use tokio::net::UdpSocket;
25use tokio::sync::RwLock;
26
/// Compute the SOME/IP header `length` field (payload + 8 bytes of header overhead).
///
/// # Panics
///
/// Panics if `payload_len + 8` does not fit in a `u32`. The addition itself is
/// checked, so a `payload_len` close to `usize::MAX` produces the same
/// descriptive panic instead of wrapping around to a small, bogus length.
pub(crate) fn someip_length(payload_len: usize) -> u32 {
    const HEADER_OVERHEAD: usize = 8;
    payload_len
        .checked_add(HEADER_OVERHEAD)
        .and_then(|total| u32::try_from(total).ok())
        .expect("SOME/IP payload too large: length exceeds u32::MAX")
}
35
/// Configuration for a SOME/IP service provider.
///
/// The type is plain data: it can be cloned, printed and compared for equality.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerConfig {
    /// Local interface IP address
    pub interface: Ipv4Addr,
    /// Port to bind for receiving subscriptions and requests
    pub local_port: u16,
    /// Service ID being offered
    pub service_id: u16,
    /// Instance ID
    pub instance_id: u16,
    /// Major version
    pub major_version: u8,
    /// Minor version
    pub minor_version: u32,
    /// Service Discovery TTL (time to live), in seconds
    pub ttl: u32,
}

impl ServerConfig {
    /// Create a new server configuration.
    ///
    /// The caller supplies the interface, port, service ID and instance ID.
    /// The remaining fields get defaults: major version `1`, minor version `0`
    /// and a TTL of `3` seconds. All fields are public and can be adjusted
    /// afterwards.
    #[must_use]
    pub fn new(interface: Ipv4Addr, local_port: u16, service_id: u16, instance_id: u16) -> Self {
        Self {
            interface,
            local_port,
            service_id,
            instance_id,
            major_version: 1,
            minor_version: 0,
            ttl: 3, // 3 seconds is typical for SOME/IP
        }
    }
}
70
71/// SOME/IP Server that can offer services and publish events
72pub struct Server<
73    const MAX_ENTRIES: usize = { sd::MAX_SD_ENTRIES },
74    const MAX_OPTIONS: usize = { sd::MAX_SD_OPTIONS },
75> {
76    config: ServerConfig,
77    /// Socket for receiving subscription requests
78    unicast_socket: Arc<UdpSocket>,
79    /// Socket for sending SD announcements
80    sd_socket: Arc<UdpSocket>,
81    /// Subscription manager
82    subscriptions: Arc<RwLock<SubscriptionManager>>,
83    /// Event publisher
84    publisher: Arc<EventPublisher>,
85    /// Incrementing session ID for SD messages
86    sd_session_id: Arc<AtomicU16>,
87}
88
89impl<const E: usize, const O: usize> Server<E, O> {
90    /// Create a new SOME/IP server
91    pub async fn new(config: ServerConfig) -> Result<Self, Error> {
92        // Bind unicast socket for receiving subscriptions
93        let unicast_addr = SocketAddrV4::new(config.interface, config.local_port);
94        let unicast_socket = Arc::new(UdpSocket::bind(unicast_addr).await?);
95        tracing::info!(
96            "Server bound to {} for service 0x{:04X}",
97            unicast_addr,
98            config.service_id
99        );
100
101        // Bind SD socket for sending/receiving SD messages (must use SD port 30490)
102        let expected_sd_port = crate::SD_MULTICAST_PORT;
103        let sd_bind_addr =
104            std::net::SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), expected_sd_port);
105        let sd_raw_socket = socket2::Socket::new(
106            socket2::Domain::IPV4,
107            socket2::Type::DGRAM,
108            Some(socket2::Protocol::UDP),
109        )?;
110        sd_raw_socket.set_reuse_address(true)?;
111        sd_raw_socket.bind(&sd_bind_addr.into())?;
112        sd_raw_socket.set_nonblocking(true)?;
113        let sd_std_socket: std::net::UdpSocket = sd_raw_socket.into();
114        let sd_socket = UdpSocket::from_std(sd_std_socket)?;
115
116        // Join SD multicast group to receive FindService and SubscribeEventGroup
117        sd_socket.join_multicast_v4(crate::SD_MULTICAST_IP, config.interface)?;
118        let actual_sd_addr = sd_socket.local_addr()?;
119        tracing::info!(
120            "Server SD socket bound to {} (expected port {}), joined multicast {}",
121            actual_sd_addr,
122            expected_sd_port,
123            crate::SD_MULTICAST_IP
124        );
125        if let std::net::SocketAddr::V4(v4) = actual_sd_addr
126            && v4.port() != expected_sd_port
127        {
128            tracing::error!(
129                "SD socket port mismatch! Expected {}, got {}. Offers will use wrong source port.",
130                expected_sd_port,
131                v4.port()
132            );
133        }
134
135        let subscriptions = Arc::new(RwLock::new(SubscriptionManager::new()));
136        let publisher = Arc::new(EventPublisher::new(
137            Arc::clone(&subscriptions),
138            Arc::clone(&unicast_socket),
139        ));
140
141        Ok(Self {
142            config,
143            unicast_socket,
144            sd_socket: Arc::new(sd_socket),
145            subscriptions,
146            publisher,
147            sd_session_id: Arc::new(AtomicU16::new(1)),
148        })
149    }
150
151    /// Start announcing the service via Service Discovery
152    ///
153    /// This sends periodic `OfferService` messages to the SD multicast group
154    pub fn start_announcing(&self) -> Result<(), Error> {
155        let config = self.config.clone();
156        let sd_socket = Arc::clone(&self.sd_socket);
157        let sd_session_id = Arc::clone(&self.sd_session_id);
158
159        tokio::spawn(async move {
160            let mut announcement_count = 0u32;
161            loop {
162                match Self::send_offer_service(&config, &sd_socket, &sd_session_id).await {
163                    Ok(()) => {
164                        announcement_count += 1;
165                        if announcement_count == 1 {
166                            tracing::info!(
167                                "Sent first SD announcement for service 0x{:04X}",
168                                config.service_id
169                            );
170                        } else {
171                            tracing::debug!(
172                                "Sent {} SD announcements for service 0x{:04X}",
173                                announcement_count,
174                                config.service_id
175                            );
176                        }
177                    }
178                    Err(e) => {
179                        tracing::error!("Failed to send OfferService: {:?}", e);
180                    }
181                }
182
183                // Send announcements every 1 second
184                tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
185            }
186        });
187
188        Ok(())
189    }
190
191    /// Send an `OfferService` message via Service Discovery
192    async fn send_offer_service(
193        config: &ServerConfig,
194        socket: &UdpSocket,
195        session_id: &AtomicU16,
196    ) -> Result<(), Error> {
197        use crate::protocol::{
198            Header as SomeIpHeader, MessageId, MessageType, MessageTypeField, ReturnCode,
199        };
200        use crate::traits::WireFormat;
201
202        // Create OfferService entry
203        let entry = Entry::OfferService(ServiceEntry {
204            index_first_options_run: 0,
205            index_second_options_run: 0,
206            options_count: OptionsCount::new(1, 0),
207            service_id: config.service_id,
208            instance_id: config.instance_id,
209            major_version: config.major_version,
210            ttl: config.ttl,
211            minor_version: config.minor_version,
212        });
213
214        // Create IPv4 endpoint option
215        let option = sd::Options::IpV4Endpoint {
216            ip: config.interface,
217            port: config.local_port,
218            protocol: TransportProtocol::Udp,
219        };
220
221        // Create SD header with reboot flag set
222        let mut entries = SdEntries::<E>::new();
223        let mut options = SdOptions::<O>::new();
224        entries
225            .push(entry)
226            .expect("SdEntries capacity E must be at least 1 to send OfferService");
227        options
228            .push(option)
229            .expect("SdOptions capacity O must be at least 1 to send OfferService");
230        let sd_payload = sd::Header::<E, O> {
231            flags: Flags::new(true, true),
232            entries,
233            options,
234        };
235
236        // Encode SD payload
237        let mut sd_data = Vec::new();
238        sd_payload.encode(&mut sd_data)?;
239
240        // Increment session ID (wrapping from 0xFFFF back to 0x0001, skipping 0)
241        let prev = session_id
242            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
243                let next = v.wrapping_add(1);
244                Some(if next == 0 { 1 } else { next })
245            })
246            .unwrap();
247        let next = prev.wrapping_add(1);
248        let sid = u32::from(if next == 0 { 1 } else { next });
249
250        // Wrap in SOME/IP header for SD (service 0xFFFF, method 0x8100)
251        let someip_header = SomeIpHeader {
252            message_id: MessageId::SD,
253            length: someip_length(sd_data.len()),
254            request_id: sid,
255            protocol_version: 0x01,
256            interface_version: 0x01,
257            message_type: MessageTypeField::new(MessageType::Notification, false),
258            return_code: ReturnCode::Ok,
259        };
260
261        // Encode complete SOME/IP-SD message
262        let mut buffer = Vec::new();
263        someip_header.encode(&mut buffer)?;
264        buffer.extend_from_slice(&sd_data);
265
266        let multicast_addr = SocketAddrV4::new(crate::SD_MULTICAST_IP, crate::SD_MULTICAST_PORT);
267
268        tracing::trace!(
269            "Sending OfferService: service=0x{:04X}, instance={}, port={}, size={} bytes",
270            config.service_id,
271            config.instance_id,
272            config.local_port,
273            buffer.len()
274        );
275        tracing::trace!(
276            "OfferService data: {:02X?}",
277            &buffer[..buffer.len().min(64)]
278        );
279
280        socket.send_to(&buffer, multicast_addr).await?;
281        tracing::trace!("Sent to {}", multicast_addr);
282
283        Ok(())
284    }
285
286    /// Send a unicast `OfferService` to a specific address (in response to `FindService`)
287    async fn send_unicast_offer(&self, target: std::net::SocketAddr) -> Result<(), Error> {
288        use crate::protocol::{
289            Header as SomeIpHeader, MessageId, MessageType, MessageTypeField, ReturnCode,
290        };
291        use crate::traits::WireFormat;
292
293        let entry = Entry::OfferService(ServiceEntry {
294            index_first_options_run: 0,
295            index_second_options_run: 0,
296            options_count: OptionsCount::new(1, 0),
297            service_id: self.config.service_id,
298            instance_id: self.config.instance_id,
299            major_version: self.config.major_version,
300            ttl: self.config.ttl,
301            minor_version: self.config.minor_version,
302        });
303
304        let option = sd::Options::IpV4Endpoint {
305            ip: self.config.interface,
306            port: self.config.local_port,
307            protocol: TransportProtocol::Udp,
308        };
309
310        let mut entries = SdEntries::<E>::new();
311        let mut options = SdOptions::<O>::new();
312        entries
313            .push(entry)
314            .expect("SdEntries capacity E must be at least 1 for unicast offers");
315        options
316            .push(option)
317            .expect("SdOptions capacity O must be at least 1 for unicast offers");
318        let sd_payload = sd::Header::<E, O> {
319            flags: Flags::new(true, true), // reboot + unicast flags set
320            entries,
321            options,
322        };
323
324        let mut sd_data = Vec::new();
325        sd_payload.encode(&mut sd_data)?;
326
327        let sid = self.next_sd_session_id();
328        let someip_header = SomeIpHeader {
329            message_id: MessageId::SD,
330            length: someip_length(sd_data.len()),
331            request_id: sid,
332            protocol_version: 0x01,
333            interface_version: 0x01,
334            message_type: MessageTypeField::new(MessageType::Notification, false),
335            return_code: ReturnCode::Ok,
336        };
337
338        let mut buffer = Vec::new();
339        someip_header.encode(&mut buffer)?;
340        buffer.extend_from_slice(&sd_data);
341
342        self.sd_socket.send_to(&buffer, target).await?;
343        tracing::debug!(
344            "Sent unicast OfferService to {} for service 0x{:04X}",
345            target,
346            self.config.service_id
347        );
348
349        Ok(())
350    }
351
352    /// Get the next SD session ID (`client_id=0`, `session_id` incrementing), skipping 0
353    fn next_sd_session_id(&self) -> u32 {
354        let prev = self
355            .sd_session_id
356            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
357                let next = v.wrapping_add(1);
358                Some(if next == 0 { 1 } else { next })
359            })
360            .unwrap();
361        // fetch_update returns the previous value; compute the same next value
362        let next = prev.wrapping_add(1);
363        u32::from(if next == 0 { 1 } else { next })
364    }
365
366    /// Get the event publisher for sending events
367    #[must_use]
368    pub fn publisher(&self) -> Arc<EventPublisher> {
369        Arc::clone(&self.publisher)
370    }
371
372    /// Run the server event loop
373    ///
374    /// Handles incoming subscription requests and manages event groups.
375    /// Listens on both the unicast socket (for direct requests) and the
376    /// SD multicast socket (for `FindService` and `SubscribeEventGroup`).
377    pub async fn run(&mut self) -> Result<(), Error> {
378        use crate::protocol::Header as SomeIpHeader;
379        use crate::traits::WireFormat;
380
381        let mut unicast_buf = vec![0u8; 65535];
382        let mut sd_buf = vec![0u8; 65535];
383
384        loop {
385            let (data, len, addr, source) = tokio::select! {
386                result = self.unicast_socket.recv_from(&mut unicast_buf) => {
387                    let (len, addr) = result?;
388                    (&unicast_buf[..], len, addr, "unicast")
389                }
390                result = self.sd_socket.recv_from(&mut sd_buf) => {
391                    let (len, addr) = result?;
392                    (&sd_buf[..], len, addr, "sd-multicast")
393                }
394            };
395            let data = &data[..len];
396
397            // Skip our own multicast messages
398            if let std::net::SocketAddr::V4(v4) = addr
399                && *v4.ip() == self.config.interface
400                && source == "sd-multicast"
401            {
402                tracing::trace!("Ignoring our own SD multicast message");
403                continue;
404            }
405
406            tracing::trace!("Received {} bytes from {} on {} socket", len, addr, source);
407            tracing::trace!("Raw data: {:02X?}", &data[..len.min(64)]);
408
409            // Try to parse as SOME/IP message
410            let mut reader = data;
411            match SomeIpHeader::decode(&mut reader) {
412                Ok(header) => {
413                    tracing::trace!(
414                        "SOME/IP Header: service=0x{:04X}, method=0x{:04X}, type={:?}",
415                        header.message_id.service_id(),
416                        header.message_id.method_id(),
417                        header.message_type.message_type()
418                    );
419
420                    // Check if this is a Service Discovery message (0xFFFF8100)
421                    if header.message_id.service_id() == 0xFFFF
422                        && header.message_id.method_id() == 0x8100
423                    {
424                        tracing::trace!("This is an SD message");
425                        // Parse SD payload
426                        match sd::Header::<E, O>::decode(&mut reader) {
427                            Ok(sd_msg) => {
428                                tracing::trace!(
429                                    "SD message has {} entries, {} options",
430                                    sd_msg.entries.len(),
431                                    sd_msg.options.len()
432                                );
433                                self.handle_sd_message(sd_msg, addr).await?;
434                            }
435                            Err(e) => {
436                                tracing::warn!("Failed to parse SD message: {:?}", e);
437                            }
438                        }
439                    } else {
440                        tracing::trace!("Non-SD SOME/IP message, ignoring");
441                    }
442                }
443                Err(e) => {
444                    tracing::warn!("Failed to parse SOME/IP header from {}: {:?}", addr, e);
445                    tracing::trace!("Data: {:02X?}", &data[..len.min(32)]);
446                }
447            }
448        }
449    }
450
451    /// Handle a Service Discovery message
452    async fn handle_sd_message(
453        &mut self,
454        sd_msg: sd::Header<E, O>,
455        sender: std::net::SocketAddr,
456    ) -> Result<(), Error> {
457        tracing::trace!("Handling SD message from {}", sender);
458
459        for entry in &sd_msg.entries {
460            match entry {
461                Entry::SubscribeEventGroup(sub) => {
462                    tracing::debug!(
463                        "Received Subscribe from {}: service=0x{:04X}, instance={}, eventgroup=0x{:04X}",
464                        sender,
465                        sub.service_id,
466                        sub.instance_id,
467                        sub.event_group_id
468                    );
469
470                    // Check if this is for our service
471                    if sub.service_id != self.config.service_id {
472                        tracing::warn!(
473                            "Subscribe for wrong service: expected 0x{:04X}, got 0x{:04X}",
474                            self.config.service_id,
475                            sub.service_id
476                        );
477                        self.send_subscribe_nack(sub, sender, "Wrong service ID")
478                            .await?;
479                    } else if sub.instance_id != self.config.instance_id {
480                        tracing::warn!(
481                            "Subscribe for wrong instance: expected {}, got {}",
482                            self.config.instance_id,
483                            sub.instance_id
484                        );
485                        self.send_subscribe_nack(sub, sender, "Wrong instance ID")
486                            .await?;
487                    } else {
488                        // Extract subscriber endpoint from options
489                        if let Some(endpoint_addr) = Self::extract_endpoint(&sd_msg.options) {
490                            // The endpoint in SubscribeEventGroup is the subscriber's
491                            // receive address — where they want events sent to.
492                            let mut subs = self.subscriptions.write().await;
493                            subs.subscribe(
494                                sub.service_id,
495                                sub.instance_id,
496                                sub.event_group_id,
497                                endpoint_addr,
498                            );
499
500                            // Send SubscribeAck
501                            self.send_subscribe_ack(sub, sender).await?;
502                        } else {
503                            tracing::warn!("No endpoint found in Subscribe message options");
504                            self.send_subscribe_nack(sub, sender, "No endpoint in options")
505                                .await?;
506                        }
507                    }
508                }
509                Entry::FindService(find) => {
510                    // Check if this FindService is for our service (or wildcard 0xFFFF)
511                    if find.service_id == self.config.service_id || find.service_id == 0xFFFF {
512                        tracing::debug!(
513                            "Received FindService from {} for service 0x{:04X} (ours: 0x{:04X}), sending unicast offer",
514                            sender,
515                            find.service_id,
516                            self.config.service_id
517                        );
518                        self.send_unicast_offer(sender).await?;
519                    } else {
520                        tracing::trace!(
521                            "Ignoring FindService for service 0x{:04X} (not ours)",
522                            find.service_id
523                        );
524                    }
525                }
526                _ => {
527                    tracing::trace!("Ignoring SD entry: {:?}", entry);
528                }
529            }
530        }
531
532        Ok(())
533    }
534
535    /// Extract endpoint address from SD options
536    fn extract_endpoint(options: &[sd::Options]) -> Option<SocketAddrV4> {
537        tracing::trace!("Extracting endpoint from {} options", options.len());
538        for option in options {
539            tracing::trace!("Option: {:?}", option);
540            if let sd::Options::IpV4Endpoint { ip, port, .. } = option {
541                tracing::trace!("Found IPv4 endpoint: {}:{}", ip, port);
542                return Some(SocketAddrV4::new(*ip, *port));
543            }
544        }
545        tracing::warn!("No IPv4 endpoint found in options");
546        None
547    }
548
549    /// Send `SubscribeAck` in response to a subscription request
550    async fn send_subscribe_ack(
551        &self,
552        subscription: &sd::EventGroupEntry,
553        subscriber: std::net::SocketAddr,
554    ) -> Result<(), Error> {
555        use crate::protocol::{
556            Header as SomeIpHeader, MessageId, MessageType, MessageTypeField, ReturnCode,
557        };
558        use crate::traits::WireFormat;
559
560        // Create SubscribeAck entry
561        let ack_entry = Entry::SubscribeAckEventGroup(sd::EventGroupEntry {
562            index_first_options_run: 0,
563            index_second_options_run: 0,
564            options_count: OptionsCount::new(0, 0),
565            service_id: subscription.service_id,
566            instance_id: subscription.instance_id,
567            major_version: subscription.major_version,
568            ttl: self.config.ttl,
569            counter: subscription.counter,
570            event_group_id: subscription.event_group_id,
571        });
572
573        // Create SD header
574        let mut entries = SdEntries::<E>::new();
575        entries
576            .push(ack_entry)
577            .expect("SdEntries capacity E must allow at least one entry for SubscribeAck");
578        let sd_payload = sd::Header::<E, O> {
579            flags: Flags::new(true, true), // reboot + unicast flags set
580            entries,
581            options: SdOptions::<O>::new(),
582        };
583
584        // Encode SD payload
585        let mut sd_data = Vec::new();
586        sd_payload.encode(&mut sd_data)?;
587
588        // Wrap in SOME/IP header
589        let sid = self.next_sd_session_id();
590        let someip_header = SomeIpHeader {
591            message_id: MessageId::SD,
592            length: someip_length(sd_data.len()),
593            request_id: sid,
594            protocol_version: 0x01,
595            interface_version: 0x01,
596            message_type: MessageTypeField::new(MessageType::Notification, false),
597            return_code: ReturnCode::Ok,
598        };
599
600        // Encode complete message
601        let mut buffer = Vec::new();
602        someip_header.encode(&mut buffer)?;
603        buffer.extend_from_slice(&sd_data);
604
605        // Send SubscribeAck to the subscriber
606        self.unicast_socket.send_to(&buffer, subscriber).await?;
607
608        tracing::debug!(
609            "Sent SubscribeAck to {} for service 0x{:04X}, eventgroup 0x{:04X}",
610            subscriber,
611            subscription.service_id,
612            subscription.event_group_id
613        );
614
615        Ok(())
616    }
617
618    /// Send `SubscribeNack` (Negative Acknowledgement) for rejected subscription
619    ///
620    /// According to SOME/IP spec, `SubscribeNack` is indicated by TTL=0 in `SubscribeAckEventGroup`
621    async fn send_subscribe_nack(
622        &self,
623        subscription: &sd::EventGroupEntry,
624        subscriber: std::net::SocketAddr,
625        reason: &str,
626    ) -> Result<(), Error> {
627        use crate::protocol::{
628            Header as SomeIpHeader, MessageId, MessageType, MessageTypeField, ReturnCode,
629        };
630        use crate::traits::WireFormat;
631
632        // Create SubscribeNack entry (SubscribeAck with TTL=0 indicates rejection)
633        let nack_entry = Entry::SubscribeAckEventGroup(sd::EventGroupEntry {
634            index_first_options_run: 0,
635            index_second_options_run: 0,
636            options_count: OptionsCount::new(0, 0),
637            service_id: subscription.service_id,
638            instance_id: subscription.instance_id,
639            major_version: subscription.major_version,
640            ttl: 0, // TTL=0 indicates NACK
641            counter: subscription.counter,
642            event_group_id: subscription.event_group_id,
643        });
644
645        // Create SD header
646        let mut entries = SdEntries::<E>::new();
647        entries.push(nack_entry).expect(
648            "SdEntries<E> must have capacity for at least one entry when sending SubscribeNack",
649        );
650        let sd_payload = sd::Header::<E, O> {
651            flags: Flags::new(true, true), // reboot + unicast flags set
652            entries,
653            options: SdOptions::<O>::new(),
654        };
655
656        // Encode SD payload
657        let mut sd_data = Vec::new();
658        sd_payload.encode(&mut sd_data)?;
659
660        // Wrap in SOME/IP header
661        let sid = self.next_sd_session_id();
662        let someip_header = SomeIpHeader {
663            message_id: MessageId::SD,
664            length: someip_length(sd_data.len()),
665            request_id: sid,
666            protocol_version: 0x01,
667            interface_version: 0x01,
668            message_type: MessageTypeField::new(MessageType::Notification, false),
669            return_code: ReturnCode::Ok,
670        };
671
672        // Encode complete message
673        let mut buffer = Vec::new();
674        someip_header.encode(&mut buffer)?;
675        buffer.extend_from_slice(&sd_data);
676
677        // Send SubscribeNack to the subscriber
678        self.unicast_socket.send_to(&buffer, subscriber).await?;
679
680        tracing::warn!(
681            "Sent SubscribeNack to {} for service 0x{:04X}, eventgroup 0x{:04X} (reason: {})",
682            subscriber,
683            subscription.service_id,
684            subscription.event_group_id,
685            reason
686        );
687
688        Ok(())
689    }
690}
691
692#[cfg(test)]
693mod tests {
694    use super::*;
695    use crate::protocol::{
696        Header as SomeIpHeader, MessageId, MessageType, MessageTypeField, ReturnCode,
697    };
698    use crate::traits::WireFormat;
699
700    /// All server tests bind the SD multicast port (30490), so they must run
701    /// serially to avoid `AddrInUse` failures.
702    static SD_PORT_LOCK: tokio::sync::Mutex<()> = tokio::sync::Mutex::const_new(());
703
704    #[tokio::test]
705    async fn test_server_creation() {
706        let _lock = SD_PORT_LOCK.lock().await;
707        let config = ServerConfig::new(Ipv4Addr::new(127, 0, 0, 1), 30682, 0x5B, 1);
708
709        let server: Result<Server, _> = Server::new(config).await;
710        assert!(server.is_ok());
711    }
712
713    /// Helper: wrap an SD header in a SOME/IP SD message and return the bytes
714    fn build_sd_message(sd_header: &sd::Header) -> Vec<u8> {
715        let mut sd_data = Vec::new();
716        sd_header.encode(&mut sd_data).unwrap();
717
718        let someip_header = SomeIpHeader {
719            message_id: MessageId::SD,
720            length: someip_length(sd_data.len()),
721            request_id: 0x0001,
722            protocol_version: 0x01,
723            interface_version: 0x01,
724            message_type: MessageTypeField::new(MessageType::Notification, false),
725            return_code: ReturnCode::Ok,
726        };
727
728        let mut buffer = Vec::new();
729        someip_header.encode(&mut buffer).unwrap();
730        buffer.extend_from_slice(&sd_data);
731        buffer
732    }
733
734    /// Helper: parse a SubscribeAck/Nack from raw response bytes, returns the TTL
735    fn parse_subscribe_ack_ttl(data: &[u8]) -> u32 {
736        let mut reader = data;
737        let _header = SomeIpHeader::decode(&mut reader).expect("Failed to parse SOME/IP header");
738        let sd_msg: sd::Header =
739            sd::Header::decode(&mut reader).expect("Failed to parse SD header");
740        assert_eq!(
741            sd_msg.entries.len(),
742            1,
743            "Expected exactly 1 entry in response"
744        );
745        match &sd_msg.entries[0] {
746            sd::Entry::SubscribeAckEventGroup(entry) => entry.ttl,
747            other => panic!("Expected SubscribeAckEventGroup, got {:?}", other),
748        }
749    }
750
751    /// Helper: create a server on an ephemeral port and return (Server, port)
752    async fn create_test_server(service_id: u16, instance_id: u16) -> (Server, u16) {
753        // Use port 0 to get an ephemeral port
754        let config = ServerConfig::new(Ipv4Addr::new(127, 0, 0, 1), 0, service_id, instance_id);
755        let mut server: Server = Server::new(config).await.expect("Failed to create server");
756        let local_addr = server.unicast_socket.local_addr().unwrap();
757        let port = match local_addr {
758            std::net::SocketAddr::V4(addr) => addr.port(),
759            _ => panic!("Expected IPv4 address"),
760        };
761        // Update config to reflect actual bound port
762        server.config.local_port = port;
763        (server, port)
764    }
765
766    #[tokio::test]
767    async fn test_subscribe_ack_success() {
768        let _lock = SD_PORT_LOCK.lock().await;
769        let (mut server, server_port) = create_test_server(0x5B, 1).await;
770
771        // Create a client socket to send subscription and receive response
772        let client_socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
773        let _client_addr = client_socket.local_addr().unwrap();
774
775        // Build a SubscribeEventGroup message with the correct port
776        let sd_header = sd::Header::new_subscription(
777            0x5B,
778            1,
779            1,
780            3,
781            0x01,
782            Ipv4Addr::new(127, 0, 0, 1),
783            sd::TransportProtocol::Udp,
784            server_port, // Correct port
785        );
786        let message = build_sd_message(&sd_header);
787
788        // Send to the server
789        client_socket
790            .send_to(&message, format!("127.0.0.1:{}", server_port))
791            .await
792            .unwrap();
793
794        // Run server to process one message (with a timeout)
795        let server_handle = tokio::spawn(async move {
796            // We'll manually process one iteration instead of calling run() which loops forever
797            let mut buf = vec![0u8; 65535];
798            let (len, addr) = server.unicast_socket.recv_from(&mut buf).await.unwrap();
799            let data = &buf[..len];
800            let mut reader: &[u8] = data;
801            let header = SomeIpHeader::decode(&mut reader).unwrap();
802            assert_eq!(header.message_id.service_id(), 0xFFFF);
803            let sd_msg: sd::Header = sd::Header::decode(&mut reader).unwrap();
804            server.handle_sd_message(sd_msg, addr).await.unwrap();
805
806            // Check subscription was added
807            let subs = server.subscriptions.read().await;
808            assert_eq!(subs.subscription_count(), 1);
809            let subscribers = subs.get_subscribers(0x5B, 1, 0x01);
810            assert_eq!(subscribers.len(), 1);
811        });
812
813        // Receive the ACK response
814        let mut resp_buf = vec![0u8; 65535];
815        let (resp_len, _) = tokio::time::timeout(
816            std::time::Duration::from_secs(2),
817            client_socket.recv_from(&mut resp_buf),
818        )
819        .await
820        .expect("Timeout waiting for SubscribeAck")
821        .unwrap();
822
823        let ttl = parse_subscribe_ack_ttl(&resp_buf[..resp_len]);
824        assert!(ttl > 0, "Expected ACK (TTL > 0), got TTL={}", ttl);
825
826        server_handle.await.unwrap();
827    }
828
829    #[tokio::test]
830    async fn test_subscribe_nack_wrong_service() {
831        let _lock = SD_PORT_LOCK.lock().await;
832        let (mut server, server_port) = create_test_server(0x5B, 1).await;
833        let client_socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
834
835        // Subscribe with wrong service ID (0x99 instead of 0x5B)
836        let sd_header = sd::Header::new_subscription(
837            0x99, // Wrong service
838            1,
839            1,
840            3,
841            0x01,
842            Ipv4Addr::new(127, 0, 0, 1),
843            sd::TransportProtocol::Udp,
844            server_port,
845        );
846        let message = build_sd_message(&sd_header);
847        client_socket
848            .send_to(&message, format!("127.0.0.1:{}", server_port))
849            .await
850            .unwrap();
851
852        // Process the message
853        let server_handle = tokio::spawn(async move {
854            let mut buf = vec![0u8; 65535];
855            let (len, addr) = server.unicast_socket.recv_from(&mut buf).await.unwrap();
856            let mut reader: &[u8] = &buf[..len];
857            let _header = SomeIpHeader::decode(&mut reader).unwrap();
858            let sd_msg: sd::Header = sd::Header::decode(&mut reader).unwrap();
859            server.handle_sd_message(sd_msg, addr).await.unwrap();
860
861            // No subscription should have been added
862            let subs = server.subscriptions.read().await;
863            assert_eq!(subs.subscription_count(), 0);
864        });
865
866        // Receive the NACK response
867        let mut resp_buf = vec![0u8; 65535];
868        let (resp_len, _) = tokio::time::timeout(
869            std::time::Duration::from_secs(2),
870            client_socket.recv_from(&mut resp_buf),
871        )
872        .await
873        .expect("Timeout waiting for SubscribeNack")
874        .unwrap();
875
876        let ttl = parse_subscribe_ack_ttl(&resp_buf[..resp_len]);
877        assert_eq!(ttl, 0, "Expected NACK (TTL=0), got TTL={}", ttl);
878
879        server_handle.await.unwrap();
880    }
881
882    #[tokio::test]
883    async fn test_subscribe_nack_wrong_instance() {
884        let _lock = SD_PORT_LOCK.lock().await;
885        let (mut server, server_port) = create_test_server(0x5B, 1).await;
886        let client_socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
887
888        // Subscribe with wrong instance ID (99 instead of 1)
889        let sd_header = sd::Header::new_subscription(
890            0x5B,
891            99, // Wrong instance
892            1,
893            3,
894            0x01,
895            Ipv4Addr::new(127, 0, 0, 1),
896            sd::TransportProtocol::Udp,
897            server_port,
898        );
899        let message = build_sd_message(&sd_header);
900        client_socket
901            .send_to(&message, format!("127.0.0.1:{}", server_port))
902            .await
903            .unwrap();
904
905        let server_handle = tokio::spawn(async move {
906            let mut buf = vec![0u8; 65535];
907            let (len, addr) = server.unicast_socket.recv_from(&mut buf).await.unwrap();
908            let mut reader: &[u8] = &buf[..len];
909            let _header = SomeIpHeader::decode(&mut reader).unwrap();
910            let sd_msg: sd::Header = sd::Header::decode(&mut reader).unwrap();
911            server.handle_sd_message(sd_msg, addr).await.unwrap();
912
913            let subs = server.subscriptions.read().await;
914            assert_eq!(subs.subscription_count(), 0);
915        });
916
917        let mut resp_buf = vec![0u8; 65535];
918        let (resp_len, _) = tokio::time::timeout(
919            std::time::Duration::from_secs(2),
920            client_socket.recv_from(&mut resp_buf),
921        )
922        .await
923        .expect("Timeout waiting for SubscribeNack")
924        .unwrap();
925
926        let ttl = parse_subscribe_ack_ttl(&resp_buf[..resp_len]);
927        assert_eq!(ttl, 0, "Expected NACK (TTL=0), got TTL={}", ttl);
928
929        server_handle.await.unwrap();
930    }
931
932    #[tokio::test]
933    async fn test_subscribe_ack_different_endpoint_port() {
934        let _lock = SD_PORT_LOCK.lock().await;
935        let (mut server, server_port) = create_test_server(0x5B, 1).await;
936        let client_socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
937
938        // Subscribe with a different endpoint port (subscriber's own receive port)
939        // This should succeed — the endpoint port is where the subscriber wants events sent
940        let sd_header = sd::Header::new_subscription(
941            0x5B,
942            1,
943            1,
944            3,
945            0x01,
946            Ipv4Addr::new(127, 0, 0, 1),
947            sd::TransportProtocol::Udp,
948            server_port.wrapping_add(1), // Subscriber's port, different from server
949        );
950        let message = build_sd_message(&sd_header);
951        client_socket
952            .send_to(&message, format!("127.0.0.1:{}", server_port))
953            .await
954            .unwrap();
955
956        let server_handle = tokio::spawn(async move {
957            let mut buf = vec![0u8; 65535];
958            let (len, addr) = server.unicast_socket.recv_from(&mut buf).await.unwrap();
959            let mut reader: &[u8] = &buf[..len];
960            let _header = SomeIpHeader::decode(&mut reader).unwrap();
961            let sd_msg: sd::Header = sd::Header::decode(&mut reader).unwrap();
962            server.handle_sd_message(sd_msg, addr).await.unwrap();
963
964            // Subscription should have been added
965            let subs = server.subscriptions.read().await;
966            assert_eq!(subs.subscription_count(), 1);
967        });
968
969        let mut resp_buf = vec![0u8; 65535];
970        let (resp_len, _) = tokio::time::timeout(
971            std::time::Duration::from_secs(2),
972            client_socket.recv_from(&mut resp_buf),
973        )
974        .await
975        .expect("Timeout waiting for SubscribeAck")
976        .unwrap();
977
978        let ttl = parse_subscribe_ack_ttl(&resp_buf[..resp_len]);
979        assert!(ttl > 0, "Expected ACK (TTL > 0), got TTL={}", ttl);
980
981        server_handle.await.unwrap();
982    }
983}