simple_someip/server/
event_publisher.rs1use super::subscription_manager::SubscriptionManager;
4use crate::Error;
5use crate::protocol::{Header, Message, MessageType, MessageTypeField, ReturnCode};
6use crate::traits::{PayloadWireFormat, WireFormat};
7use std::sync::Arc;
8use tokio::net::UdpSocket;
9use tokio::sync::RwLock;
10
11pub struct EventPublisher {
13 subscriptions: Arc<RwLock<SubscriptionManager>>,
14 socket: Arc<UdpSocket>,
15}
16
17impl EventPublisher {
18 pub fn new(subscriptions: Arc<RwLock<SubscriptionManager>>, socket: Arc<UdpSocket>) -> Self {
20 Self {
21 subscriptions,
22 socket,
23 }
24 }
25
26 pub async fn publish_event<P: PayloadWireFormat>(
34 &self,
35 service_id: u16,
36 instance_id: u16,
37 event_group_id: u16,
38 message: &Message<P>,
39 ) -> Result<usize, Error> {
40 let subscribers = {
42 let mgr = self.subscriptions.read().await;
43 mgr.get_subscribers(service_id, instance_id, event_group_id)
44 };
45
46 if subscribers.is_empty() {
47 tracing::trace!(
48 "No subscribers for service 0x{:04X}, instance {}, event group 0x{:04X}",
49 service_id,
50 instance_id,
51 event_group_id
52 );
53 return Ok(0);
54 }
55
56 let mut buffer = Vec::new();
58 message.encode(&mut buffer)?;
59
60 let mut sent_count = 0;
62 for subscriber in &subscribers {
63 match self.socket.send_to(&buffer, subscriber.address).await {
64 Ok(_) => {
65 sent_count += 1;
66 tracing::trace!(
67 "Sent event to subscriber {} ({} bytes)",
68 subscriber.address,
69 buffer.len()
70 );
71 }
72 Err(e) => {
73 tracing::error!(
74 "Failed to send event to subscriber {}: {:?}",
75 subscriber.address,
76 e
77 );
78 }
79 }
80 }
81
82 tracing::debug!(
83 "Published event to {}/{} subscribers for service 0x{:04X}",
84 sent_count,
85 subscribers.len(),
86 service_id
87 );
88
89 Ok(sent_count)
90 }
91
92 #[allow(clippy::too_many_arguments)]
96 pub async fn publish_raw_event(
97 &self,
98 service_id: u16,
99 instance_id: u16,
100 event_group_id: u16,
101 event_id: u16,
102 request_id: u32,
103 protocol_version: u8,
104 interface_version: u8,
105 payload: &[u8],
106 ) -> Result<usize, Error> {
107 let subscribers = {
109 let mgr = self.subscriptions.read().await;
110 mgr.get_subscribers(service_id, instance_id, event_group_id)
111 };
112
113 if subscribers.is_empty() {
114 return Ok(0);
115 }
116
117 let header = Header {
119 message_id: crate::protocol::MessageId::new_from_service_and_method(
120 service_id, event_id,
121 ),
122 length: super::someip_length(payload.len()),
123 request_id,
124 protocol_version,
125 interface_version,
126 message_type: MessageTypeField::new(MessageType::Notification, false),
127 return_code: ReturnCode::Ok,
128 };
129
130 let mut buffer = Vec::new();
132 header.encode(&mut buffer)?;
133 buffer.extend_from_slice(payload);
134
135 let mut sent_count = 0;
137 for subscriber in &subscribers {
138 match self.socket.send_to(&buffer, subscriber.address).await {
139 Ok(_) => {
140 sent_count += 1;
141 }
142 Err(e) => {
143 tracing::error!(
144 "Failed to send raw event to {}: {:?}",
145 subscriber.address,
146 e
147 );
148 }
149 }
150 }
151
152 Ok(sent_count)
153 }
154
155 pub async fn has_subscribers(
165 &self,
166 service_id: u16,
167 instance_id: u16,
168 event_group_id: u16,
169 ) -> bool {
170 let mgr = self.subscriptions.read().await;
171 !mgr.get_subscribers(service_id, instance_id, event_group_id)
172 .is_empty()
173 }
174
175 pub async fn subscriber_count(
177 &self,
178 service_id: u16,
179 instance_id: u16,
180 event_group_id: u16,
181 ) -> usize {
182 let mgr = self.subscriptions.read().await;
183 mgr.get_subscribers(service_id, instance_id, event_group_id)
184 .len()
185 }
186}
187
188#[cfg(test)]
189mod tests {
190 use super::*;
191 #[tokio::test]
192 async fn test_event_publisher_creation() {
193 let subscriptions = Arc::new(RwLock::new(SubscriptionManager::new()));
194 let socket = Arc::new(
195 UdpSocket::bind("127.0.0.1:0")
196 .await
197 .expect("Failed to bind socket"),
198 );
199
200 let publisher = EventPublisher::new(subscriptions, socket);
201 assert!(std::mem::size_of_val(&publisher) > 0);
203 }
204}