use std::collections::HashMap;
use std::io::Cursor;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::time::Duration;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use fastrand::Rng;
use if_addrs::{IfAddr, get_if_addrs};
use thiserror::Error;
use tokio::net::UdpSocket;
use tokio::task::JoinSet;
use tokio::time;
use tracing::{debug, info, trace, warn};
use viva_gencp::{AckHeader, CommandFlags, GenCpAck, OpCode, StatusCode, decode_ack};
use crate::nic::{self, Iface};
/// Numeric constants shared by the discovery, FORCEIP and control-channel code in this file.
///
/// NOTE(review): opcodes and register addresses are presumed to follow the GigE Vision
/// specification / the target device's register map — confirm before changing any value.
pub mod consts {
use std::time::Duration;
/// UDP port that discovery, FORCEIP and control requests are addressed to.
pub const PORT: u16 = 3956;
/// Command code written into the header of discovery requests.
pub const DISCOVERY_COMMAND: u16 = 0x0002;
/// Command code a discovery acknowledgement must carry to be accepted.
pub const DISCOVERY_ACK: u16 = 0x0003;
/// Command code written into the header of FORCEIP requests.
pub const FORCEIP_COMMAND: u16 = 0x0004;
/// Command code a FORCEIP acknowledgement must carry to be accepted.
pub const FORCEIP_ACK: u16 = 0x0005;
/// Command code written into the header of packet-resend requests.
pub const PACKET_RESEND_COMMAND: u16 = 0x0040;
/// Command code a packet-resend acknowledgement must carry to be accepted.
pub const PACKET_RESEND_ACK: u16 = 0x0041;
/// Register read and rewritten by `GigeDevice::enable_persistent_ip`.
pub const CURRENT_IP_CONFIG: u64 = 0x0014;
/// Register holding the persistent IPv4 address.
pub const PERSISTENT_IP_ADDRESS: u64 = 0x064C;
/// Register holding the persistent subnet mask.
pub const PERSISTENT_SUBNET_MASK: u64 = 0x065C;
/// Register holding the persistent default gateway.
pub const PERSISTENT_DEFAULT_GATEWAY: u64 = 0x066C;
/// Register written by `GigeDevice::claim_control` / `release_control`.
pub const CONTROL_CHANNEL_PRIVILEGE: u64 = 0x0a00;
/// Value written to `CONTROL_CHANNEL_PRIVILEGE` when claiming control.
pub const CCP_CONTROL: u32 = 1 << 1;
/// Presumably the exclusive-access privilege bit; not referenced elsewhere in this file.
pub const CCP_EXCLUSIVE: u32 = 1 << 0;
/// Address the message-channel destination IPv4 address is written to.
pub const MESSAGE_DESTINATION_ADDRESS: u64 = 0x0900_0200;
/// Address the message-channel destination port is written to.
pub const MESSAGE_DESTINATION_PORT: u64 = 0x0900_0204;
/// Address of the first event-notification mask word (32 event ids per word).
pub const EVENT_NOTIFICATION_BASE: u64 = 0x0900_0300;
/// Byte distance between consecutive event-notification mask words.
pub const EVENT_NOTIFICATION_STRIDE: u64 = 4;
/// Largest number of bytes requested per memory-read command; also bounds write chunks.
pub const GENCP_MAX_BLOCK: usize = 512;
/// Extra bytes reserved in the control receive buffer on top of header and block size.
pub const GENCP_WRITE_OVERHEAD: usize = 8;
/// How long a single control request waits for its acknowledgement.
pub const CONTROL_TIMEOUT: Duration = Duration::from_millis(500);
/// Maximum number of attempts made for one control transaction.
pub const MAX_RETRIES: usize = 4;
/// Base delay of the retry backoff; doubled per attempt up to a factor of eight.
pub const RETRY_BASE_DELAY: Duration = Duration::from_millis(20);
/// Upper bound (inclusive) of the random jitter added to each backoff delay.
pub const RETRY_JITTER: Duration = Duration::from_millis(10);
/// Size of the receive buffer used for discovery and FORCEIP acknowledgements.
pub const DISCOVERY_BUFFER: usize = 2048;
/// Address of the register block of stream channel 0.
pub const STREAM_CHANNEL_BASE: u64 = 0x0d00;
/// Byte distance between the register blocks of consecutive stream channels.
pub const STREAM_CHANNEL_STRIDE: u64 = 0x40;
/// Offset of the destination-port register inside a stream channel block.
pub const STREAM_DESTINATION_PORT: u64 = 0x00;
/// Offset of the packet-size register inside a stream channel block.
pub const STREAM_PACKET_SIZE: u64 = 0x04;
/// Offset of the packet-delay register inside a stream channel block.
pub const STREAM_PACKET_DELAY: u64 = 0x08;
/// Offset of the destination-address register inside a stream channel block.
pub const STREAM_DESTINATION_ADDRESS: u64 = 0x18;
}
pub use consts::PORT as GVCP_PORT;
/// Header fields of an outgoing GVCP request, serialised by `GvcpRequestHeader::encode`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct GvcpRequestHeader {
/// Request flags; only `ACK_REQUIRED` and `BROADCAST` influence the encoded flags byte.
pub flags: CommandFlags,
/// Command code, written big-endian after the flags byte.
pub command: u16,
/// Value of the length field; written as given, not derived from the payload.
pub length: u16,
/// Identifier the matching acknowledgement is expected to carry.
pub request_id: u16,
}
/// First byte of every encoded GVCP request.
const GVCP_CMD_KEY: u8 = 0x42;

impl GvcpRequestHeader {
    /// Serialises this header followed by `payload` into one contiguous buffer.
    ///
    /// Layout: key byte, flags byte, then `command`, `length` and `request_id` as
    /// big-endian `u16` values, then the payload bytes. The `length` field is emitted
    /// exactly as stored in the header; it is not recomputed from `payload.len()`.
    pub fn encode(self, payload: &[u8]) -> Bytes {
        let total_len = viva_gencp::HEADER_SIZE + payload.len();
        let mut out = BytesMut::with_capacity(total_len);
        out.put_slice(&[GVCP_CMD_KEY, self.gvcp_flags_byte()]);
        for field in [self.command, self.length, self.request_id] {
            out.put_u16(field);
        }
        out.put_slice(payload);
        out.freeze()
    }

