1mod event_publisher;
10mod service_info;
11mod subscription_manager;
12
13pub use event_publisher::EventPublisher;
14pub use service_info::{EventGroupInfo, ServiceInfo};
15pub use subscription_manager::SubscriptionManager;
16
17use crate::Error;
18use crate::protocol::sd::{
19 self, Entry, Flags, OptionsCount, SdEntries, SdOptions, ServiceEntry, TransportProtocol,
20};
21use std::net::{IpAddr, Ipv4Addr, SocketAddrV4};
22use std::sync::Arc;
23use std::sync::atomic::{AtomicU16, Ordering};
24use tokio::net::UdpSocket;
25use tokio::sync::RwLock;
26
/// Computes the value of the SOME/IP header `length` field for a payload of
/// `payload_len` bytes.
///
/// The field covers the payload plus the 8 header bytes that follow the
/// length field itself (request ID, protocol version, interface version,
/// message type and return code).
///
/// # Panics
///
/// Panics if the resulting length does not fit into a `u32`.
pub(crate) fn someip_length(payload_len: usize) -> u32 {
    // Header bytes counted by the length field in addition to the payload.
    const HEADER_OVERHEAD: usize = 8;

    let length_field = HEADER_OVERHEAD + payload_len;
    length_field
        .try_into()
        .expect("SOME/IP payload too large: length exceeds u32::MAX")
}
35
/// Configuration of the single service instance offered by a [`Server`].
#[derive(Clone, Debug)]
pub struct ServerConfig {
    /// Local IPv4 address: the unicast socket binds to it, the SD multicast
    /// group is joined on it, and it is advertised as the endpoint IP.
    pub interface: Ipv4Addr,
    /// UDP port of the unicast socket; also advertised in the endpoint option.
    pub local_port: u16,
    /// SOME/IP service ID being offered.
    pub service_id: u16,
    /// Instance ID of the offered service.
    pub instance_id: u16,
    /// Major interface version placed in OfferService entries.
    pub major_version: u8,
    /// Minor interface version placed in OfferService entries.
    pub minor_version: u32,
    /// TTL placed in OfferService entries (presumably seconds, as in
    /// SOME/IP-SD — confirm against the wire encoding).
    pub ttl: u32,
}
54
55impl ServerConfig {
56 #[must_use]
58 pub fn new(interface: Ipv4Addr, local_port: u16, service_id: u16, instance_id: u16) -> Self {
59 Self {
60 interface,
61 local_port,
62 service_id,
63 instance_id,
64 major_version: 1,
65 minor_version: 0,
66 ttl: 3, }
68 }
69}
70
71pub struct Server<
73 const MAX_ENTRIES: usize = { sd::MAX_SD_ENTRIES },
74 const MAX_OPTIONS: usize = { sd::MAX_SD_OPTIONS },
75> {
76 config: ServerConfig,
77 unicast_socket: Arc<UdpSocket>,
79 sd_socket: Arc<UdpSocket>,
81 subscriptions: Arc<RwLock<SubscriptionManager>>,
83 publisher: Arc<EventPublisher>,
85 sd_session_id: Arc<AtomicU16>,
87}
88
89impl<const E: usize, const O: usize> Server<E, O> {
90 pub async fn new(config: ServerConfig) -> Result<Self, Error> {
92 let unicast_addr = SocketAddrV4::new(config.interface, config.local_port);
94 let unicast_socket = Arc::new(UdpSocket::bind(unicast_addr).await?);
95 tracing::info!(
96 "Server bound to {} for service 0x{:04X}",
97 unicast_addr,
98 config.service_id
99 );
100
101 let expected_sd_port = crate::SD_MULTICAST_PORT;
103 let sd_bind_addr =
104 std::net::SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), expected_sd_port);
105 let sd_raw_socket = socket2::Socket::new(
106 socket2::Domain::IPV4,
107 socket2::Type::DGRAM,
108 Some(socket2::Protocol::UDP),
109 )?;
110 sd_raw_socket.set_reuse_address(true)?;
111 sd_raw_socket.bind(&sd_bind_addr.into())?;
112 sd_raw_socket.set_nonblocking(true)?;
113 let sd_std_socket: std::net::UdpSocket = sd_raw_socket.into();
114 let sd_socket = UdpSocket::from_std(sd_std_socket)?;
115
116 sd_socket.join_multicast_v4(crate::SD_MULTICAST_IP, config.interface)?;
118 let actual_sd_addr = sd_socket.local_addr()?;
119 tracing::info!(
120 "Server SD socket bound to {} (expected port {}), joined multicast {}",
121 actual_sd_addr,
122 expected_sd_port,
123 crate::SD_MULTICAST_IP
124 );
125 if let std::net::SocketAddr::V4(v4) = actual_sd_addr
126 && v4.port() != expected_sd_port
127 {
128 tracing::error!(
129 "SD socket port mismatch! Expected {}, got {}. Offers will use wrong source port.",
130 expected_sd_port,
131 v4.port()
132 );
133 }
134
135 let subscriptions = Arc::new(RwLock::new(SubscriptionManager::new()));
136 let publisher = Arc::new(EventPublisher::new(
137 Arc::clone(&subscriptions),
138 Arc::clone(&unicast_socket),
139 ));
140
141 Ok(Self {
142 config,
143 unicast_socket,
144 sd_socket: Arc::new(sd_socket),
145 subscriptions,
146 publisher,
147 sd_session_id: Arc::new(AtomicU16::new(1)),
148 })
149 }
150
151 pub fn start_announcing(&self) -> Result<(), Error> {
155 let config = self.config.clone();
156 let sd_socket = Arc::clone(&self.sd_socket);
157 let sd_session_id = Arc::clone(&self.sd_session_id);
158
159 tokio::spawn(async move {
160 let mut announcement_count = 0u32;
161 loop {
162 match Self::send_offer_service(&config, &sd_socket, &sd_session_id).await {
163 Ok(()) => {
164 announcement_count += 1;
165 if announcement_count == 1 {
166 tracing::info!(
167 "Sent first SD announcement for service 0x{:04X}",
168 config.service_id
169 );
170 } else {
171 tracing::debug!(
172 "Sent {} SD announcements for service 0x{:04X}",
173 announcement_count,
174 config.service_id
175 );
176 }
177 }
178 Err(e) => {
179 tracing::error!("Failed to send OfferService: {:?}", e);
180 }
181 }
182
183 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
185 }
186 });
187
188 Ok(())
189 }
190
191 async fn send_offer_service(
193 config: &ServerConfig,
194 socket: &UdpSocket,
195 session_id: &AtomicU16,
196 ) -> Result<(), Error> {
197 use crate::protocol::{
198 Header as SomeIpHeader, MessageId, MessageType, MessageTypeField, ReturnCode,
199 };
200 use crate::traits::WireFormat;
201
202 let entry = Entry::OfferService(ServiceEntry {
204 index_first_options_run: 0,
205 index_second_options_run: 0,
206 options_count: OptionsCount::new(1, 0),
207 service_id: config.service_id,
208 instance_id: config.instance_id,
209 major_version: config.major_version,
210 ttl: config.ttl,
211 minor_version: config.minor_version,
212 });
213
214 let option = sd::Options::IpV4Endpoint {
216 ip: config.interface,
217 port: config.local_port,
218 protocol: TransportProtocol::Udp,
219 };
220
221 let mut entries = SdEntries::<E>::new();
223 let mut options = SdOptions::<O>::new();
224 entries
225 .push(entry)
226 .expect("SdEntries capacity E must be at least 1 to send OfferService");
227 options
228 .push(option)
229 .expect("SdOptions capacity O must be at least 1 to send OfferService");
230 let sd_payload = sd::Header::<E, O> {
231 flags: Flags::new(true, true),
232 entries,
233 options,
234 };
235
236 let mut sd_data = Vec::new();
238 sd_payload.encode(&mut sd_data)?;
239
240 let prev = session_id
242 .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
243 let next = v.wrapping_add(1);
244 Some(if next == 0 { 1 } else { next })
245 })
246 .unwrap();
247 let next = prev.wrapping_add(1);
248 let sid = u32::from(if next == 0 { 1 } else { next });
249
250 let someip_header = SomeIpHeader {
252 message_id: MessageId::SD,
253 length: someip_length(sd_data.len()),
254 request_id: sid,
255 protocol_version: 0x01,
256 interface_version: 0x01,
257 message_type: MessageTypeField::new(MessageType::Notification, false),
258 return_code: ReturnCode::Ok,
259 };
260
261 let mut buffer = Vec::new();
263 someip_header.encode(&mut buffer)?;
264 buffer.extend_from_slice(&sd_data);
265
266 let multicast_addr = SocketAddrV4::new(crate::SD_MULTICAST_IP, crate::SD_MULTICAST_PORT);
267
268 tracing::trace!(
269 "Sending OfferService: service=0x{:04X}, instance={}, port={}, size={} bytes",
270 config.service_id,
271 config.instance_id,
272 config.local_port,
273 buffer.len()
274 );
275 tracing::trace!(
276 "OfferService data: {:02X?}",
277 &buffer[..buffer.len().min(64)]
278 );
279
280 socket.send_to(&buffer, multicast_addr).await?;
281 tracing::trace!("Sent to {}", multicast_addr);
282
283 Ok(())
284 }
285
286 async fn send_unicast_offer(&self, target: std::net::SocketAddr) -> Result<(), Error> {
288 use crate::protocol::{
289 Header as SomeIpHeader, MessageId, MessageType, MessageTypeField, ReturnCode,
290 };
291 use crate::traits::WireFormat;
292
293 let entry = Entry::OfferService(ServiceEntry {
294 index_first_options_run: 0,
295 index_second_options_run: 0,
296 options_count: OptionsCount::new(1, 0),
297 service_id: self.config.service_id,
298 instance_id: self.config.instance_id,
299 major_version: self.config.major_version,
300 ttl: self.config.ttl,
301 minor_version: self.config.minor_version,
302 });
303
304 let option = sd::Options::IpV4Endpoint {
305 ip: self.config.interface,
306 port: self.config.local_port,
307 protocol: TransportProtocol::Udp,
308 };
309
310 let mut entries = SdEntries::<E>::new();
311 let mut options = SdOptions::<O>::new();
312 entries
313 .push(entry)
314 .expect("SdEntries capacity E must be at least 1 for unicast offers");
315 options
316 .push(option)
317 .expect("SdOptions capacity O must be at least 1 for unicast offers");
318 let sd_payload = sd::Header::<E, O> {
319 flags: Flags::new(true, true), entries,
321 options,
322 };
323
324 let mut sd_data = Vec::new();
325 sd_payload.encode(&mut sd_data)?;
326
327 let sid = self.next_sd_session_id();
328 let someip_header = SomeIpHeader {
329 message_id: MessageId::SD,
330 length: someip_length(sd_data.len()),
331 request_id: sid,
332 protocol_version: 0x01,
333 interface_version: 0x01,
334 message_type: MessageTypeField::new(MessageType::Notification, false),
335 return_code: ReturnCode::Ok,
336 };
337
338 let mut buffer = Vec::new();
339 someip_header.encode(&mut buffer)?;
340 buffer.extend_from_slice(&sd_data);
341
342 self.sd_socket.send_to(&buffer, target).await?;
343 tracing::debug!(
344 "Sent unicast OfferService to {} for service 0x{:04X}",
345 target,
346 self.config.service_id
347 );
348
349 Ok(())
350 }
351
352 fn next_sd_session_id(&self) -> u32 {
354 let prev = self
355 .sd_session_id
356 .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
357 let next = v.wrapping_add(1);
358 Some(if next == 0 { 1 } else { next })
359 })
360 .unwrap();
361 let next = prev.wrapping_add(1);
363 u32::from(if next == 0 { 1 } else { next })
364 }
365
366 #[must_use]
368 pub fn publisher(&self) -> Arc<EventPublisher> {
369 Arc::clone(&self.publisher)
370 }
371
372 pub async fn run(&mut self) -> Result<(), Error> {
378 use crate::protocol::Header as SomeIpHeader;
379 use crate::traits::WireFormat;
380
381 let mut unicast_buf = vec![0u8; 65535];
382 let mut sd_buf = vec![0u8; 65535];
383
384 loop {
385 let (data, len, addr, source) = tokio::select! {
386 result = self.unicast_socket.recv_from(&mut unicast_buf) => {
387 let (len, addr) = result?;
388 (&unicast_buf[..], len, addr, "unicast")
389 }
390 result = self.sd_socket.recv_from(&mut sd_buf) => {
391 let (len, addr) = result?;
392 (&sd_buf[..], len, addr, "sd-multicast")
393 }
394 };
395 let data = &data[..len];
396
397 if let std::net::SocketAddr::V4(v4) = addr
399 && *v4.ip() == self.config.interface
400 && source == "sd-multicast"
401 {
402 tracing::trace!("Ignoring our own SD multicast message");
403 continue;
404 }
405
406 tracing::trace!("Received {} bytes from {} on {} socket", len, addr, source);
407 tracing::trace!("Raw data: {:02X?}", &data[..len.min(64)]);
408
409 let mut reader = data;
411 match SomeIpHeader::decode(&mut reader) {
412 Ok(header) => {
413 tracing::trace!(
414 "SOME/IP Header: service=0x{:04X}, method=0x{:04X}, type={:?}",
415 header.message_id.service_id(),
416 header.message_id.method_id(),
417 header.message_type.message_type()
418 );
419
420 if header.message_id.service_id() == 0xFFFF
422 && header.message_id.method_id() == 0x8100
423 {
424 tracing::trace!("This is an SD message");
425 match sd::Header::<E, O>::decode(&mut reader) {
427 Ok(sd_msg) => {
428 tracing::trace!(
429 "SD message has {} entries, {} options",
430 sd_msg.entries.len(),
431 sd_msg.options.len()
432 );
433 self.handle_sd_message(sd_msg, addr).await?;
434 }
435 Err(e) => {
436 tracing::warn!("Failed to parse SD message: {:?}", e);
437 }
438 }
439 } else {
440 tracing::trace!("Non-SD SOME/IP message, ignoring");
441 }
442 }
443 Err(e) => {
444 tracing::warn!("Failed to parse SOME/IP header from {}: {:?}", addr, e);
445 tracing::trace!("Data: {:02X?}", &data[..len.min(32)]);
446 }
447 }
448 }
449 }
450
451 async fn handle_sd_message(
453 &mut self,
454 sd_msg: sd::Header<E, O>,
455 sender: std::net::SocketAddr,
456 ) -> Result<(), Error> {
457 tracing::trace!("Handling SD message from {}", sender);
458
459 for entry in &sd_msg.entries {
460 match entry {
461 Entry::SubscribeEventGroup(sub) => {
462 tracing::debug!(
463 "Received Subscribe from {}: service=0x{:04X}, instance={}, eventgroup=0x{:04X}",
464 sender,
465 sub.service_id,
466 sub.instance_id,
467 sub.event_group_id
468 );
469
470 if sub.service_id != self.config.service_id {
472 tracing::warn!(
473 "Subscribe for wrong service: expected 0x{:04X}, got 0x{:04X}",
474 self.config.service_id,
475 sub.service_id
476 );
477 self.send_subscribe_nack(sub, sender, "Wrong service ID")
478 .await?;
479 } else if sub.instance_id != self.config.instance_id {
480 tracing::warn!(
481 "Subscribe for wrong instance: expected {}, got {}",
482 self.config.instance_id,
483 sub.instance_id
484 );
485 self.send_subscribe_nack(sub, sender, "Wrong instance ID")
486 .await?;
487 } else {
488 if let Some(endpoint_addr) = Self::extract_endpoint(&sd_msg.options) {
490 let mut subs = self.subscriptions.write().await;
493 subs.subscribe(
494 sub.service_id,
495 sub.instance_id,
496 sub.event_group_id,
497 endpoint_addr,
498 );
499
500 self.send_subscribe_ack(sub, sender).await?;
502 } else {
503 tracing::warn!("No endpoint found in Subscribe message options");
504 self.send_subscribe_nack(sub, sender, "No endpoint in options")
505 .await?;
506 }
507 }
508 }
509 Entry::FindService(find) => {
510 if find.service_id == self.config.service_id || find.service_id == 0xFFFF {
512 tracing::debug!(
513 "Received FindService from {} for service 0x{:04X} (ours: 0x{:04X}), sending unicast offer",
514 sender,
515 find.service_id,
516 self.config.service_id
517 );
518 self.send_unicast_offer(sender).await?;
519 } else {
520 tracing::trace!(
521 "Ignoring FindService for service 0x{:04X} (not ours)",
522 find.service_id
523 );
524 }
525 }
526 _ => {
527 tracing::trace!("Ignoring SD entry: {:?}", entry);
528 }
529 }
530 }
531
532 Ok(())
533 }
534
535 fn extract_endpoint(options: &[sd::Options]) -> Option<SocketAddrV4> {
537 tracing::trace!("Extracting endpoint from {} options", options.len());
538 for option in options {
539 tracing::trace!("Option: {:?}", option);
540 if let sd::Options::IpV4Endpoint { ip, port, .. } = option {
541 tracing::trace!("Found IPv4 endpoint: {}:{}", ip, port);
542 return Some(SocketAddrV4::new(*ip, *port));
543 }
544 }
545 tracing::warn!("No IPv4 endpoint found in options");
546 None
547 }
548
549 async fn send_subscribe_ack(
551 &self,
552 subscription: &sd::EventGroupEntry,
553 subscriber: std::net::SocketAddr,
554 ) -> Result<(), Error> {
555 use crate::protocol::{
556 Header as SomeIpHeader, MessageId, MessageType, MessageTypeField, ReturnCode,
557 };
558 use crate::traits::WireFormat;
559
560 let ack_entry = Entry::SubscribeAckEventGroup(sd::EventGroupEntry {
562 index_first_options_run: 0,
563 index_second_options_run: 0,
564 options_count: OptionsCount::new(0, 0),
565 service_id: subscription.service_id,
566 instance_id: subscription.instance_id,
567 major_version: subscription.major_version,
568 ttl: self.config.ttl,
569 counter: subscription.counter,
570 event_group_id: subscription.event_group_id,
571 });
572
573 let mut entries = SdEntries::<E>::new();
575 entries
576 .push(ack_entry)
577 .expect("SdEntries capacity E must allow at least one entry for SubscribeAck");
578 let sd_payload = sd::Header::<E, O> {
579 flags: Flags::new(true, true), entries,
581 options: SdOptions::<O>::new(),
582 };
583
584 let mut sd_data = Vec::new();
586 sd_payload.encode(&mut sd_data)?;
587
588 let sid = self.next_sd_session_id();
590 let someip_header = SomeIpHeader {
591 message_id: MessageId::SD,
592 length: someip_length(sd_data.len()),
593 request_id: sid,
594 protocol_version: 0x01,
595 interface_version: 0x01,
596 message_type: MessageTypeField::new(MessageType::Notification, false),
597 return_code: ReturnCode::Ok,
598 };
599
600 let mut buffer = Vec::new();
602 someip_header.encode(&mut buffer)?;
603 buffer.extend_from_slice(&sd_data);
604
605 self.unicast_socket.send_to(&buffer, subscriber).await?;
607
608 tracing::debug!(
609 "Sent SubscribeAck to {} for service 0x{:04X}, eventgroup 0x{:04X}",
610 subscriber,
611 subscription.service_id,
612 subscription.event_group_id
613 );
614
615 Ok(())
616 }
617
618 async fn send_subscribe_nack(
622 &self,
623 subscription: &sd::EventGroupEntry,
624 subscriber: std::net::SocketAddr,
625 reason: &str,
626 ) -> Result<(), Error> {
627 use crate::protocol::{
628 Header as SomeIpHeader, MessageId, MessageType, MessageTypeField, ReturnCode,
629 };
630 use crate::traits::WireFormat;
631
632 let nack_entry = Entry::SubscribeAckEventGroup(sd::EventGroupEntry {
634 index_first_options_run: 0,
635 index_second_options_run: 0,
636 options_count: OptionsCount::new(0, 0),
637 service_id: subscription.service_id,
638 instance_id: subscription.instance_id,
639 major_version: subscription.major_version,
640 ttl: 0, counter: subscription.counter,
642 event_group_id: subscription.event_group_id,
643 });
644
645 let mut entries = SdEntries::<E>::new();
647 entries.push(nack_entry).expect(
648 "SdEntries<E> must have capacity for at least one entry when sending SubscribeNack",
649 );
650 let sd_payload = sd::Header::<E, O> {
651 flags: Flags::new(true, true), entries,
653 options: SdOptions::<O>::new(),
654 };
655
656 let mut sd_data = Vec::new();
658 sd_payload.encode(&mut sd_data)?;
659
660 let sid = self.next_sd_session_id();
662 let someip_header = SomeIpHeader {
663 message_id: MessageId::SD,
664 length: someip_length(sd_data.len()),
665 request_id: sid,
666 protocol_version: 0x01,
667 interface_version: 0x01,
668 message_type: MessageTypeField::new(MessageType::Notification, false),
669 return_code: ReturnCode::Ok,
670 };
671
672 let mut buffer = Vec::new();
674 someip_header.encode(&mut buffer)?;
675 buffer.extend_from_slice(&sd_data);
676
677 self.unicast_socket.send_to(&buffer, subscriber).await?;
679
680 tracing::warn!(
681 "Sent SubscribeNack to {} for service 0x{:04X}, eventgroup 0x{:04X} (reason: {})",
682 subscriber,
683 subscription.service_id,
684 subscription.event_group_id,
685 reason
686 );
687
688 Ok(())
689 }
690}
691
692#[cfg(test)]
693mod tests {
694 use super::*;
695 use crate::protocol::{
696 Header as SomeIpHeader, MessageId, MessageType, MessageTypeField, ReturnCode,
697 };
698 use crate::traits::WireFormat;
699
700 static SD_PORT_LOCK: tokio::sync::Mutex<()> = tokio::sync::Mutex::const_new(());
703
704 #[tokio::test]
705 async fn test_server_creation() {
706 let _lock = SD_PORT_LOCK.lock().await;
707 let config = ServerConfig::new(Ipv4Addr::new(127, 0, 0, 1), 30682, 0x5B, 1);
708
709 let server: Result<Server, _> = Server::new(config).await;
710 assert!(server.is_ok());
711 }
712
713 fn build_sd_message(sd_header: &sd::Header) -> Vec<u8> {
715 let mut sd_data = Vec::new();
716 sd_header.encode(&mut sd_data).unwrap();
717
718 let someip_header = SomeIpHeader {
719 message_id: MessageId::SD,
720 length: someip_length(sd_data.len()),
721 request_id: 0x0001,
722 protocol_version: 0x01,
723 interface_version: 0x01,
724 message_type: MessageTypeField::new(MessageType::Notification, false),
725 return_code: ReturnCode::Ok,
726 };
727
728 let mut buffer = Vec::new();
729 someip_header.encode(&mut buffer).unwrap();
730 buffer.extend_from_slice(&sd_data);
731 buffer
732 }
733
734 fn parse_subscribe_ack_ttl(data: &[u8]) -> u32 {
736 let mut reader = data;
737 let _header = SomeIpHeader::decode(&mut reader).expect("Failed to parse SOME/IP header");
738 let sd_msg: sd::Header =
739 sd::Header::decode(&mut reader).expect("Failed to parse SD header");
740 assert_eq!(
741 sd_msg.entries.len(),
742 1,
743 "Expected exactly 1 entry in response"
744 );
745 match &sd_msg.entries[0] {
746 sd::Entry::SubscribeAckEventGroup(entry) => entry.ttl,
747 other => panic!("Expected SubscribeAckEventGroup, got {:?}", other),
748 }
749 }
750
751 async fn create_test_server(service_id: u16, instance_id: u16) -> (Server, u16) {
753 let config = ServerConfig::new(Ipv4Addr::new(127, 0, 0, 1), 0, service_id, instance_id);
755 let mut server: Server = Server::new(config).await.expect("Failed to create server");
756 let local_addr = server.unicast_socket.local_addr().unwrap();
757 let port = match local_addr {
758 std::net::SocketAddr::V4(addr) => addr.port(),
759 _ => panic!("Expected IPv4 address"),
760 };
761 server.config.local_port = port;
763 (server, port)
764 }
765
766 #[tokio::test]
767 async fn test_subscribe_ack_success() {
768 let _lock = SD_PORT_LOCK.lock().await;
769 let (mut server, server_port) = create_test_server(0x5B, 1).await;
770
771 let client_socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
773 let _client_addr = client_socket.local_addr().unwrap();
774
775 let sd_header = sd::Header::new_subscription(
777 0x5B,
778 1,
779 1,
780 3,
781 0x01,
782 Ipv4Addr::new(127, 0, 0, 1),
783 sd::TransportProtocol::Udp,
784 server_port, );
786 let message = build_sd_message(&sd_header);
787
788 client_socket
790 .send_to(&message, format!("127.0.0.1:{}", server_port))
791 .await
792 .unwrap();
793
794 let server_handle = tokio::spawn(async move {
796 let mut buf = vec![0u8; 65535];
798 let (len, addr) = server.unicast_socket.recv_from(&mut buf).await.unwrap();
799 let data = &buf[..len];
800 let mut reader: &[u8] = data;
801 let header = SomeIpHeader::decode(&mut reader).unwrap();
802 assert_eq!(header.message_id.service_id(), 0xFFFF);
803 let sd_msg: sd::Header = sd::Header::decode(&mut reader).unwrap();
804 server.handle_sd_message(sd_msg, addr).await.unwrap();
805
806 let subs = server.subscriptions.read().await;
808 assert_eq!(subs.subscription_count(), 1);
809 let subscribers = subs.get_subscribers(0x5B, 1, 0x01);
810 assert_eq!(subscribers.len(), 1);
811 });
812
813 let mut resp_buf = vec![0u8; 65535];
815 let (resp_len, _) = tokio::time::timeout(
816 std::time::Duration::from_secs(2),
817 client_socket.recv_from(&mut resp_buf),
818 )
819 .await
820 .expect("Timeout waiting for SubscribeAck")
821 .unwrap();
822
823 let ttl = parse_subscribe_ack_ttl(&resp_buf[..resp_len]);
824 assert!(ttl > 0, "Expected ACK (TTL > 0), got TTL={}", ttl);
825
826 server_handle.await.unwrap();
827 }
828
829 #[tokio::test]
830 async fn test_subscribe_nack_wrong_service() {
831 let _lock = SD_PORT_LOCK.lock().await;
832 let (mut server, server_port) = create_test_server(0x5B, 1).await;
833 let client_socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
834
835 let sd_header = sd::Header::new_subscription(
837 0x99, 1,
839 1,
840 3,
841 0x01,
842 Ipv4Addr::new(127, 0, 0, 1),
843 sd::TransportProtocol::Udp,
844 server_port,
845 );
846 let message = build_sd_message(&sd_header);
847 client_socket
848 .send_to(&message, format!("127.0.0.1:{}", server_port))
849 .await
850 .unwrap();
851
852 let server_handle = tokio::spawn(async move {
854 let mut buf = vec![0u8; 65535];
855 let (len, addr) = server.unicast_socket.recv_from(&mut buf).await.unwrap();
856 let mut reader: &[u8] = &buf[..len];
857 let _header = SomeIpHeader::decode(&mut reader).unwrap();
858 let sd_msg: sd::Header = sd::Header::decode(&mut reader).unwrap();
859 server.handle_sd_message(sd_msg, addr).await.unwrap();
860
861 let subs = server.subscriptions.read().await;
863 assert_eq!(subs.subscription_count(), 0);
864 });
865
866 let mut resp_buf = vec![0u8; 65535];
868 let (resp_len, _) = tokio::time::timeout(
869 std::time::Duration::from_secs(2),
870 client_socket.recv_from(&mut resp_buf),
871 )
872 .await
873 .expect("Timeout waiting for SubscribeNack")
874 .unwrap();
875
876 let ttl = parse_subscribe_ack_ttl(&resp_buf[..resp_len]);
877 assert_eq!(ttl, 0, "Expected NACK (TTL=0), got TTL={}", ttl);
878
879 server_handle.await.unwrap();
880 }
881
882 #[tokio::test]
883 async fn test_subscribe_nack_wrong_instance() {
884 let _lock = SD_PORT_LOCK.lock().await;
885 let (mut server, server_port) = create_test_server(0x5B, 1).await;
886 let client_socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
887
888 let sd_header = sd::Header::new_subscription(
890 0x5B,
891 99, 1,
893 3,
894 0x01,
895 Ipv4Addr::new(127, 0, 0, 1),
896 sd::TransportProtocol::Udp,
897 server_port,
898 );
899 let message = build_sd_message(&sd_header);
900 client_socket
901 .send_to(&message, format!("127.0.0.1:{}", server_port))
902 .await
903 .unwrap();
904
905 let server_handle = tokio::spawn(async move {
906 let mut buf = vec![0u8; 65535];
907 let (len, addr) = server.unicast_socket.recv_from(&mut buf).await.unwrap();
908 let mut reader: &[u8] = &buf[..len];
909 let _header = SomeIpHeader::decode(&mut reader).unwrap();
910 let sd_msg: sd::Header = sd::Header::decode(&mut reader).unwrap();
911 server.handle_sd_message(sd_msg, addr).await.unwrap();
912
913 let subs = server.subscriptions.read().await;
914 assert_eq!(subs.subscription_count(), 0);
915 });
916
917 let mut resp_buf = vec![0u8; 65535];
918 let (resp_len, _) = tokio::time::timeout(
919 std::time::Duration::from_secs(2),
920 client_socket.recv_from(&mut resp_buf),
921 )
922 .await
923 .expect("Timeout waiting for SubscribeNack")
924 .unwrap();
925
926 let ttl = parse_subscribe_ack_ttl(&resp_buf[..resp_len]);
927 assert_eq!(ttl, 0, "Expected NACK (TTL=0), got TTL={}", ttl);
928
929 server_handle.await.unwrap();
930 }
931
932 #[tokio::test]
933 async fn test_subscribe_ack_different_endpoint_port() {
934 let _lock = SD_PORT_LOCK.lock().await;
935 let (mut server, server_port) = create_test_server(0x5B, 1).await;
936 let client_socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
937
938 let sd_header = sd::Header::new_subscription(
941 0x5B,
942 1,
943 1,
944 3,
945 0x01,
946 Ipv4Addr::new(127, 0, 0, 1),
947 sd::TransportProtocol::Udp,
948 server_port.wrapping_add(1), );
950 let message = build_sd_message(&sd_header);
951 client_socket
952 .send_to(&message, format!("127.0.0.1:{}", server_port))
953 .await
954 .unwrap();
955
956 let server_handle = tokio::spawn(async move {
957 let mut buf = vec![0u8; 65535];
958 let (len, addr) = server.unicast_socket.recv_from(&mut buf).await.unwrap();
959 let mut reader: &[u8] = &buf[..len];
960 let _header = SomeIpHeader::decode(&mut reader).unwrap();
961 let sd_msg: sd::Header = sd::Header::decode(&mut reader).unwrap();
962 server.handle_sd_message(sd_msg, addr).await.unwrap();
963
964 let subs = server.subscriptions.read().await;
966 assert_eq!(subs.subscription_count(), 1);
967 });
968
969 let mut resp_buf = vec![0u8; 65535];
970 let (resp_len, _) = tokio::time::timeout(
971 std::time::Duration::from_secs(2),
972 client_socket.recv_from(&mut resp_buf),
973 )
974 .await
975 .expect("Timeout waiting for SubscribeAck")
976 .unwrap();
977
978 let ttl = parse_subscribe_ack_ttl(&resp_buf[..resp_len]);
979 assert!(ttl > 0, "Expected ACK (TTL > 0), got TTL={}", ttl);
980
981 server_handle.await.unwrap();
982 }
983}