Skip to main content

simple_someip/server/
event_publisher.rs

1//! Event publishing functionality
2
3use super::subscription_manager::SubscriptionManager;
4use crate::Error;
5use crate::protocol::{Header, Message, MessageType, MessageTypeField, ReturnCode};
6use crate::traits::{PayloadWireFormat, WireFormat};
7use std::sync::Arc;
8use tokio::net::UdpSocket;
9use tokio::sync::RwLock;
10
11/// Publishes events to subscribers
12pub struct EventPublisher {
13    subscriptions: Arc<RwLock<SubscriptionManager>>,
14    socket: Arc<UdpSocket>,
15}
16
17impl EventPublisher {
18    /// Create a new event publisher
19    pub fn new(subscriptions: Arc<RwLock<SubscriptionManager>>, socket: Arc<UdpSocket>) -> Self {
20        Self {
21            subscriptions,
22            socket,
23        }
24    }
25
26    /// Publish an event to all subscribers of an event group
27    ///
28    /// # Arguments
29    /// * `service_id` - Service ID
30    /// * `instance_id` - Instance ID
31    /// * `event_group_id` - Event group ID
32    /// * `message` - The SOME/IP message to send (must be a notification/event)
33    pub async fn publish_event<P: PayloadWireFormat>(
34        &self,
35        service_id: u16,
36        instance_id: u16,
37        event_group_id: u16,
38        message: &Message<P>,
39    ) -> Result<usize, Error> {
40        // Get subscribers
41        let subscribers = {
42            let mgr = self.subscriptions.read().await;
43            mgr.get_subscribers(service_id, instance_id, event_group_id)
44        };
45
46        if subscribers.is_empty() {
47            tracing::trace!(
48                "No subscribers for service 0x{:04X}, instance {}, event group 0x{:04X}",
49                service_id,
50                instance_id,
51                event_group_id
52            );
53            return Ok(0);
54        }
55
56        // Serialize the message once
57        let mut buffer = Vec::new();
58        message.encode(&mut buffer)?;
59
60        // Send to all subscribers
61        let mut sent_count = 0;
62        for subscriber in &subscribers {
63            match self.socket.send_to(&buffer, subscriber.address).await {
64                Ok(_) => {
65                    sent_count += 1;
66                    tracing::trace!(
67                        "Sent event to subscriber {} ({} bytes)",
68                        subscriber.address,
69                        buffer.len()
70                    );
71                }
72                Err(e) => {
73                    tracing::error!(
74                        "Failed to send event to subscriber {}: {:?}",
75                        subscriber.address,
76                        e
77                    );
78                }
79            }
80        }
81
82        tracing::debug!(
83            "Published event to {}/{} subscribers for service 0x{:04X}",
84            sent_count,
85            subscribers.len(),
86            service_id
87        );
88
89        Ok(sent_count)
90    }
91
92    /// Publish raw event data (already serialized with E2E protection)
93    ///
94    /// This is useful when you've already applied E2E protection to the payload
95    #[allow(clippy::too_many_arguments)]
96    pub async fn publish_raw_event(
97        &self,
98        service_id: u16,
99        instance_id: u16,
100        event_group_id: u16,
101        event_id: u16,
102        request_id: u32,
103        protocol_version: u8,
104        interface_version: u8,
105        payload: &[u8],
106    ) -> Result<usize, Error> {
107        // Get subscribers
108        let subscribers = {
109            let mgr = self.subscriptions.read().await;
110            mgr.get_subscribers(service_id, instance_id, event_group_id)
111        };
112
113        if subscribers.is_empty() {
114            return Ok(0);
115        }
116
117        // Build SOME/IP header
118        let header = Header {
119            message_id: crate::protocol::MessageId::new_from_service_and_method(
120                service_id, event_id,
121            ),
122            length: super::someip_length(payload.len()),
123            request_id,
124            protocol_version,
125            interface_version,
126            message_type: MessageTypeField::new(MessageType::Notification, false),
127            return_code: ReturnCode::Ok,
128        };
129
130        // Serialize header + payload
131        let mut buffer = Vec::new();
132        header.encode(&mut buffer)?;
133        buffer.extend_from_slice(payload);
134
135        // Send to all subscribers
136        let mut sent_count = 0;
137        for subscriber in &subscribers {
138            match self.socket.send_to(&buffer, subscriber.address).await {
139                Ok(_) => {
140                    sent_count += 1;
141                }
142                Err(e) => {
143                    tracing::error!(
144                        "Failed to send raw event to {}: {:?}",
145                        subscriber.address,
146                        e
147                    );
148                }
149            }
150        }
151
152        Ok(sent_count)
153    }
154
155    /// Check if there are any active subscribers for a specific event group
156    ///
157    /// # Arguments
158    /// * `service_id` - Service ID
159    /// * `instance_id` - Instance ID
160    /// * `event_group_id` - Event group ID
161    ///
162    /// # Returns
163    /// `true` if there are subscribers, `false` otherwise
164    pub async fn has_subscribers(
165        &self,
166        service_id: u16,
167        instance_id: u16,
168        event_group_id: u16,
169    ) -> bool {
170        let mgr = self.subscriptions.read().await;
171        !mgr.get_subscribers(service_id, instance_id, event_group_id)
172            .is_empty()
173    }
174
175    /// Get the current number of subscribers for a specific event group
176    pub async fn subscriber_count(
177        &self,
178        service_id: u16,
179        instance_id: u16,
180        event_group_id: u16,
181    ) -> usize {
182        let mgr = self.subscriptions.read().await;
183        mgr.get_subscribers(service_id, instance_id, event_group_id)
184            .len()
185    }
186}
187
188#[cfg(test)]
189mod tests {
190    use super::*;
191    #[tokio::test]
192    async fn test_event_publisher_creation() {
193        let subscriptions = Arc::new(RwLock::new(SubscriptionManager::new()));
194        let socket = Arc::new(
195            UdpSocket::bind("127.0.0.1:0")
196                .await
197                .expect("Failed to bind socket"),
198        );
199
200        let publisher = EventPublisher::new(subscriptions, socket);
201        // Just test that it was created successfully
202        assert!(std::mem::size_of_val(&publisher) > 0);
203    }
204}