use std::net::{IpAddr, Ipv4Addr};
use std::time::{Duration, Instant, SystemTime};
use bytes::{Bytes, BytesMut};
use tokio::net::UdpSocket;
use tracing::{debug, info, trace, warn};
use viva_pfnc::PixelFormat;
use crate::GenicamError;
use crate::frame::Frame;
use crate::time::TimeSync;
use viva_gige::gvcp::{GigeDevice, StreamParams};
use viva_gige::gvsp::{self, GvspPacket, PacketBitmap, StreamConfig};
use viva_gige::nic::{self, DEFAULT_RCVBUF_BYTES, Iface, McOptions};
use viva_gige::stats::{StreamStats, StreamStatsAccumulator};
pub use viva_gige::gvsp::StreamDest;
pub(crate) enum PacketSource {
Udp(UdpSocket),
}
impl PacketSource {
async fn recv(&self, buf: &mut [u8]) -> Result<Bytes, GenicamError> {
match self {
PacketSource::Udp(socket) => {
let (len, _) = socket
.recv_from(buf)
.await
.map_err(|e| GenicamError::transport(format!("socket recv failed: {e}")))?;
Ok(Bytes::copy_from_slice(&buf[..len]))
}
}
}
fn as_udp_socket(&self) -> Option<&UdpSocket> {
match self {
PacketSource::Udp(s) => Some(s),
}
}
}
pub struct StreamBuilder<'a> {
device: &'a mut GigeDevice,
iface: Option<Iface>,
dest: Option<StreamDest>,
rcvbuf_bytes: Option<usize>,
auto_packet_size: bool,
target_mtu: Option<u32>,
packet_size: Option<u32>,
packet_delay: Option<u32>,
channel: u32,
dst_port: u16,
}
impl<'a> StreamBuilder<'a> {
pub fn new(device: &'a mut GigeDevice) -> Self {
Self {
device,
iface: None,
dest: None,
rcvbuf_bytes: None,
auto_packet_size: true,
target_mtu: None,
packet_size: None,
packet_delay: None,
channel: 0,
dst_port: 0,
}
}
pub fn iface(mut self, iface: Iface) -> Self {
self.iface = Some(iface);
self
}
pub fn dest(mut self, dest: StreamDest) -> Self {
self.dest = Some(dest);
self
}
pub fn auto_packet_size(mut self, enable: bool) -> Self {
self.auto_packet_size = enable;
self
}
pub fn target_mtu(mut self, mtu: u32) -> Self {
self.target_mtu = Some(mtu);
self
}
pub fn packet_size(mut self, size: u32) -> Self {
self.packet_size = Some(size);
self
}
pub fn packet_delay(mut self, delay: u32) -> Self {
self.packet_delay = Some(delay);
self
}
pub fn destination_port(mut self, port: u16) -> Self {
self.dst_port = port;
if let Some(dest) = &mut self.dest {
*dest = match *dest {
StreamDest::Unicast { dst_ip, .. } => StreamDest::Unicast {
dst_ip,
dst_port: port,
},
StreamDest::Multicast {
group,
loopback,
ttl,
..
} => StreamDest::Multicast {
group,
port,
loopback,
ttl,
},
};
}
self
}
pub fn multicast(mut self, group: Option<Ipv4Addr>) -> Self {
if let Some(group) = group {
self.dest = Some(StreamDest::Multicast {
group,
port: self.dst_port,
loopback: false,
ttl: 1,
});
} else {
self.dest = None;
}
self
}
pub fn rcvbuf_bytes(mut self, size: usize) -> Self {
self.rcvbuf_bytes = Some(size);
self
}
pub fn channel(mut self, channel: u32) -> Self {
self.channel = channel;
self
}
pub async fn build(self) -> Result<Stream, GenicamError> {
let iface = self
.iface
.ok_or_else(|| GenicamError::transport("stream requires a network interface"))?;
let host_ip = iface
.ipv4()
.ok_or_else(|| GenicamError::transport("interface lacks IPv4 address"))?;
let default_port = if self.dst_port == 0 {
0x5FFF
} else {
self.dst_port
};
let mut dest = self.dest.unwrap_or(StreamDest::Unicast {
dst_ip: host_ip,
dst_port: default_port,
});
match &mut dest {
StreamDest::Unicast { dst_port, .. } => {
if *dst_port == 0 {
*dst_port = default_port;
}
}
StreamDest::Multicast { port, .. } => {
if *port == 0 {
*port = default_port;
}
}
}
let iface_mtu = nic::mtu(&iface).map_err(|err| GenicamError::transport(err.to_string()))?;
let mtu = self
.target_mtu
.map_or(iface_mtu, |limit| limit.min(iface_mtu));
let packet_size = if self.auto_packet_size {
nic::best_packet_size(mtu)
} else {
self.packet_size
.unwrap_or_else(|| nic::best_packet_size(1500))
};
let packet_delay = if self.auto_packet_size {
if mtu <= 1500 {
const DELAY_NS: u32 = 2_000;
DELAY_NS / 80
} else {
0
}
} else {
self.packet_delay.unwrap_or(0)
};
match &dest {
StreamDest::Unicast { dst_ip, dst_port } => {
info!(%dst_ip, dst_port, channel = self.channel, "configuring unicast stream");
self.device
.set_stream_destination(self.channel, *dst_ip, *dst_port)
.await
.map_err(|err| GenicamError::transport(err.to_string()))?;
}
StreamDest::Multicast { .. } => {
info!(
channel = self.channel,
port = dest.port(),
addr = %dest.addr(),
"configuring multicast stream parameters"
);
}
}
self.device
.set_stream_packet_size(self.channel, packet_size)
.await
.map_err(|err| GenicamError::transport(err.to_string()))?;
self.device
.set_stream_packet_delay(self.channel, packet_delay)
.await
.map_err(|err| GenicamError::transport(err.to_string()))?;
let source = PacketSource::Udp(Self::bind_socket(&dest, &iface, self.rcvbuf_bytes).await?);
let source_filter = if dest.is_multicast() {
None
} else {
Some(dest.addr())
};
let resend_enabled = !dest.is_multicast();
let params = StreamParams {
packet_size,
packet_delay,
mtu,
host: dest.addr(),
port: dest.port(),
};
let config = StreamConfig {
dest,
iface: iface.clone(),
packet_size: Some(packet_size),
packet_delay: Some(packet_delay),
source_filter,
resend_enabled,
};
let stats = StreamStatsAccumulator::new();
Ok(Stream {
source,
stats,
params,
config,
})
}
async fn bind_socket(
dest: &StreamDest,
iface: &Iface,
rcvbuf_bytes: Option<usize>,
) -> Result<UdpSocket, GenicamError> {
match dest {
StreamDest::Unicast { dst_port, .. } => {
let bind_ip = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
nic::bind_udp(bind_ip, *dst_port, Some(iface.clone()), rcvbuf_bytes)
.await
.map_err(|err| GenicamError::transport(err.to_string()))
}
StreamDest::Multicast {
group,
port,
loopback,
ttl,
} => {
let opts = McOptions {
loopback: *loopback,
ttl: *ttl,
rcvbuf_bytes: rcvbuf_bytes.unwrap_or(DEFAULT_RCVBUF_BYTES),
..McOptions::default()
};
nic::bind_multicast(iface, *group, *port, &opts)
.await
.map_err(|err| GenicamError::transport(err.to_string()))
}
}
}
}
pub struct Stream {
source: PacketSource,
stats: StreamStatsAccumulator,
params: StreamParams,
config: StreamConfig,
}
impl Stream {
pub fn socket(&self) -> Option<&UdpSocket> {
self.source.as_udp_socket()
}
pub(crate) fn into_parts(
self,
) -> (
PacketSource,
StreamStatsAccumulator,
StreamParams,
StreamConfig,
) {
(self.source, self.stats, self.params, self.config)
}
pub fn params(&self) -> StreamParams {
self.params
}
pub fn stats_handle(&self) -> StreamStatsAccumulator {
self.stats.clone()
}
pub fn stats(&self) -> StreamStats {
self.stats.snapshot()
}
pub fn config(&self) -> &StreamConfig {
&self.config
}
}
impl<'a> From<&'a mut GigeDevice> for StreamBuilder<'a> {
fn from(device: &'a mut GigeDevice) -> Self {
StreamBuilder::new(device)
}
}
const DEFAULT_FRAME_TIMEOUT: Duration = Duration::from_millis(100);
const GVSP_HEADER_SIZE: usize = 8;
#[derive(Debug)]
struct FrameAssemblyState {
block_id: u64,
width: u32,
height: u32,
pixel_format: PixelFormat,
timestamp: u64,
expected_packets: Option<usize>,
bitmap: Option<PacketBitmap>,
payload: BytesMut,
packet_payload_size: usize,
started: Instant,
}
impl FrameAssemblyState {
fn new(
block_id: u64,
width: u32,
height: u32,
pixel_format: PixelFormat,
timestamp: u64,
packet_payload_size: usize,
) -> Self {
Self {
block_id,
width,
height,
pixel_format,
timestamp,
expected_packets: None,
bitmap: None,
payload: BytesMut::new(),
packet_payload_size,
started: Instant::now(),
}
}
fn ingest(&mut self, packet_id: u32, data: &[u8]) -> bool {
let pid = packet_id as usize;
if let Some(ref mut bitmap) = self.bitmap
&& !bitmap.set(pid)
{
return false; }
let offset = pid.saturating_sub(1) * self.packet_payload_size;
let required = offset + data.len();
if self.payload.len() < required {
self.payload.resize(required, 0);
}
self.payload[offset..offset + data.len()].copy_from_slice(data);
true
}
fn set_expected_packets(&mut self, count: usize) {
if self.expected_packets.is_none() {
self.expected_packets = Some(count);
self.bitmap = Some(PacketBitmap::new(count));
}
}
#[allow(dead_code)]
fn is_complete(&self) -> bool {
self.bitmap.as_ref().is_some_and(|b| b.is_complete())
}
fn is_expired(&self, timeout: Duration) -> bool {
self.started.elapsed() > timeout
}
#[allow(dead_code)]
fn missing_ranges(&self) -> Vec<std::ops::RangeInclusive<u32>> {
self.bitmap
.as_ref()
.map(|b| b.missing_ranges())
.unwrap_or_default()
}
}
pub struct FrameStream {
source: PacketSource,
stats: StreamStatsAccumulator,
params: StreamParams,
config: StreamConfig,
recv_buffer: Vec<u8>,
active: Option<FrameAssemblyState>,
frame_timeout: Duration,
time_sync: Option<TimeSync>,
}
impl FrameStream {
pub fn new(stream: Stream, time_sync: Option<TimeSync>) -> Self {
let (source, stats, params, config) = stream.into_parts();
let buffer_size = (params.packet_size as usize + 64).max(4096);
Self {
source,
stats,
params,
config,
recv_buffer: vec![0u8; buffer_size],
active: None,
frame_timeout: DEFAULT_FRAME_TIMEOUT,
time_sync,
}
}
pub fn set_frame_timeout(&mut self, timeout: Duration) {
self.frame_timeout = timeout;
}
pub fn set_time_sync(&mut self, time_sync: TimeSync) {
self.time_sync = Some(time_sync);
}
pub fn stats_handle(&self) -> StreamStatsAccumulator {
self.stats.clone()
}
pub fn stats(&self) -> StreamStats {
self.stats.snapshot()
}
pub fn params(&self) -> StreamParams {
self.params
}
pub fn config(&self) -> &StreamConfig {
&self.config
}
pub fn socket(&self) -> Option<&UdpSocket> {
self.source.as_udp_socket()
}
pub async fn next_frame(&mut self) -> Result<Option<Frame>, GenicamError> {
loop {
if let Some(ref active) = self.active
&& active.is_expired(self.frame_timeout)
{
let block_id = active.block_id;
warn!(
block_id,
"frame assembly timeout, dropping incomplete frame"
);
self.stats.record_drop();
self.active = None;
}
let raw = match self.source.recv(&mut self.recv_buffer).await {
Ok(data) if data.is_empty() => return Ok(None), Ok(data) => data,
Err(e) => return Err(e),
};
let packet = match gvsp::parse_packet(&raw) {
Ok(p) => p,
Err(e) => {
trace!(error = %e, "discarding malformed GVSP packet");
continue;
}
};
match packet {
GvspPacket::Leader {
block_id,
width,
height,
pixel_format,
timestamp,
..
} => {
if let Some(ref prev) = self.active
&& prev.block_id != block_id
{
debug!(
old_block = prev.block_id,
new_block = block_id,
"new leader arrived, dropping incomplete frame"
);
self.stats.record_drop();
}
let pixel_format = PixelFormat::from_code(pixel_format);
let packet_payload = self.params.packet_size as usize - GVSP_HEADER_SIZE;
self.active = Some(FrameAssemblyState::new(
block_id,
width,
height,
pixel_format,
timestamp,
packet_payload,
));
trace!(block_id, %pixel_format, width, height, "frame leader received");
}
GvspPacket::Payload {
block_id,
packet_id,
data,
} => {
if let Some(ref mut active) = self.active
&& active.block_id == block_id
&& active.ingest(packet_id, data.as_ref())
{
self.stats.record_packet();
}
}
GvspPacket::Trailer {
block_id,
packet_id,
status,
chunk_data,
} => {
let Some(mut active) = self.active.take() else {
continue;
};
if active.block_id != block_id {
self.stats.record_drop();
continue;
}
active.set_expected_packets(packet_id as usize);
if status != 0 {
warn!(block_id, status, "trailer reported non-zero status");
}
let ts_host = self
.time_sync
.as_ref()
.map(|ts| ts.to_host_time(active.timestamp));
let chunks = if chunk_data.is_empty() {
None
} else {
match crate::chunks::parse_chunk_bytes(&chunk_data) {
Ok(map) => Some(map),
Err(e) => {
debug!(error = %e, "failed to parse chunk data");
None
}
}
};
let payload = active.payload.freeze();
let frame = Frame {
payload,
width: active.width,
height: active.height,
pixel_format: active.pixel_format,
chunks,
ts_dev: Some(active.timestamp),
ts_host,
};
let latency = frame
.host_time()
.and_then(|ts| SystemTime::now().duration_since(ts).ok());
self.stats.record_frame(frame.payload.len(), latency);
debug!(
block_id,
width = frame.width,
height = frame.height,
bytes = frame.payload.len(),
"frame complete"
);
return Ok(Some(frame));
}
}
}
}
}
#[cfg(feature = "u3v")]
#[cfg_attr(docsrs, doc(cfg(feature = "u3v")))]
pub struct U3vFrameStream {
rx: tokio::sync::mpsc::Receiver<Result<Frame, GenicamError>>,
stop_tx: tokio::sync::watch::Sender<bool>,
_reader: tokio::task::JoinHandle<()>,
}
#[cfg(feature = "u3v")]
impl U3vFrameStream {
pub fn start<T: crate::u3v::usb::UsbTransfer + 'static>(
mut stream: crate::u3v::stream::U3vStream<T>,
) -> Self {
let (tx, rx) = tokio::sync::mpsc::channel(4);
let (stop_tx, stop_rx) = tokio::sync::watch::channel(false);
let reader = tokio::task::spawn_blocking(move || {
loop {
if *stop_rx.borrow() {
break;
}
match stream.next_frame() {
Ok(raw) => {
let pixel_format = PixelFormat::from_code(raw.leader.pixel_format);
let frame = Frame {
payload: raw.payload,
width: raw.leader.width,
height: raw.leader.height,
pixel_format,
chunks: None,
ts_dev: Some(raw.leader.timestamp),
ts_host: None,
};
if tx.blocking_send(Ok(frame)).is_err() {
break; }
}
Err(e) => {
let _ = tx.blocking_send(Err(GenicamError::transport(e.to_string())));
break;
}
}
}
});
Self {
rx,
stop_tx,
_reader: reader,
}
}
pub async fn next_frame(&mut self) -> Result<Option<Frame>, GenicamError> {
match self.rx.recv().await {
Some(Ok(frame)) => Ok(Some(frame)),
Some(Err(e)) => Err(e),
None => Ok(None),
}
}
pub fn stop(&self) {
let _ = self.stop_tx.send(true);
}
}
#[cfg(feature = "u3v")]
#[cfg_attr(docsrs, doc(cfg(feature = "u3v")))]
pub struct U3vStreamBuilder<'a, T: crate::u3v::usb::UsbTransfer + 'static> {
camera: &'a mut crate::Camera<crate::U3vRegisterIo<T>>,
payload_size: Option<u64>,
}
#[cfg(feature = "u3v")]
impl<'a, T: crate::u3v::usb::UsbTransfer + 'static> U3vStreamBuilder<'a, T> {
pub fn new(camera: &'a mut crate::Camera<crate::U3vRegisterIo<T>>) -> Self {
Self {
camera,
payload_size: None,
}
}
pub fn payload_size(mut self, size: u64) -> Self {
self.payload_size = Some(size);
self
}
pub fn build(self) -> Result<U3vFrameStream, GenicamError> {
let payload_size = match self.payload_size {
Some(s) => s,
None => {
let width: u64 = self
.camera
.get("Width")?
.parse()
.map_err(|e| GenicamError::parse(format!("Width: {e}")))?;
let height: u64 = self
.camera
.get("Height")?
.parse()
.map_err(|e| GenicamError::parse(format!("Height: {e}")))?;
let pf_str = self.camera.get("PixelFormat")?;
let bpp = PixelFormat::from_name(&pf_str)
.bytes_per_pixel()
.unwrap_or(1) as u64;
width * height * bpp
}
};
let mut device = self.camera.transport().lock_device()?;
let stream = device
.open_stream(payload_size)
.map_err(|e| GenicamError::transport(e.to_string()))?;
Ok(U3vFrameStream::start(stream))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn frame_assembly_state_ingest_tracks_packets() {
let mut state = FrameAssemblyState::new(1, 640, 480, PixelFormat::Mono8, 0, 1400);
state.set_expected_packets(3);
assert!(state.ingest(1, &[1, 2, 3]));
assert!(state.ingest(2, &[4, 5, 6]));
assert!(!state.ingest(1, &[1, 2, 3]));
}
#[test]
fn frame_assembly_state_timeout() {
let state = FrameAssemblyState::new(1, 640, 480, PixelFormat::Mono8, 0, 1400);
assert!(!state.is_expired(Duration::from_secs(10)));
assert!(state.is_expired(Duration::ZERO));
}
}