pub mod wire;
pub use wire::ReportingMessage;
use std::io;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4, UdpSocket};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use serde::{Deserialize, Serialize};
use socket2::{Domain, Protocol, Socket, Type};
use tracing::warn;
use crate::controller::context::ControllerCtx;
use super::Dispatcher;
/// Payload size, in bytes, that the projected Row size is compared against in `init`;
/// exceeding it only triggers a fragmentation warning, it is not enforced.
/// NOTE(review): presumably a 1500-byte Ethernet MTU minus 42 bytes of link/IP/UDP
/// headers — confirm the intended header accounting.
const MTU_PAYLOAD_BUDGET: usize = 1500 - 42;
/// Bytes assumed per channel value when projecting the Row size (channel values are `f64`).
const BYTES_PER_CHANNEL: usize = 8;
/// Fixed per-Row allowance added on top of the channel values when projecting the Row size.
/// NOTE(review): presumably covers the sequence number, timestamps and encoding framing —
/// confirm against the wire format.
const ROW_OVERHEAD_BYTES: usize = 64;
/// Dispatcher that sends controller data to a UDP multicast group.
///
/// Only the four public configuration fields are (de)serialized; every `#[serde(skip)]`
/// field is per-session runtime state that starts out empty and is populated by `init`.
#[derive(Serialize, Deserialize)]
pub struct ReportingDispatcher {
/// IPv4 group address that, together with `port`, forms the datagram destination.
pub multicast_group: Ipv4Addr,
/// Destination UDP port.
pub port: u16,
/// When set, passed to `set_multicast_if_v4` during `init`; when `None` the option is
/// left untouched. Omitted from serialized output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub outbound_interface: Option<Ipv4Addr>,
/// Minimum time between two Schema messages emitted by `consume`.
pub schema_period: Duration,
/// Number of Row frames whose send failed; shared so it can be read through a cloned handle.
#[serde(skip)]
dropped_frames: Arc<AtomicU64>,
/// Encode buffer reused for every outgoing message.
#[serde(skip)]
scratch_buf: Vec<u8>,
/// Sending socket; `Some` only between a successful `init` and `terminate`.
#[serde(skip)]
socket: Option<UdpSocket>,
/// Destination address built from `multicast_group` and `port` during `init`.
#[serde(skip)]
multicast_addr: Option<SocketAddrV4>,
/// Schema message built in `init` and re-sent periodically by `consume`.
#[serde(skip)]
stored_schema: Option<ReportingMessage>,
/// Sequence number placed in the next Row message.
#[serde(skip)]
seq: u64,
/// When a Schema was last encoded for sending; `None` until the first one in a session.
#[serde(skip)]
last_schema_sent: Option<Instant>,
/// When a send error was last logged; used to rate-limit those warnings.
#[serde(skip)]
last_error_log_at: Option<Instant>,
}
/// Minimum interval between two logged send-error warnings; errors occurring sooner after
/// the previous logged one are not logged.
const ERROR_LOG_RATE_LIMIT: Duration = Duration::from_secs(1);
impl ReportingDispatcher {
    /// Creates a boxed dispatcher holding the given configuration.
    ///
    /// No socket is opened here: all runtime state starts out empty and the dropped-frame
    /// counter starts at zero.
    pub fn new(
        multicast_group: Ipv4Addr,
        port: u16,
        outbound_interface: Option<Ipv4Addr>,
        schema_period: Duration,
    ) -> Box<Self> {
        let dispatcher = Self {
            // Caller-supplied configuration.
            multicast_group,
            port,
            outbound_interface,
            schema_period,
            // Runtime state, all empty until the dispatcher is initialised.
            socket: None,
            multicast_addr: None,
            stored_schema: None,
            last_schema_sent: None,
            last_error_log_at: None,
            seq: 0,
            scratch_buf: Vec::default(),
            dropped_frames: Arc::new(AtomicU64::default()),
        };
        Box::new(dispatcher)
    }

    /// Returns the current value of the dropped-frame counter.
    pub fn dropped_frames(&self) -> u64 {
        let counter: &AtomicU64 = &self.dropped_frames;
        counter.load(Ordering::Relaxed)
    }

