Skip to main content

mctx_core/
context.rs

1#[cfg(feature = "metrics")]
2use crate::metrics::ContextMetricsSnapshot;
3use crate::{MctxError, Publication, PublicationConfig, PublicationId, SendReport};
4use socket2::Socket;
5
6/// Small owner for a set of multicast publication sockets.
7#[derive(Debug)]
8pub struct Context {
9    publications: Vec<Publication>,
10    next_id: u64,
11}
12
13impl Default for Context {
14    fn default() -> Self {
15        Self::new()
16    }
17}
18
19impl Context {
20    /// Creates an empty multicast sender context.
21    pub fn new() -> Self {
22        Self {
23            publications: Vec::new(),
24            next_id: 1,
25        }
26    }
27
28    /// Returns the number of tracked publications.
29    pub fn publication_count(&self) -> usize {
30        self.publications.len()
31    }
32
33    /// Returns whether a publication ID exists in the context.
34    pub fn contains_publication(&self, id: PublicationId) -> bool {
35        self.publications
36            .iter()
37            .any(|publication| publication.id() == id)
38    }
39
40    /// Returns an immutable reference to one publication.
41    pub fn get_publication(&self, id: PublicationId) -> Option<&Publication> {
42        self.publications
43            .iter()
44            .find(|publication| publication.id() == id)
45    }
46
47    /// Returns a mutable reference to one publication.
48    pub fn get_publication_mut(&mut self, id: PublicationId) -> Option<&mut Publication> {
49        self.publications
50            .iter_mut()
51            .find(|publication| publication.id() == id)
52    }
53
54    /// Creates a new publication socket from configuration and stores it.
55    pub fn add_publication(
56        &mut self,
57        config: PublicationConfig,
58    ) -> Result<PublicationId, MctxError> {
59        if self
60            .publications
61            .iter()
62            .any(|publication| publication.config() == &config)
63        {
64            return Err(MctxError::DuplicatePublication);
65        }
66
67        let id = self.next_publication_id();
68        let publication = Publication::new(id, config)?;
69        self.publications.push(publication);
70        Ok(id)
71    }
72
73    /// Stores an existing socket as a publication after configuring it.
74    pub fn add_publication_with_socket(
75        &mut self,
76        config: PublicationConfig,
77        socket: Socket,
78    ) -> Result<PublicationId, MctxError> {
79        if self
80            .publications
81            .iter()
82            .any(|publication| publication.config() == &config)
83        {
84            return Err(MctxError::DuplicatePublication);
85        }
86
87        let id = self.next_publication_id();
88        let publication = Publication::new_with_socket(id, config, socket)?;
89        self.publications.push(publication);
90        Ok(id)
91    }
92
93    /// Removes one publication and drops its socket.
94    pub fn remove_publication(&mut self, id: PublicationId) -> bool {
95        let Some(index) = self
96            .publications
97            .iter()
98            .position(|publication| publication.id() == id)
99        else {
100            return false;
101        };
102
103        self.publications.swap_remove(index);
104        true
105    }
106
107    /// Extracts one publication from the context.
108    pub fn take_publication(&mut self, id: PublicationId) -> Option<Publication> {
109        let index = self
110            .publications
111            .iter()
112            .position(|publication| publication.id() == id)?;
113
114        Some(self.publications.swap_remove(index))
115    }
116
117    /// Returns all tracked publications.
118    pub fn publications(&self) -> &[Publication] {
119        &self.publications
120    }
121
122    /// Returns all tracked publications mutably.
123    pub fn publications_mut(&mut self) -> &mut [Publication] {
124        &mut self.publications
125    }
126
127    /// Sends one payload through the selected publication.
128    pub fn send(&self, id: PublicationId, payload: &[u8]) -> Result<SendReport, MctxError> {
129        let publication = self
130            .get_publication(id)
131            .ok_or(MctxError::PublicationNotFound)?;
132
133        publication.send(payload)
134    }
135
136    /// Sends the same payload through every publication and pushes reports into `out`.
137    ///
138    /// If one publication fails, reports already written into `out` are preserved.
139    pub fn send_all(&self, payload: &[u8], out: &mut Vec<SendReport>) -> Result<usize, MctxError> {
140        let before = out.len();
141
142        for publication in &self.publications {
143            out.push(publication.send(payload)?);
144        }
145
146        Ok(out.len() - before)
147    }
148
149    /// Returns a metrics snapshot aggregated across all publications.
150    #[cfg(feature = "metrics")]
151    pub fn metrics_snapshot(&self) -> ContextMetricsSnapshot {
152        let mut snapshot = ContextMetricsSnapshot {
153            publication_count: self.publications.len(),
154            ..ContextMetricsSnapshot::default()
155        };
156
157        for publication in &self.publications {
158            let publication_metrics = publication.metrics_snapshot();
159            snapshot.send_calls += publication_metrics.send_calls;
160            snapshot.packets_sent += publication_metrics.packets_sent;
161            snapshot.bytes_sent += publication_metrics.bytes_sent;
162            snapshot.send_errors += publication_metrics.send_errors;
163        }
164
165        snapshot
166    }
167
168    fn next_publication_id(&mut self) -> PublicationId {
169        let id = PublicationId(self.next_id);
170        self.next_id += 1;
171        id
172    }
173}
174
#[cfg(test)]
mod tests {
    use super::*;
    #[cfg(feature = "metrics")]
    use crate::metrics::ContextMetricsSampler;
    use crate::test_support::{TEST_GROUP, recv_payload, test_multicast_receiver};
    use std::net::Ipv4Addr;

    /// A payload sent through the context is read back from the receiver
    /// returned by `test_multicast_receiver`.
    #[test]
    fn context_send_reaches_a_local_receiver() {
        let mut context = Context::new();
        let (receiver, port) = test_multicast_receiver();
        let id = context
            .add_publication(PublicationConfig::new(TEST_GROUP, port))
            .unwrap();

        let report = context.send(id, b"context hello").unwrap();
        let received = recv_payload(&receiver);

        assert_eq!(report.bytes_sent, b"context hello".len());
        assert_eq!(received, b"context hello");
    }

    /// Adding an equal configuration a second time yields `DuplicatePublication`.
    #[test]
    fn duplicate_publications_are_rejected() {
        let config = PublicationConfig::new(Ipv4Addr::new(239, 1, 2, 3), 5000);
        let mut context = Context::new();

        context.add_publication(config.clone()).unwrap();

        assert!(matches!(
            context.add_publication(config),
            Err(MctxError::DuplicatePublication)
        ));
    }

    /// One successful send shows up as exactly one call and one packet in the
    /// sampler delta, with no errors and no change in publication count.
    #[cfg(feature = "metrics")]
    #[test]
    fn context_metrics_track_successful_sends() {
        // Keep the receiver bound for the duration of the test.
        let (_receiver, port) = test_multicast_receiver();
        let mut context = Context::new();
        let config = PublicationConfig::new(TEST_GROUP, port);
        let id = context.add_publication(config).unwrap();
        let sampler = ContextMetricsSampler::new(&context);

        context.send(id, b"metrics").unwrap();

        let delta = sampler.delta();
        assert_eq!(delta.publication_count_change, 0);
        assert_eq!(delta.send_calls, 1);
        assert_eq!(delta.packets_sent, 1);
        assert_eq!(delta.bytes_sent, b"metrics".len() as u64);
        assert_eq!(delta.send_errors, 0);
    }
}