    /// Maps the header's `CommandFlags` onto the wire flags byte
    /// (`0x01` when an acknowledgement is required, `0x10` for broadcast).
    fn gvcp_flags_byte(&self) -> u8 {
        let ack_bit = if self.flags.contains(CommandFlags::ACK_REQUIRED) {
            0x01
        } else {
            0x00
        };
        let broadcast_bit = if self.flags.contains(CommandFlags::BROADCAST) {
            0x10
        } else {
            0x00
        };
        ack_bit | broadcast_bit
    }
}
/// Header fields of a GVCP acknowledgement, expressed with a raw command code.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct GvcpAckHeader {
/// Status reported by the device.
pub status: StatusCode,
/// Acknowledgement command code.
pub command: u16,
/// Value of the acknowledgement's length field.
pub length: u16,
/// Identifier echoed from the request being acknowledged.
pub request_id: u16,
}
impl From<AckHeader> for GvcpAckHeader {
fn from(value: AckHeader) -> Self {
Self {
status: value.status,
command: value.opcode.ack_code(),
length: value.length,
request_id: value.request_id,
}
}
}
/// Errors returned by the discovery and control-channel functions in this file.
#[derive(Debug, Error)]
pub enum GigeError {
/// Socket or interface-enumeration failure.
#[error("io: {0}")]
Io(#[from] std::io::Error),
/// Malformed, unexpected or inconsistent packet contents; the string describes the problem.
#[error("protocol: {0}")]
Protocol(String),
/// No acknowledgement arrived within the allowed time (after all retries, where applicable).
#[error("timeout waiting for acknowledgement")]
Timeout,
/// Error propagated from the `viva_gencp` crate (e.g. while decoding an acknowledgement).
#[error("GenCP: {0}")]
GenCp(#[from] viva_gencp::GenCpError),
/// The device acknowledged the request with a non-success status.
#[error("device reported status {0:?}")]
Status(StatusCode),
}
/// Identity of a device that answered a discovery request.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeviceInfo {
    /// IPv4 address parsed from the discovery acknowledgement.
    pub ip: Ipv4Addr,
    /// Hardware (MAC) address parsed from the discovery acknowledgement.
    pub mac: [u8; 6],
    /// Model name, or `None` when the device sent an empty string.
    pub model: Option<String>,
    /// Manufacturer name, or `None` when the device sent an empty string.
    pub manufacturer: Option<String>,
}

impl DeviceInfo {
    /// Renders the MAC address as six upper-case, zero-padded hex octets joined by `:`.
    fn mac_string(&self) -> String {
        let [a, b, c, d, e, f] = self.mac;
        format!("{a:02X}:{b:02X}:{c:02X}:{d:02X}:{e:02X}:{f:02X}")
    }
}
/// Runs discovery on every non-loopback IPv4 interface, listening for `timeout` on each.
///
/// # Errors
/// Propagates any error reported by `discover_impl`.
pub async fn discover(timeout: Duration) -> Result<Vec<DeviceInfo>, GigeError> {
    let any_interface: Option<&str> = None;
    let include_loopback = false;
    discover_impl(timeout, any_interface, include_loopback).await
}
/// Runs discovery only on the interface whose name equals `interface`.
///
/// Loopback addresses are not filtered out, so a loopback interface may be named here.
///
/// # Errors
/// Propagates any error reported by `discover_impl`.
pub async fn discover_on_interface(
    timeout: Duration,
    interface: &str,
) -> Result<Vec<DeviceInfo>, GigeError> {
    let only_this = Some(interface);
    let include_loopback = true;
    discover_impl(timeout, only_this, include_loopback).await
}
/// Runs discovery on every IPv4 interface, loopback included.
///
/// # Errors
/// Propagates any error reported by `discover_impl`.
pub async fn discover_all(timeout: Duration) -> Result<Vec<DeviceInfo>, GigeError> {
    let any_interface: Option<&str> = None;
    let include_loopback = true;
    discover_impl(timeout, any_interface, include_loopback).await
}
pub async fn force_ip(
mac: [u8; 6],
ip: Ipv4Addr,
subnet: Ipv4Addr,
gateway: Ipv4Addr,
iface: Option<&Iface>,
) -> Result<(), GigeError> {
let payload = encode_forceip_payload(mac, ip, subnet, gateway);
let local_ip = match iface {
Some(iface) => iface
.ipv4()
.ok_or_else(|| GigeError::Protocol("interface lacks IPv4 address".into()))?,
None => Ipv4Addr::UNSPECIFIED,
};
let socket = UdpSocket::bind(SocketAddr::new(IpAddr::V4(local_ip), 0)).await?;
socket.set_broadcast(true)?;
let dest = SocketAddr::new(IpAddr::V4(Ipv4Addr::BROADCAST), consts::PORT);
let header = GvcpRequestHeader {
flags: CommandFlags::ACK_REQUIRED | CommandFlags::BROADCAST,
command: consts::FORCEIP_COMMAND,
length: payload.len() as u16,
request_id: 1,
};
let packet = header.encode(&payload);
info!(mac = ?mac, %ip, %subnet, %gateway, "sending FORCEIP command");
socket.send_to(&packet, dest).await?;
let mut buf = vec![0u8; consts::DISCOVERY_BUFFER];
match time::timeout(consts::CONTROL_TIMEOUT, socket.recv_from(&mut buf)).await {
Ok(Ok((len, _src))) => {
if len < viva_gencp::HEADER_SIZE {
return Err(GigeError::Protocol("FORCEIP ack too short".into()));
}
let mut cursor = &buf[..];
let status = cursor.get_u16();
let command = cursor.get_u16();
if command != consts::FORCEIP_ACK {
return Err(GigeError::Protocol(format!(
"unexpected FORCEIP ack opcode {command:#06x}"
)));
}
if status != 0 {
return Err(GigeError::Protocol(format!(
"FORCEIP returned status {status:#06x}"
)));
}
info!(%ip, "FORCEIP accepted");
Ok(())
}
Ok(Err(e)) => Err(e.into()),
Err(_) => Err(GigeError::Timeout),
}
}
/// Builds the 56-byte FORCEIP payload.
///
/// Layout: two zero bytes, the six MAC bytes, then for each of `ip`, `subnet` and
/// `gateway` twelve zero bytes followed by the four address octets.
fn encode_forceip_payload(
    mac: [u8; 6],
    ip: Ipv4Addr,
    subnet: Ipv4Addr,
    gateway: Ipv4Addr,
) -> Vec<u8> {
    const PAYLOAD_LEN: usize = 56;
    const ADDRESS_PADDING: [u8; 12] = [0u8; 12];

    let mut payload = Vec::with_capacity(PAYLOAD_LEN);
    payload.extend_from_slice(&[0u8; 2]);
    payload.extend_from_slice(&mac);
    for address in [ip, subnet, gateway] {
        payload.extend_from_slice(&ADDRESS_PADDING);
        payload.extend_from_slice(&address.octets());
    }
    payload
}
/// Shared implementation behind `discover`, `discover_on_interface` and `discover_all`.
///
/// Sends one discovery request per selected IPv4 interface (each from its own task and
/// socket), collects acknowledgements for `timeout` on every interface, and returns the
/// devices de-duplicated by `(ip, mac)` and sorted by IP address.
///
/// * `iface_filter` — when `Some`, only interfaces with exactly this name are used.
/// * `include_loopback` — when `false`, loopback addresses are skipped.
///
/// # Errors
/// Fails if interface enumeration fails, or if any per-interface task fails: a socket
/// error, a malformed acknowledgement (see `parse_discovery_ack`), or a task join error
/// (reported as `GigeError::Protocol`). The first failing task aborts the whole call.
async fn discover_impl(
timeout: Duration,
iface_filter: Option<&str>,
include_loopback: bool,
) -> Result<Vec<DeviceInfo>, GigeError> {
// Select the (name, IPv4 address info) pairs to probe.
let mut interfaces = Vec::new();
for iface in get_if_addrs()? {
// Only IPv4 addresses are probed.
let IfAddr::V4(v4) = iface.addr else {
continue;
};
if !include_loopback && v4.ip.is_loopback() {
continue;
}
if let Some(filter) = iface_filter
&& iface.name != filter
{
continue;
}
interfaces.push((iface.name, v4));
}
// Nothing to probe: an empty result, not an error.
if interfaces.is_empty() {
return Ok(Vec::new());
}
let mut join_set = JoinSet::new();
for (idx, (name, v4)) in interfaces.into_iter().enumerate() {
// Each interface gets its own request id so replies can be matched to its request.
let request_id = 0x0100u16.wrapping_add(idx as u16);
let interface_name = name.clone();
join_set.spawn(async move {
// Bind to the interface address (ephemeral port) so the request leaves through it.
let local_addr = SocketAddr::new(IpAddr::V4(v4.ip), 0);
let socket = UdpSocket::bind(local_addr).await?;
// Loopback is addressed directly; other interfaces use their broadcast address,
// falling back to the limited broadcast address when none is reported.
let destination = if v4.ip.is_loopback() {
SocketAddr::new(IpAddr::V4(v4.ip), consts::PORT)
} else {
socket.set_broadcast(true)?;
let broadcast = v4.broadcast.unwrap_or(Ipv4Addr::BROADCAST);
SocketAddr::new(IpAddr::V4(broadcast), consts::PORT)
};
let header = GvcpRequestHeader {
flags: CommandFlags::ACK_REQUIRED | CommandFlags::BROADCAST,
command: consts::DISCOVERY_COMMAND,
length: 0,
request_id,
};
let packet = header.encode(&[]);
info!(%interface_name, local = %v4.ip, dest = %destination, "sending GVCP discovery");
trace!(%interface_name, bytes = packet.len(), "GVCP discovery payload size");
socket.send_to(&packet, destination).await?;
let mut responses = Vec::new();
let mut buffer = vec![0u8; consts::DISCOVERY_BUFFER];
// A single timer bounds the whole listening window, however many replies arrive.
let timer = time::sleep(timeout);
tokio::pin!(timer);
loop {
tokio::select! {
_ = &mut timer => break,
recv = socket.recv_from(&mut buffer) => {
let (len, src) = recv?;
info!(%interface_name, %src, "received GVCP response");
trace!(%interface_name, bytes = len, "GVCP response length");
// `Ok(None)` means the reply belongs to a different request and is skipped;
// a parse error ends this task with that error.
if let Some(info) = parse_discovery_ack(&buffer[..len], request_id)? {
trace!(ip = %info.ip, mac = %info.mac_string(), "parsed discovery ack");
responses.push(info);
}
}
}
}
Ok::<_, GigeError>(responses)
});
}
// Merge per-interface results; the first entry seen for an (ip, mac) pair wins.
let mut seen = HashMap::new();
while let Some(res) = join_set.join_next().await {
// First `?` handles the join error, second the task's own result.
let devices =
res.map_err(|e| GigeError::Protocol(format!("discovery task failed: {e}")))??;
for dev in devices {
seen.entry((dev.ip, dev.mac)).or_insert(dev);
}
}
let mut devices: Vec<_> = seen.into_values().collect();
devices.sort_by_key(|d| d.ip);
Ok(devices)
}
/// Validates a discovery acknowledgement datagram and extracts the device it describes.
///
/// Returns `Ok(None)` when the datagram's request id differs from `expected_request`
/// (checked before anything else in the header is validated).
///
/// # Errors
/// `GigeError::Protocol` when the datagram is shorter than a header, carries a command
/// code other than `consts::DISCOVERY_ACK`, reports a non-zero status, declares more
/// payload than was received, or when the payload itself cannot be parsed.
fn parse_discovery_ack(buf: &[u8], expected_request: u16) -> Result<Option<DeviceInfo>, GigeError> {
    const HEADER_LEN: usize = viva_gencp::HEADER_SIZE;

    if buf.len() < HEADER_LEN {
        return Err(GigeError::Protocol("GVCP ack too short".into()));
    }

    // Header fields are consumed in wire order from a throw-away view of the buffer.
    let mut fields = buf;
    let status = fields.get_u16();
    let command = fields.get_u16();
    let payload_len = fields.get_u16() as usize;
    let request_id = fields.get_u16();

    if request_id != expected_request {
        return Ok(None);
    }
    if command != consts::DISCOVERY_ACK {
        return Err(GigeError::Protocol(format!(
            "unexpected discovery opcode {command:#06x}"
        )));
    }
    if status != 0 {
        return Err(GigeError::Protocol(format!(
            "discovery returned status {status:#06x}"
        )));
    }

    let Some(payload) = buf.get(HEADER_LEN..HEADER_LEN + payload_len) else {
        return Err(GigeError::Protocol("discovery payload truncated".into()));
    };
    parse_discovery_payload(payload).map(Some)
}
/// Parses the payload of a discovery acknowledgement into a `DeviceInfo`.
///
/// Field offsets used (bytes from the start of the payload):
/// `0` spec version (2+2), `4` device mode, `8` reserved (2), `10` MAC address (6),
/// `16` supported IP configuration, `20` current IP configuration, `24` reserved (12),
/// `36` current IP, `40` reserved (12), `52` subnet mask, `56` reserved (12),
/// `68` default gateway, `72` manufacturer name (32), `104` model name (32).
///
/// # Errors
/// `GigeError::Protocol` when the payload is shorter than the fixed-width fields
/// (72 bytes) or when either name string is truncated.
fn parse_discovery_payload(payload: &[u8]) -> Result<DeviceInfo, GigeError> {
    // All cursor reads below panic on underflow, so the whole fixed-width prefix must be
    // present before the first read. Payloads come from the network and are untrusted.
    const FIXED_FIELDS_LEN: usize = 72;
    if payload.len() < FIXED_FIELDS_LEN {
        return Err(GigeError::Protocol("discovery payload too small".into()));
    }
    let mut cursor = Cursor::new(payload);
    let _spec_major = cursor.get_u16();
    let _spec_minor = cursor.get_u16();
    let _device_mode = cursor.get_u32();
    // Only two reserved bytes precede the MAC address, which occupies bytes 10..16.
    let _reserved = cursor.get_u16();
    let mut mac = [0u8; 6];
    cursor.copy_to_slice(&mut mac);
    let _supported_ip_config = cursor.get_u32();
    let _current_ip_config = cursor.get_u32();
    cursor.advance(12);
    let ip = Ipv4Addr::from(cursor.get_u32());
    cursor.advance(12);
    let _subnet = cursor.get_u32();
    cursor.advance(12);
    let _gateway = cursor.get_u32();
    // The name strings check their own bounds and report truncation as an error.
    let manufacturer = read_fixed_string(&mut cursor, 32)?;
    let model = read_fixed_string(&mut cursor, 32)?;
    Ok(DeviceInfo {
        ip,
        mac,
        manufacturer,
        model,
    })
}
fn read_fixed_string(cursor: &mut Cursor<&[u8]>, len: usize) -> Result<Option<String>, GigeError> {
if cursor.remaining() < len {
return Err(GigeError::Protocol("discovery string truncated".into()));
}
let mut buf = vec![0u8; len];
cursor.copy_to_slice(&mut buf);
Ok(parse_string(&buf))
}
/// Decodes a NUL-terminated (or unterminated) byte field into a trimmed string.
///
/// Everything from the first NUL byte on is ignored, invalid UTF-8 is replaced lossily,
/// and surrounding whitespace is removed. Returns `None` when nothing is left.
fn parse_string(bytes: &[u8]) -> Option<String> {
    let until_nul = bytes.split(|&b| b == 0).next().unwrap_or(bytes);
    let decoded = String::from_utf8_lossy(until_nul);
    let trimmed = decoded.trim();
    (!trimmed.is_empty()).then(|| trimmed.to_string())
}
/// Control-channel connection to a single device, created with `GigeDevice::open`.
pub struct GigeDevice {
// UDP socket connected to `remote`.
socket: UdpSocket,
// Address passed to `open`.
remote: SocketAddr,
// Request id handed out by the next call to `next_request_id`; never left at 0.
request_id: u16,
// Source of the random jitter added to retry backoff delays.
rng: Rng,
}
/// Stream settings chosen and written to the device by `GigeDevice::negotiate_stream`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StreamParams {
/// Packet size written to the stream channel.
pub packet_size: u32,
/// Packet delay written to the stream channel.
pub packet_delay: u32,
/// MTU the packet size was derived from (interface MTU, optionally capped by the caller).
pub mtu: u32,
/// Host IPv4 address configured as the stream destination.
pub host: Ipv4Addr,
/// Host UDP port configured as the stream destination.
pub port: u16,
}
impl GigeDevice {
/// Opens a control connection to the device at `addr`.
///
/// Binds a UDP socket to the unspecified IPv4 address with an ephemeral port and
/// connects it to `addr`. Request ids start at 1.
///
/// # Errors
/// `GigeError::Protocol` for IPv6 addresses; `GigeError::Io` if bind or connect fails.
pub async fn open(addr: SocketAddr) -> Result<Self, GigeError> {
    if addr.is_ipv6() {
        return Err(GigeError::Protocol("IPv6 GVCP is not supported".into()));
    }
    let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0);
    let socket = UdpSocket::bind(bind_addr).await?;
    socket.connect(addr).await?;
    let device = Self {
        socket,
        remote: addr,
        request_id: 1,
        rng: Rng::new(),
    };
    Ok(device)
}
/// Claims control of the device by writing `consts::CCP_CONTROL` to the
/// control-channel-privilege register.
///
/// # Errors
/// Propagates any error from `write_register`.
pub async fn claim_control(&mut self) -> Result<(), GigeError> {
    let privilege_register = consts::CONTROL_CHANNEL_PRIVILEGE as u32;
    self.write_register(privilege_register, consts::CCP_CONTROL)
        .await?;
    debug!(addr = %self.remote, "claimed control channel privilege");
    Ok(())
}
/// Releases control of the device by writing zero to the control-channel-privilege register.
///
/// # Errors
/// Propagates any error from `write_register`.
pub async fn release_control(&mut self) -> Result<(), GigeError> {
    let privilege_register = consts::CONTROL_CHANNEL_PRIVILEGE as u32;
    let no_privilege = 0;
    self.write_register(privilege_register, no_privilege).await
}
pub fn remote_addr(&self) -> SocketAddr {
self.remote
}
/// Returns the current request id and advances the counter, wrapping from
/// `u16::MAX` to 1 so that 0 is never stored as the next id.
fn next_request_id(&mut self) -> u16 {
    let successor = match self.request_id.wrapping_add(1) {
        0 => 1,
        next => next,
    };
    std::mem::replace(&mut self.request_id, successor)
}
/// Sends one command and waits for its acknowledgement, retrying on transient failures.
///
/// Every attempt uses a fresh request id. An attempt is retried (after `backoff`) when
/// the send fails, the receive fails or times out, the acknowledgement carries a
/// different request id, or the device reports `DeviceBusy`. At most
/// `consts::MAX_RETRIES` attempts are made.
///
/// # Errors
/// * `GigeError::Protocol` — payload too large for the 16-bit length field, request-id
///   mismatch on the last attempt, or an acknowledgement for a different opcode.
/// * `GigeError::GenCp` — the acknowledgement could not be decoded (not retried).
/// * `GigeError::Status` — the device reported a non-success status.
/// * `GigeError::Timeout` / `GigeError::Io` — the last attempt timed out or failed.
async fn transact_with_retry(
    &mut self,
    opcode: OpCode,
    payload: BytesMut,
) -> Result<GenCpAck, GigeError> {
    // The payload never changes between attempts: freeze it once instead of cloning and
    // re-copying it on every iteration.
    let payload = payload.freeze();
    // Reject oversized payloads instead of silently truncating the length field.
    let length = u16::try_from(payload.len()).map_err(|_| {
        GigeError::Protocol(format!(
            "GVCP payload of {} bytes exceeds the 16-bit length field",
            payload.len()
        ))
    })?;
    // One receive buffer serves all attempts; only the received prefix is ever read.
    let mut buf = vec![
        0u8;
        viva_gencp::HEADER_SIZE + consts::GENCP_MAX_BLOCK + consts::GENCP_WRITE_OVERHEAD
    ];
    let mut attempt = 0usize;
    loop {
        attempt += 1;
        let request_id = self.next_request_id();
        let header = GvcpRequestHeader {
            flags: CommandFlags::ACK_REQUIRED,
            command: opcode.command_code(),
            length,
            request_id,
        };
        let encoded = header.encode(&payload);
        trace!(request_id, opcode = ?opcode, bytes = encoded.len(), attempt, "sending GVCP command");
        if let Err(err) = self.socket.send(&encoded).await {
            if attempt >= consts::MAX_RETRIES {
                return Err(err.into());
            }
            warn!(request_id, ?opcode, attempt, "send failed, retrying");
            self.backoff(attempt).await;
            continue;
        }
        match time::timeout(consts::CONTROL_TIMEOUT, self.socket.recv(&mut buf)).await {
            Ok(Ok(len)) => {
                trace!(request_id, bytes = len, attempt, "received GenCP ack");
                let ack = decode_ack(&buf[..len])?;
                if ack.header.request_id != request_id {
                    debug!(
                        request_id,
                        got = ack.header.request_id,
                        attempt,
                        "acknowledgement id mismatch"
                    );
                    if attempt >= consts::MAX_RETRIES {
                        return Err(GigeError::Protocol("acknowledgement id mismatch".into()));
                    }
                    self.backoff(attempt).await;
                    continue;
                }
                if ack.header.opcode != opcode {
                    return Err(GigeError::Protocol(
                        "unexpected opcode in acknowledgement".into(),
                    ));
                }
                match ack.header.status {
                    StatusCode::Success => return Ok(ack),
                    StatusCode::DeviceBusy if attempt < consts::MAX_RETRIES => {
                        warn!(request_id, attempt, "device busy, retrying");
                        self.backoff(attempt).await;
                    }
                    other => return Err(GigeError::Status(other)),
                }
            }
            Ok(Err(err)) => {
                if attempt >= consts::MAX_RETRIES {
                    return Err(err.into());
                }
                warn!(request_id, ?opcode, attempt, "receive error, retrying");
                self.backoff(attempt).await;
            }
            Err(_) => {
                if attempt >= consts::MAX_RETRIES {
                    return Err(GigeError::Timeout);
                }
                warn!(request_id, ?opcode, attempt, "command timeout, retrying");
                self.backoff(attempt).await;
            }
        }
    }
}
/// Sleeps before the next retry.
///
/// The delay is `consts::RETRY_BASE_DELAY` doubled for each previous attempt (capped at
/// a factor of eight) plus a random jitter of up to `consts::RETRY_JITTER` inclusive.
async fn backoff(&mut self, attempt: usize) {
    let exponent = attempt.saturating_sub(1).min(3);
    let base_ms = consts::RETRY_BASE_DELAY.as_millis() as u64;
    let scaled_ms = base_ms.saturating_mul(1u64 << exponent).max(base_ms);
    let max_jitter_ms = consts::RETRY_JITTER.as_millis() as u64;
    let jitter_ms = self.rng.u64(..=max_jitter_ms);
    let delay = Duration::from_millis(scaled_ms) + Duration::from_millis(jitter_ms);
    debug!(attempt, delay = ?delay, "gvcp retry backoff");
    time::sleep(delay).await;
}
/// Reads the 32-bit register at `addr` and returns its value (decoded big-endian).
///
/// # Errors
/// Propagates errors from `transact_with_retry`; returns `GigeError::Protocol` when the
/// acknowledgement payload is not exactly four bytes long.
pub async fn read_register(&mut self, addr: u32) -> Result<u32, GigeError> {
    let mut request = BytesMut::with_capacity(4);
    request.put_u32(addr);
    let ack = self
        .transact_with_retry(OpCode::ReadRegister, request)
        .await?;
    match <[u8; 4]>::try_from(&ack.payload[..]) {
        Ok(raw) => Ok(u32::from_be_bytes(raw)),
        Err(_) => Err(GigeError::Protocol(format!(
            "expected 4-byte register ack but device returned {} bytes",
            ack.payload.len()
        ))),
    }
}
/// Writes `value` to the 32-bit register at `addr`.
///
/// # Errors
/// Propagates errors from `transact_with_retry`; returns `GigeError::Protocol` when the
/// acknowledgement payload is not exactly four bytes long.
pub async fn write_register(&mut self, addr: u32, value: u32) -> Result<(), GigeError> {
    let mut request = BytesMut::with_capacity(8);
    for word in [addr, value] {
        request.put_u32(word);
    }
    let ack = self
        .transact_with_retry(OpCode::WriteRegister, request)
        .await?;
    let ack_len = ack.payload.len();
    if ack_len == 4 {
        return Ok(());
    }
    Err(GigeError::Protocol(format!(
        "expected 4-byte register write ack but device returned {} bytes",
        ack_len
    )))
}
/// Reads `len` bytes of device memory starting at `addr`.
///
/// The read is split into requests of at most `consts::GENCP_MAX_BLOCK` bytes. Each
/// acknowledgement may either carry exactly the requested bytes, or a four-byte prefix
/// followed by at least the requested bytes; both shapes are accepted.
///
/// # Errors
/// * `GigeError::Protocol` — a chunk address does not fit the 32-bit address field, or
///   an acknowledgement carries an unexpected number of bytes.
/// * Any error from `transact_with_retry`.
pub async fn read_mem(&mut self, addr: u64, len: usize) -> Result<Vec<u8>, GigeError> {
    let mut data = Vec::with_capacity(len);
    let mut offset = 0usize;
    while offset < len {
        let chunk = (len - offset).min(consts::GENCP_MAX_BLOCK);
        // The wire format carries a 32-bit address: refuse out-of-range addresses
        // instead of silently wrapping to a different memory location.
        let chunk_addr = addr
            .checked_add(offset as u64)
            .and_then(|a| u32::try_from(a).ok())
            .ok_or_else(|| {
                GigeError::Protocol(format!(
                    "read address {addr:#x} + {offset:#x} exceeds the 32-bit address space"
                ))
            })?;
        let mut payload = BytesMut::with_capacity(8);
        payload.put_u32(chunk_addr);
        payload.put_u16(0);
        // `chunk` is bounded by GENCP_MAX_BLOCK, so the cast cannot truncate.
        payload.put_u16(chunk as u16);
        let ack = self.transact_with_retry(OpCode::ReadMem, payload).await?;
        let ack_data = if ack.payload.len() >= 4 + chunk {
            &ack.payload[4..4 + chunk]
        } else if ack.payload.len() == chunk {
            &ack.payload[..chunk]
        } else {
            return Err(GigeError::Protocol(format!(
                "expected {} bytes but device returned {}",
                chunk,
                ack.payload.len()
            )));
        };
        data.extend_from_slice(ack_data);
        offset += chunk;
    }
    Ok(data)
}
/// Writes `data` to device memory starting at `addr`.
///
/// The write is split into requests whose payload (four address bytes plus data) never
/// exceeds `consts::GENCP_MAX_BLOCK` bytes. Writing an empty slice sends nothing.
///
/// # Errors
/// * `GigeError::Protocol` — a chunk address does not fit the 32-bit address field, or
///   an acknowledgement carries more than four payload bytes.
/// * Any error from `transact_with_retry`.
pub async fn write_mem(&mut self, addr: u64, data: &[u8]) -> Result<(), GigeError> {
    const GVCP_WRITE_OVERHEAD: usize = 4;
    // Non-zero by construction, so every iteration makes progress.
    const MAX_CHUNK: usize = consts::GENCP_MAX_BLOCK - GVCP_WRITE_OVERHEAD;

    let mut offset = 0usize;
    while offset < data.len() {
        let chunk = (data.len() - offset).min(MAX_CHUNK);
        // The wire format carries a 32-bit address: refuse out-of-range addresses
        // instead of silently wrapping to a different memory location.
        let chunk_addr = addr
            .checked_add(offset as u64)
            .and_then(|a| u32::try_from(a).ok())
            .ok_or_else(|| {
                GigeError::Protocol(format!(
                    "write address {addr:#x} + {offset:#x} exceeds the 32-bit address space"
                ))
            })?;
        let mut payload = BytesMut::with_capacity(GVCP_WRITE_OVERHEAD + chunk);
        payload.put_u32(chunk_addr);
        payload.extend_from_slice(&data[offset..offset + chunk]);
        let ack = self.transact_with_retry(OpCode::WriteMem, payload).await?;
        if ack.payload.len() > 4 {
            return Err(GigeError::Protocol(
                "write acknowledgement carried unexpected payload".into(),
            ));
        }
        offset += chunk;
    }
    Ok(())
}
/// Configures where the device sends message-channel traffic.
///
/// Writes the four address octets to `consts::MESSAGE_DESTINATION_ADDRESS`, then the
/// port as two big-endian bytes to `consts::MESSAGE_DESTINATION_PORT`.
///
/// NOTE(review): the port is written as 2 bytes here, whereas `set_stream_destination`
/// writes its port as a 4-byte word — confirm which width the device expects.
///
/// # Errors
/// Propagates any error from `write_mem`; the port is not written if the address fails.
pub async fn set_message_destination(
    &mut self,
    ip: Ipv4Addr,
    port: u16,
) -> Result<(), GigeError> {
    info!(%ip, port, "configuring message channel destination");
    let address_bytes = ip.octets();
    let port_bytes = port.to_be_bytes();
    self.write_mem(consts::MESSAGE_DESTINATION_ADDRESS, &address_bytes)
        .await?;
    self.write_mem(consts::MESSAGE_DESTINATION_PORT, &port_bytes)
        .await
}
/// Computes the address of the register at `offset` inside the block of stream `channel`.
fn stream_reg(channel: u32, offset: u64) -> u64 {
    let channel_block = u64::from(channel) * consts::STREAM_CHANNEL_STRIDE;
    consts::STREAM_CHANNEL_BASE + channel_block + offset
}
/// Configures the host address and port that stream `channel` sends its data to.
///
/// The address is written first, then the port widened to a big-endian 32-bit word.
///
/// # Errors
/// Propagates any error from `write_mem`; the port is not written if the address fails.
pub async fn set_stream_destination(
    &mut self,
    channel: u32,
    ip: Ipv4Addr,
    port: u16,
) -> Result<(), GigeError> {
    info!(channel, %ip, port, "configuring stream destination");
    let address_reg = Self::stream_reg(channel, consts::STREAM_DESTINATION_ADDRESS);
    let port_reg = Self::stream_reg(channel, consts::STREAM_DESTINATION_PORT);
    let port_word = u32::from(port).to_be_bytes();
    self.write_mem(address_reg, &ip.octets()).await?;
    self.write_mem(port_reg, &port_word).await
}
/// Writes `packet_size` (big-endian) to the packet-size register of stream `channel`.
///
/// # Errors
/// Propagates any error from `write_mem`.
pub async fn set_stream_packet_size(
    &mut self,
    channel: u32,
    packet_size: u32,
) -> Result<(), GigeError> {
    info!(channel, packet_size, "configuring stream packet size");
    let register = Self::stream_reg(channel, consts::STREAM_PACKET_SIZE);
    let encoded = packet_size.to_be_bytes();
    self.write_mem(register, &encoded).await
}
/// Writes `packet_delay` (big-endian) to the packet-delay register of stream `channel`.
///
/// # Errors
/// Propagates any error from `write_mem`.
pub async fn set_stream_packet_delay(
    &mut self,
    channel: u32,
    packet_delay: u32,
) -> Result<(), GigeError> {
    debug!(channel, packet_delay, "configuring stream packet delay");
    let register = Self::stream_reg(channel, consts::STREAM_PACKET_DELAY);
    let encoded = packet_delay.to_be_bytes();
    self.write_mem(register, &encoded).await
}
/// Chooses stream parameters for `channel` and writes them to the device.
///
/// The effective MTU is the interface MTU, capped by `target_mtu` when given. The packet
/// size comes from `nic::best_packet_size`; a packet delay of 25 is used for MTUs of at
/// most 1500 and 0 otherwise. Destination, packet size and packet delay are written in
/// that order.
///
/// # Errors
/// `GigeError::Protocol` when `iface` has no IPv4 address; otherwise any error from
/// `nic::mtu` or from the register writes.
pub async fn negotiate_stream(
    &mut self,
    channel: u32,
    iface: &Iface,
    port: u16,
    target_mtu: Option<u32>,
) -> Result<StreamParams, GigeError> {
    const DELAY_NS: u32 = 2_000;
    const STANDARD_MTU: u32 = 1500;

    let Some(host_ip) = iface.ipv4() else {
        return Err(GigeError::Protocol("interface lacks IPv4 address".into()));
    };
    let iface_mtu = nic::mtu(iface)?;
    let mtu = match target_mtu {
        Some(limit) => limit.min(iface_mtu),
        None => iface_mtu,
    };
    let packet_size = nic::best_packet_size(mtu);
    let packet_delay = if mtu > STANDARD_MTU { 0 } else { DELAY_NS / 80 };

    self.set_stream_destination(channel, host_ip, port).await?;
    self.set_stream_packet_size(channel, packet_size).await?;
    self.set_stream_packet_delay(channel, packet_delay).await?;

    let params = StreamParams {
        packet_size,
        packet_delay,
        mtu,
        host: host_ip,
        port,
    };
    Ok(params)
}
/// Sets (`on == true`) or clears the notification bit for event `id`.
///
/// Events are packed 32 per mask word: word `id / 32` lives at
/// `consts::EVENT_NOTIFICATION_BASE + index * consts::EVENT_NOTIFICATION_STRIDE`, and the
/// event's bit is `1 << (id % 32)` of the big-endian word. The word is read, modified
/// and written back.
///
/// # Errors
/// `GigeError::Protocol` when the read does not return exactly four bytes; otherwise any
/// error from `read_mem` / `write_mem`.
pub async fn enable_event_raw(&mut self, id: u16, on: bool) -> Result<(), GigeError> {
    let index = (id / 32) as u64;
    let bit = 1u32 << (id % 32);
    let addr = consts::EVENT_NOTIFICATION_BASE + index * consts::EVENT_NOTIFICATION_STRIDE;
    let current = self.read_mem(addr, 4).await?;
    // Guard the fixed-size copy below, which would panic on a length mismatch.
    if current.len() != 4 {
        return Err(GigeError::Protocol(
            "event notification register length mismatch".into(),
        ));
    }
    let mut bytes = [0u8; 4];
    bytes.copy_from_slice(&current);
    let mut value = u32::from_be_bytes(bytes);
    if on {
        value |= bit;
    } else {
        value &= !bit;
    }
    let new_bytes = value.to_be_bytes();
    self.write_mem(addr, &new_bytes).await?;
    debug!(event_id = id, enabled = on, "updated event mask");
    Ok(())
}
/// Reads the persistent IP configuration as `(ip, subnet mask, default gateway)`.
///
/// The three registers are read in that order.
///
/// # Errors
/// Propagates the first error from `read_register`.
pub async fn read_persistent_ip(
    &mut self,
) -> Result<(Ipv4Addr, Ipv4Addr, Ipv4Addr), GigeError> {
    let ip_raw = self
        .read_register(consts::PERSISTENT_IP_ADDRESS as u32)
        .await?;
    let subnet_raw = self
        .read_register(consts::PERSISTENT_SUBNET_MASK as u32)
        .await?;
    let gateway_raw = self
        .read_register(consts::PERSISTENT_DEFAULT_GATEWAY as u32)
        .await?;
    Ok((
        Ipv4Addr::from(ip_raw),
        Ipv4Addr::from(subnet_raw),
        Ipv4Addr::from(gateway_raw),
    ))
}
/// Writes the persistent IP configuration: address, subnet mask, then default gateway.
///
/// # Errors
/// Propagates the first error from `write_register`; later registers are then not written.
pub async fn write_persistent_ip(
    &mut self,
    ip: Ipv4Addr,
    subnet: Ipv4Addr,
    gateway: Ipv4Addr,
) -> Result<(), GigeError> {
    let writes = [
        (consts::PERSISTENT_IP_ADDRESS, ip),
        (consts::PERSISTENT_SUBNET_MASK, subnet),
        (consts::PERSISTENT_DEFAULT_GATEWAY, gateway),
    ];
    for (register, address) in writes {
        self.write_register(register as u32, u32::from(address))
            .await?;
    }
    info!(%ip, %subnet, %gateway, "wrote persistent IP configuration");
    Ok(())
}
/// Enables the persistent-IP option in the device's IP configuration register.
///
/// Reads `consts::CURRENT_IP_CONFIG`, sets the persistent-IP bit and writes the value
/// back, leaving all other bits as they were.
///
/// # Errors
/// Propagates any error from `read_register` / `write_register`.
pub async fn enable_persistent_ip(&mut self) -> Result<(), GigeError> {
    // In the GigE Vision network-configuration register the least-significant bit
    // selects persistent IP; 0x02 is DHCP and 0x04 is link-local addressing.
    const PERSISTENT_IP_BIT: u32 = 0x01;

    let register = consts::CURRENT_IP_CONFIG as u32;
    let current = self.read_register(register).await?;
    let updated = current | PERSISTENT_IP_BIT;
    self.write_register(register, updated).await?;
    info!(config = format!("0x{updated:08x}"), "enabled persistent IP");
    Ok(())
}
/// Asks the device to resend packets `first_packet..=last_packet` of block `block_id`
/// and waits for the acknowledgement. The request is sent once, without retries.
///
/// The payload consists of four big-endian 16-bit words: `block_id`, zero,
/// `first_packet`, `last_packet`.
///
/// # Errors
/// * `GigeError::Protocol` — the acknowledgement is not exactly one header long, has an
///   unexpected command code, declares a payload, or echoes a different request id.
/// * `GigeError::Status` — the device reported a non-success status.
/// * `GigeError::Timeout` / `GigeError::Io` — nothing arrived in time or the socket failed.
pub async fn request_resend(
    &mut self,
    block_id: u16,
    first_packet: u16,
    last_packet: u16,
) -> Result<(), GigeError> {
    let mut payload = BytesMut::with_capacity(8);
    for word in [block_id, 0, first_packet, last_packet] {
        payload.put_u16(word);
    }
    let request_id = self.next_request_id();
    let header = GvcpRequestHeader {
        flags: CommandFlags::ACK_REQUIRED,
        command: consts::PACKET_RESEND_COMMAND,
        length: payload.len() as u16,
        request_id,
    };
    let packet = header.encode(&payload);
    trace!(
        block_id,
        first_packet, last_packet, request_id, "sending packet resend request"
    );
    self.socket.send(&packet).await?;

    let mut buf = [0u8; viva_gencp::HEADER_SIZE];
    let received = time::timeout(consts::CONTROL_TIMEOUT, self.socket.recv(&mut buf)).await;
    let len = match received {
        Ok(Ok(len)) => len,
        Ok(Err(err)) => return Err(err.into()),
        Err(_) => return Err(GigeError::Timeout),
    };
    if len != viva_gencp::HEADER_SIZE {
        return Err(GigeError::Protocol("resend ack length mismatch".into()));
    }

    // Decode the four header words in wire order, then validate them.
    let mut fields = &buf[..];
    let status = StatusCode::from_raw(fields.get_u16());
    let command = fields.get_u16();
    let length = fields.get_u16();
    let ack_request_id = fields.get_u16();

    if command != consts::PACKET_RESEND_ACK {
        return Err(GigeError::Protocol("unexpected resend ack opcode".into()));
    }
    if length != 0 {
        return Err(GigeError::Protocol("resend ack carried payload".into()));
    }
    if ack_request_id != request_id {
        return Err(GigeError::Protocol("resend ack request id mismatch".into()));
    }
    if status != StatusCode::Success {
        return Err(GigeError::Status(status));
    }
    Ok(())
}
}
// Unit tests for the pure encoding/conversion helpers; nothing here touches the network.
#[cfg(test)]
mod tests {
use super::*;
// The encoded request must be: key byte, flags byte, then command, length and request id
// as big-endian u16 values, followed by the payload verbatim.
#[test]
fn request_header_roundtrip() {
let header = GvcpRequestHeader {
flags: CommandFlags::ACK_REQUIRED,
command: 0x1234,
length: 4,
request_id: 0xBEEF,
};
let payload = [1u8, 2, 3, 4];
let encoded = header.encode(&payload);
assert_eq!(encoded.len(), viva_gencp::HEADER_SIZE + payload.len());
assert_eq!(encoded[0], GVCP_CMD_KEY);
// ACK_REQUIRED alone maps to flags byte 0x01.
assert_eq!(encoded[1], 0x01); assert_eq!(&encoded[2..4], &header.command.to_be_bytes());
assert_eq!(&encoded[4..6], &header.length.to_be_bytes());
assert_eq!(&encoded[6..8], &header.request_id.to_be_bytes());
assert_eq!(&encoded[8..], &payload);
}
// The FORCEIP payload is 56 bytes with the MAC at 2..8 and each address in the last four
// bytes of its 16-byte slot; every other byte must stay zero.
#[test]
fn forceip_payload_encoding() {
let mac = [0xDE, 0xAD, 0xBE, 0xEF, 0xCA, 0xFE];
let ip = Ipv4Addr::new(192, 168, 1, 100);
let subnet = Ipv4Addr::new(255, 255, 255, 0);
let gateway = Ipv4Addr::new(192, 168, 1, 1);
let payload = encode_forceip_payload(mac, ip, subnet, gateway);
assert_eq!(payload.len(), 56);
assert_eq!(&payload[2..8], &mac);
assert_eq!(&payload[20..24], &ip.octets());
assert_eq!(&payload[36..40], &subnet.octets());
assert_eq!(&payload[52..56], &gateway.octets());
// Padding regions.
assert_eq!(&payload[0..2], &[0, 0]);
assert_eq!(&payload[8..20], &[0u8; 12]);
assert_eq!(&payload[24..36], &[0u8; 12]);
assert_eq!(&payload[40..52], &[0u8; 12]);
}
// Converting an `AckHeader` keeps status, length and request id and replaces the opcode
// by its acknowledgement code.
#[test]
fn ack_header_conversion() {
let ack = AckHeader {
status: StatusCode::DeviceBusy,
opcode: OpCode::ReadMem,
length: 12,
request_id: 0x44,
};
let converted = GvcpAckHeader::from(ack);
assert_eq!(converted.status, StatusCode::DeviceBusy);
assert_eq!(converted.command, OpCode::ReadMem.ack_code());
assert_eq!(converted.length, 12);
assert_eq!(converted.request_id, 0x44);
}
}