    /// Returns another handle to the same dropped-frame counter, so it can be read without
    /// access to the dispatcher itself.
    pub fn dropped_frames_handle(&self) -> Arc<AtomicU64> {
        let shared = &self.dropped_frames;
        Arc::clone(shared)
    }
}
#[typetag::serde]
impl Dispatcher for ReportingDispatcher {
fn init(
&mut self,
ctx: &ControllerCtx,
channel_names: &[String],
_core_assignment: usize,
) -> Result<(), String> {
self.socket = None;
self.stored_schema = None;
self.seq = 0;
self.last_schema_sent = None;
self.last_error_log_at = None;
self.dropped_frames.store(0, Ordering::Relaxed);
self.scratch_buf.clear();
let raw = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))
.map_err(|e| format!("ReportingDispatcher: failed to create UDP socket: {e}"))?;
raw.set_reuse_address(true)
.map_err(|e| format!("ReportingDispatcher: SO_REUSEADDR failed: {e}"))?;
let bind_addr: SocketAddr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0).into();
raw.bind(&bind_addr.into())
.map_err(|e| format!("ReportingDispatcher: bind failed: {e}"))?;
raw.set_multicast_ttl_v4(1)
.map_err(|e| format!("ReportingDispatcher: set_multicast_ttl_v4 failed: {e}"))?;
if let Some(iface) = self.outbound_interface {
raw.set_multicast_if_v4(&iface)
.map_err(|e| format!("ReportingDispatcher: set_multicast_if_v4 failed: {e}"))?;
}
raw.set_nonblocking(true)
.map_err(|e| format!("ReportingDispatcher: set_nonblocking failed: {e}"))?;
let std_sock: UdpSocket = raw.into();
let projected_row_bytes = BYTES_PER_CHANNEL * channel_names.len() + ROW_OVERHEAD_BYTES;
if projected_row_bytes > MTU_PAYLOAD_BUDGET {
warn!(
"ReportingDispatcher: projected Row payload {projected_row_bytes} B exceeds \
MTU budget {MTU_PAYLOAD_BUDGET} B; IP fragmentation will occur on some networks"
);
}
let monotonic_epoch_ns = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_nanos() as u64)
.unwrap_or(0);
let schema = ReportingMessage::Schema {
channel_names: channel_names.to_vec(),
channel_units: ctx.channel_units.clone(),
monotonic_epoch_ns,
is_session_end: false,
};
self.stored_schema = Some(schema);
self.multicast_addr = Some(SocketAddrV4::new(self.multicast_group, self.port));
self.socket = Some(std_sock);
Ok(())
}
fn consume(
&mut self,
time: SystemTime,
timestamp: i64,
channel_values: Vec<f64>,
) -> Result<(), String> {
let (Some(socket), Some(multicast_addr)) =
(self.socket.as_ref(), self.multicast_addr.as_ref())
else {
return Ok(());
};
let now = Instant::now();
let should_emit_schema = match self.last_schema_sent {
None => true,
Some(last) => now.duration_since(last) >= self.schema_period,
};
if should_emit_schema && let Some(schema) = &self.stored_schema {
self.scratch_buf.clear();
match schema.encode_into(&mut self.scratch_buf) {
Err(e) => {
warn!("ReportingDispatcher: Schema encode error (skipping): {e}");
}
Ok(_) => {
self.last_schema_sent = Some(now);
match socket.send_to(&self.scratch_buf, *multicast_addr) {
Ok(_) => {}
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
}
Err(e) => {
let should_log = self.last_error_log_at.is_none_or(|last| {
now.duration_since(last) >= ERROR_LOG_RATE_LIMIT
});
if should_log {
warn!("ReportingDispatcher: Schema send_to error (skipping): {e}");
self.last_error_log_at = Some(now);
}
}
}
}
}
}
let msg = ReportingMessage::Row {
seq: self.seq,
timestamp: timestamp as f64 * 1e-9,
system_time: super::fmt_time(time),
values: channel_values,
};
self.scratch_buf.clear();
if let Err(e) = msg.encode_into(&mut self.scratch_buf) {
warn!("ReportingDispatcher: postcard encode error (skipping frame): {e}");
return Ok(());
}
self.seq += 1;
match socket.send_to(&self.scratch_buf, *multicast_addr) {
Ok(_) => {}
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
self.dropped_frames.fetch_add(1, Ordering::Relaxed);
}
Err(e) => {
self.dropped_frames.fetch_add(1, Ordering::Relaxed);
let should_log = self
.last_error_log_at
.is_none_or(|last| now.duration_since(last) >= ERROR_LOG_RATE_LIMIT);
if should_log {
warn!("ReportingDispatcher: send_to error (frame dropped): {e}");
self.last_error_log_at = Some(now);
}
}
}
Ok(())
}
fn terminate(&mut self) -> Result<(), String> {
if let (Some(socket), Some(multicast_addr), Some(stored_schema)) = (
self.socket.as_ref(),
self.multicast_addr.as_ref(),
self.stored_schema.take(),
) {
let session_end_schema = match stored_schema {
ReportingMessage::Schema {
channel_names,
channel_units,
monotonic_epoch_ns,
..
} => ReportingMessage::Schema {
channel_names,
channel_units,
monotonic_epoch_ns,
is_session_end: true,
},
other => other,
};
self.scratch_buf.clear();
match session_end_schema.encode_into(&mut self.scratch_buf) {
Err(e) => {
warn!("ReportingDispatcher: session-end Schema encode error (ignored): {e}");
}
Ok(_) => {
if let Err(e) = socket.set_nonblocking(false) {
warn!(
"ReportingDispatcher: could not set blocking mode for session-end \
Schema send (ignored): {e}"
);
}
if let Err(e) = socket.send_to(&self.scratch_buf, *multicast_addr) {
warn!(
"ReportingDispatcher: session-end Schema send_to error (ignored): {e}"
);
}
}
}
}
self.socket = None;
self.multicast_addr = None;
self.stored_schema = None;
self.seq = 0;
self.scratch_buf.clear();
self.last_schema_sent = None;
self.last_error_log_at = None;
Ok(())
}
}