1#[cfg(feature = "metrics")]
2use crate::metrics::ContextMetricsSnapshot;
3use crate::{MctxError, Publication, PublicationConfig, PublicationId, SendReport};
4use socket2::Socket;
5
/// Owns a collection of [`Publication`]s and assigns each one a [`PublicationId`].
#[derive(Debug)]
pub struct Context {
    /// Publications currently owned by this context.
    ///
    /// Removal uses `Vec::swap_remove`, so the order of the remaining entries
    /// is not preserved when a publication is removed or taken.
    publications: Vec<Publication>,
    /// Raw value of the ID that will be assigned to the next added publication.
    /// Starts at `1` and only ever increases.
    next_id: u64,
}
12
13impl Default for Context {
14 fn default() -> Self {
15 Self::new()
16 }
17}
18
19impl Context {
20 pub fn new() -> Self {
22 Self {
23 publications: Vec::new(),
24 next_id: 1,
25 }
26 }
27
28 pub fn publication_count(&self) -> usize {
30 self.publications.len()
31 }
32
33 pub fn contains_publication(&self, id: PublicationId) -> bool {
35 self.publications
36 .iter()
37 .any(|publication| publication.id() == id)
38 }
39
40 pub fn get_publication(&self, id: PublicationId) -> Option<&Publication> {
42 self.publications
43 .iter()
44 .find(|publication| publication.id() == id)
45 }
46
47 pub fn get_publication_mut(&mut self, id: PublicationId) -> Option<&mut Publication> {
49 self.publications
50 .iter_mut()
51 .find(|publication| publication.id() == id)
52 }
53
54 pub fn add_publication(
56 &mut self,
57 config: PublicationConfig,
58 ) -> Result<PublicationId, MctxError> {
59 if self
60 .publications
61 .iter()
62 .any(|publication| publication.config() == &config)
63 {
64 return Err(MctxError::DuplicatePublication);
65 }
66
67 let id = self.next_publication_id();
68 let publication = Publication::new(id, config)?;
69 self.publications.push(publication);
70 Ok(id)
71 }
72
73 pub fn add_publication_with_socket(
75 &mut self,
76 config: PublicationConfig,
77 socket: Socket,
78 ) -> Result<PublicationId, MctxError> {
79 if self
80 .publications
81 .iter()
82 .any(|publication| publication.config() == &config)
83 {
84 return Err(MctxError::DuplicatePublication);
85 }
86
87 let id = self.next_publication_id();
88 let publication = Publication::new_with_socket(id, config, socket)?;
89 self.publications.push(publication);
90 Ok(id)
91 }
92
93 pub fn remove_publication(&mut self, id: PublicationId) -> bool {
95 let Some(index) = self
96 .publications
97 .iter()
98 .position(|publication| publication.id() == id)
99 else {
100 return false;
101 };
102
103 self.publications.swap_remove(index);
104 true
105 }
106
107 pub fn take_publication(&mut self, id: PublicationId) -> Option<Publication> {
109 let index = self
110 .publications
111 .iter()
112 .position(|publication| publication.id() == id)?;
113
114 Some(self.publications.swap_remove(index))
115 }
116
117 pub fn publications(&self) -> &[Publication] {
119 &self.publications
120 }
121
122 pub fn publications_mut(&mut self) -> &mut [Publication] {
124 &mut self.publications
125 }
126
127 pub fn send(&self, id: PublicationId, payload: &[u8]) -> Result<SendReport, MctxError> {
129 let publication = self
130 .get_publication(id)
131 .ok_or(MctxError::PublicationNotFound)?;
132
133 publication.send(payload)
134 }
135
136 pub fn send_all(&self, payload: &[u8], out: &mut Vec<SendReport>) -> Result<usize, MctxError> {
140 let before = out.len();
141
142 for publication in &self.publications {
143 out.push(publication.send(payload)?);
144 }
145
146 Ok(out.len() - before)
147 }
148
149 #[cfg(feature = "metrics")]
151 pub fn metrics_snapshot(&self) -> ContextMetricsSnapshot {
152 let mut snapshot = ContextMetricsSnapshot {
153 publication_count: self.publications.len(),
154 ..ContextMetricsSnapshot::default()
155 };
156
157 for publication in &self.publications {
158 let publication_metrics = publication.metrics_snapshot();
159 snapshot.send_calls += publication_metrics.send_calls;
160 snapshot.packets_sent += publication_metrics.packets_sent;
161 snapshot.bytes_sent += publication_metrics.bytes_sent;
162 snapshot.send_errors += publication_metrics.send_errors;
163 }
164
165 snapshot
166 }
167
168 fn next_publication_id(&mut self) -> PublicationId {
169 let id = PublicationId(self.next_id);
170 self.next_id += 1;
171 id
172 }
173}
174
#[cfg(test)]
mod tests {
    use super::*;
    #[cfg(feature = "metrics")]
    use crate::metrics::ContextMetricsSampler;
    use crate::test_support::{TEST_GROUP, recv_payload, test_multicast_receiver};
    use std::net::Ipv4Addr;

    /// Sends one payload through a context-owned publication and checks that
    /// the receiver from `test_multicast_receiver` gets the same bytes.
    #[test]
    fn context_send_reaches_a_local_receiver() {
        let (receiver, port) = test_multicast_receiver();
        let mut ctx = Context::new();
        let publication_id = ctx
            .add_publication(PublicationConfig::new(TEST_GROUP, port))
            .unwrap();

        let send_report = ctx.send(publication_id, b"context hello").unwrap();
        let received = recv_payload(&receiver);

        assert_eq!(send_report.bytes_sent, b"context hello".len());
        assert_eq!(received, b"context hello");
    }

    /// Adding the same configuration twice must fail the second time.
    #[test]
    fn duplicate_publications_are_rejected() {
        let config = PublicationConfig::new(Ipv4Addr::new(239, 1, 2, 3), 5000);
        let mut ctx = Context::new();

        ctx.add_publication(config.clone()).unwrap();

        assert!(matches!(
            ctx.add_publication(config),
            Err(MctxError::DuplicatePublication)
        ));
    }

    /// One successful send shows up as one call, one packet and the payload
    /// length in bytes in the sampler delta, with no errors.
    #[cfg(feature = "metrics")]
    #[test]
    fn context_metrics_track_successful_sends() {
        // Keep the receiver alive for the duration of the test.
        let (_receiver, port) = test_multicast_receiver();
        let mut ctx = Context::new();
        let config = PublicationConfig::new(TEST_GROUP, port);
        let publication_id = ctx.add_publication(config).unwrap();
        // The sampler is created before the send so the delta covers it.
        let sampler = ContextMetricsSampler::new(&ctx);

        ctx.send(publication_id, b"metrics").unwrap();

        let observed = sampler.delta();
        assert_eq!(observed.publication_count_change, 0);
        assert_eq!(observed.send_calls, 1);
        assert_eq!(observed.packets_sent, 1);
        assert_eq!(observed.bytes_sent, b"metrics".len() as u64);
        assert_eq!(observed.send_errors, 0);
    }
}