use crate::udt::UdtManager;
use lazy_static::lazy_static;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::net::TcpStream;
pub trait EtherNetIpStream: AsyncRead + AsyncWrite + Unpin + Send {}
impl<S> EtherNetIpStream for S where S: AsyncRead + AsyncWrite + Unpin + Send {}
use tokio::runtime::Runtime;
use tokio::sync::Mutex;
use tokio::time::{timeout, Duration, Instant};
pub mod config; pub mod error;
pub mod ffi;
pub mod monitoring; pub mod plc_manager;
pub mod subscription;
pub mod tag_group;
pub mod tag_manager;
pub mod tag_path;
pub mod tag_subscription; pub mod udt;
pub mod version;
pub use config::{
ConnectionConfig, LoggingConfig, MonitoringConfig, PerformanceConfig, PlcSpecificConfig,
ProductionConfig, SecurityConfig,
};
pub use error::{EtherNetIpError, Result};
pub use monitoring::{
ConnectionMetrics, ErrorMetrics, HealthMetrics, HealthStatus, MonitoringMetrics,
OperationMetrics, PerformanceMetrics, ProductionMonitor,
};
pub use plc_manager::{PlcConfig, PlcConnection, PlcManager};
pub use subscription::{SubscriptionManager, SubscriptionOptions, TagSubscription};
pub use tag_group::{
TagGroupConfig, TagGroupEvent, TagGroupEventKind, TagGroupFailureCategory,
TagGroupFailureDiagnostic, TagGroupSnapshot, TagGroupSubscription, TagGroupValueResult,
};
pub use tag_manager::{TagCache, TagManager, TagMetadata, TagPermissions, TagScope};
pub use tag_path::TagPath;
pub use tag_subscription::{
SubscriptionManager as RealTimeSubscriptionManager,
SubscriptionOptions as RealTimeSubscriptionOptions, TagSubscription as RealTimeSubscription,
};
pub use udt::{TagAttributes, UdtDefinition, UdtMember, UdtTemplate};
pub fn init_tracing() {
use tracing_subscriber::fmt;
use tracing_subscriber::EnvFilter;
let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
let subscriber = fmt::Subscriber::builder()
.with_env_filter(filter)
.with_target(false) .finish();
tracing::subscriber::set_global_default(subscriber).expect("Failed to set tracing subscriber");
}
pub fn try_init_tracing() -> std::result::Result<(), Box<dyn std::error::Error>> {
use tracing_subscriber::fmt;
use tracing_subscriber::EnvFilter;
let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
let subscriber = fmt::Subscriber::builder()
.with_env_filter(filter)
.with_target(false)
.finish();
tracing::subscriber::set_global_default(subscriber)
.map_err(|e| Box::new(e) as Box<dyn std::error::Error>)?;
Ok(())
}
#[derive(Debug, Clone)]
pub struct RoutePath {
pub slots: Vec<u8>,
pub ports: Vec<u8>,
pub addresses: Vec<String>,
}
impl RoutePath {
pub fn new() -> Self {
Self {
slots: Vec::new(),
ports: Vec::new(),
addresses: Vec::new(),
}
}
pub fn add_slot(mut self, slot: u8) -> Self {
self.slots.push(slot);
self
}
pub fn add_port(mut self, port: u8) -> Self {
self.ports.push(port);
self
}
pub fn add_address(mut self, address: String) -> Self {
self.addresses.push(address);
self
}
pub fn to_cip_bytes(&self) -> Vec<u8> {
let mut path = Vec::new();
for &slot in &self.slots {
path.push(0x01); path.push(slot); }
for (i, address) in self.addresses.iter().enumerate() {
if i < self.ports.len() {
path.push(self.ports[i]); } else {
path.push(0x01); }
if let Ok(ip) = address.parse::<std::net::Ipv4Addr>() {
let octets = ip.octets();
path.extend_from_slice(&octets);
}
}
path
}
}
impl Default for RoutePath {
fn default() -> Self {
Self::new()
}
}
lazy_static! {
static ref RUNTIME: Runtime = Runtime::new().unwrap();
static ref CLIENTS: Mutex<HashMap<i32, EipClient>> = Mutex::new(HashMap::new());
static ref NEXT_ID: Mutex<i32> = Mutex::new(1);
}
#[derive(Debug, Clone)]
pub enum BatchOperation {
Read { tag_name: String },
Write { tag_name: String, value: PlcValue },
}
#[derive(Debug, Clone)]
pub struct BatchResult {
pub operation: BatchOperation,
pub result: std::result::Result<Option<PlcValue>, BatchError>,
pub execution_time_us: u64,
}
#[derive(Debug, Clone)]
pub enum BatchError {
TagNotFound(String),
DataTypeMismatch { expected: String, actual: String },
NetworkError(String),
CipError { status: u8, message: String },
TagPathError(String),
SerializationError(String),
Timeout,
Other(String),
}
impl std::fmt::Display for BatchError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
BatchError::TagNotFound(tag) => write!(f, "Tag not found: {tag}"),
BatchError::DataTypeMismatch { expected, actual } => {
write!(f, "Data type mismatch: expected {expected}, got {actual}")
}
BatchError::NetworkError(msg) => write!(f, "Network error: {msg}"),
BatchError::CipError { status, message } => {
write!(f, "CIP error (0x{status:02X}): {message}")
}
BatchError::TagPathError(msg) => write!(f, "Tag path error: {msg}"),
BatchError::SerializationError(msg) => write!(f, "Serialization error: {msg}"),
BatchError::Timeout => write!(f, "Operation timeout"),
BatchError::Other(msg) => write!(f, "Error: {msg}"),
}
}
}
impl std::error::Error for BatchError {}
#[derive(Debug, Clone)]
pub struct BatchConfig {
pub max_operations_per_packet: usize,
pub max_packet_size: usize,
pub packet_timeout_ms: u64,
pub continue_on_error: bool,
pub optimize_packet_packing: bool,
}
impl Default for BatchConfig {
fn default() -> Self {
Self {
max_operations_per_packet: 20,
max_packet_size: 504, packet_timeout_ms: 3000,
continue_on_error: true,
optimize_packet_packing: true,
}
}
}
#[derive(Debug, Clone)]
pub struct ConnectedSession {
pub connection_id: u32,
pub o_to_t_connection_id: u32,
pub t_to_o_connection_id: u32,
pub connection_serial: u16,
pub originator_vendor_id: u16,
pub originator_serial: u32,
pub timeout_multiplier: u8,
pub rpi: u32,
pub o_to_t_params: ConnectionParameters,
pub t_to_o_params: ConnectionParameters,
pub established_at: Instant,
pub is_active: bool,
pub sequence_count: u16,
}
#[derive(Debug, Clone)]
pub struct ConnectionParameters {
pub size: u16,
pub connection_type: u8,
pub priority: u8,
pub variable_size: bool,
}
impl Default for ConnectionParameters {
fn default() -> Self {
Self {
size: 500, connection_type: 0x02, priority: 0x01, variable_size: false,
}
}
}
impl ConnectedSession {
pub fn new(connection_serial: u16) -> Self {
Self {
connection_id: 0,
o_to_t_connection_id: 0,
t_to_o_connection_id: 0,
connection_serial,
originator_vendor_id: 0x1337, originator_serial: 0x1234_5678, timeout_multiplier: 0x05, rpi: 100_000, o_to_t_params: ConnectionParameters::default(),
t_to_o_params: ConnectionParameters::default(),
established_at: Instant::now(),
is_active: false,
sequence_count: 0,
}
}
pub fn with_config(connection_serial: u16, config_id: u8) -> Self {
let mut session = Self::new(connection_serial);
match config_id {
1 => {
session.timeout_multiplier = 0x07; session.rpi = 200_000; session.o_to_t_params.size = 504; session.t_to_o_params.size = 504;
session.o_to_t_params.priority = 0x00; session.t_to_o_params.priority = 0x00;
tracing::debug!("CONFIG 1: Conservative: 504 bytes, 200ms RPI, low priority");
}
2 => {
session.timeout_multiplier = 0x03; session.rpi = 50000; session.o_to_t_params.size = 256; session.t_to_o_params.size = 256;
session.o_to_t_params.priority = 0x02; session.t_to_o_params.priority = 0x02;
tracing::debug!("CONFIG 2: Compact: 256 bytes, 50ms RPI, scheduled priority");
}
3 => {
session.timeout_multiplier = 0x01; session.rpi = 1_000_000; session.o_to_t_params.size = 128; session.t_to_o_params.size = 128;
session.o_to_t_params.priority = 0x03; session.t_to_o_params.priority = 0x03;
tracing::debug!("CONFIG 3: Minimal: 128 bytes, 1000ms RPI, urgent priority");
}
4 => {
session.timeout_multiplier = 0x05; session.rpi = 100_000; session.o_to_t_params.size = 500; session.t_to_o_params.size = 500;
session.o_to_t_params.connection_type = 0x01; session.t_to_o_params.connection_type = 0x01;
session.originator_vendor_id = 0x001D; tracing::debug!(
"CONFIG 4: Rockwell standard: 500 bytes, 100ms RPI, multicast, Rockwell vendor"
);
}
5 => {
session.timeout_multiplier = 0x0A; session.rpi = 500_000; session.o_to_t_params.size = 1024; session.t_to_o_params.size = 1024;
session.o_to_t_params.variable_size = true; session.t_to_o_params.variable_size = true;
tracing::debug!("CONFIG 5: Large buffer: 1024 bytes, 500ms RPI, variable size");
}
_ => {
tracing::debug!("CONFIG 0: Default parameters");
}
}
session
}
}
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct UdtData {
pub symbol_id: i32,
pub data: Vec<u8>,
}
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub enum PlcValue {
Bool(bool),
Sint(i8),
Int(i16),
Dint(i32),
Lint(i64),
Usint(u8),
Uint(u16),
Udint(u32),
Ulint(u64),
Real(f32),
Lreal(f64),
String(String),
Udt(UdtData),
}
impl UdtData {
pub fn parse(
&self,
definition: &crate::udt::UserDefinedType,
) -> crate::error::Result<HashMap<String, PlcValue>> {
definition.to_hash_map(&self.data)
}
pub fn from_hash_map(
members: &HashMap<String, PlcValue>,
definition: &crate::udt::UserDefinedType,
symbol_id: i32,
) -> crate::error::Result<Self> {
let data = definition.from_hash_map(members)?;
Ok(UdtData { symbol_id, data })
}
}
impl PlcValue {
pub fn to_bytes(&self) -> Vec<u8> {
match self {
PlcValue::Bool(val) => vec![if *val { 0xFF } else { 0x00 }],
PlcValue::Sint(val) => val.to_le_bytes().to_vec(),
PlcValue::Int(val) => val.to_le_bytes().to_vec(),
PlcValue::Dint(val) => val.to_le_bytes().to_vec(),
PlcValue::Lint(val) => val.to_le_bytes().to_vec(),
PlcValue::Usint(val) => val.to_le_bytes().to_vec(),
PlcValue::Uint(val) => val.to_le_bytes().to_vec(),
PlcValue::Udint(val) => val.to_le_bytes().to_vec(),
PlcValue::Ulint(val) => val.to_le_bytes().to_vec(),
PlcValue::Real(val) => val.to_le_bytes().to_vec(),
PlcValue::Lreal(val) => val.to_le_bytes().to_vec(),
PlcValue::String(val) => {
let mut bytes = Vec::new();
let length = val.len().min(82) as u32;
bytes.extend_from_slice(&length.to_le_bytes());
let string_bytes = val.as_bytes();
let data_len = string_bytes.len().min(82);
bytes.extend_from_slice(&string_bytes[..data_len]);
bytes
}
PlcValue::Udt(udt_data) => {
udt_data.data.clone()
}
}
}
pub fn get_data_type(&self) -> u16 {
match self {
PlcValue::Bool(_) => 0x00C1, PlcValue::Sint(_) => 0x00C2, PlcValue::Int(_) => 0x00C3, PlcValue::Dint(_) => 0x00C4, PlcValue::Lint(_) => 0x00C5, PlcValue::Usint(_) => 0x00C6, PlcValue::Uint(_) => 0x00C7, PlcValue::Udint(_) => 0x00C8, PlcValue::Ulint(_) => 0x00C9, PlcValue::Real(_) => 0x00CA, PlcValue::Lreal(_) => 0x00CB, PlcValue::String(_) => 0x00CE, PlcValue::Udt(_) => 0x00A0, }
}
}
#[derive(Clone)]
pub struct EipClient {
stream: Arc<Mutex<Box<dyn EtherNetIpStream>>>,
session_handle: u32,
_connection_id: u32,
tag_manager: Arc<Mutex<TagManager>>,
udt_manager: Arc<Mutex<UdtManager>>,
route_path: Option<RoutePath>,
_connected: Arc<AtomicBool>,
max_packet_size: u32,
last_activity: Arc<Mutex<Instant>>,
_session_timeout: Duration,
batch_config: BatchConfig,
connected_sessions: Arc<Mutex<HashMap<String, ConnectedSession>>>,
connection_sequence: Arc<Mutex<u32>>,
subscriptions: Arc<Mutex<Vec<TagSubscription>>>,
tag_groups: Arc<Mutex<HashMap<String, TagGroupConfig>>>,
}
impl std::fmt::Debug for EipClient {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("EipClient")
.field("session_handle", &self.session_handle)
.field("route_path", &self.route_path)
.field("max_packet_size", &self.max_packet_size)
.field("_session_timeout", &self._session_timeout)
.field("batch_config", &self.batch_config)
.field("stream", &"<stream>")
.field("tag_manager", &"<tag_manager>")
.field("udt_manager", &"<udt_manager>")
.field("connected_sessions", &"<connected_sessions>")
.field("subscriptions", &"<subscriptions>")
.field("tag_groups", &"<tag_groups>")
.finish()
}
}
impl EipClient {
async fn from_stream<S>(stream: S) -> Result<Self>
where
S: EtherNetIpStream + 'static,
{
let mut client = Self {
stream: Arc::new(Mutex::new(Box::new(stream))),
session_handle: 0,
_connection_id: 0,
tag_manager: Arc::new(Mutex::new(TagManager::new())),
udt_manager: Arc::new(Mutex::new(UdtManager::new())),
route_path: None,
_connected: Arc::new(AtomicBool::new(false)),
max_packet_size: 4000,
last_activity: Arc::new(Mutex::new(Instant::now())),
_session_timeout: Duration::from_secs(120),
batch_config: BatchConfig::default(),
connected_sessions: Arc::new(Mutex::new(HashMap::new())),
connection_sequence: Arc::new(Mutex::new(1)),
subscriptions: Arc::new(Mutex::new(Vec::new())),
tag_groups: Arc::new(Mutex::new(HashMap::new())),
};
client.register_session().await?;
client.negotiate_packet_size().await?;
Ok(client)
}
pub async fn new(addr: &str) -> Result<Self> {
let addr = addr
.parse::<SocketAddr>()
.map_err(|e| EtherNetIpError::Protocol(format!("Invalid address format: {e}")))?;
let stream = TcpStream::connect(addr).await?;
Self::from_stream(stream).await
}
pub async fn connect(addr: &str) -> Result<Self> {
Self::new(addr).await
}
async fn register_session(&mut self) -> crate::error::Result<()> {
tracing::debug!("Starting session registration...");
let packet: [u8; 28] = [
0x65, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, ];
tracing::trace!("Sending Register Session packet: {:02X?}", packet);
self.stream
.lock()
.await
.write_all(&packet)
.await
.map_err(|e| {
tracing::error!("Failed to send Register Session packet: {}", e);
EtherNetIpError::Io(e)
})?;
let mut buf = [0u8; 1024];
tracing::debug!("Waiting for Register Session response...");
let n = match timeout(
Duration::from_secs(5),
self.stream.lock().await.read(&mut buf),
)
.await
{
Ok(Ok(n)) => {
tracing::trace!("Received {} bytes in response", n);
n
}
Ok(Err(e)) => {
tracing::error!("Error reading response: {}", e);
return Err(EtherNetIpError::Io(e));
}
Err(_) => {
tracing::warn!("Timeout waiting for response");
return Err(EtherNetIpError::Timeout(Duration::from_secs(5)));
}
};
if n < 28 {
tracing::error!("Response too short: {} bytes (expected 28)", n);
return Err(EtherNetIpError::Protocol("Response too short".to_string()));
}
self.session_handle = u32::from_le_bytes([buf[4], buf[5], buf[6], buf[7]]);
tracing::debug!("Session handle: 0x{:08X}", self.session_handle);
let status = u32::from_le_bytes([buf[8], buf[9], buf[10], buf[11]]);
tracing::trace!("Status code: 0x{:08X}", status);
if status != 0 {
tracing::error!("Session registration failed with status: 0x{:08X}", status);
return Err(EtherNetIpError::Protocol(format!(
"Session registration failed with status: 0x{status:08X}"
)));
}
tracing::info!("Session registration successful");
Ok(())
}
pub fn set_max_packet_size(&mut self, size: u32) {
self.max_packet_size = size.min(4000);
}
pub async fn discover_tags(&mut self) -> crate::error::Result<()> {
let response = self
.send_cip_request(&self.build_list_tags_request())
.await?;
let cip_data = self.extract_cip_from_response(&response)?;
if let Err(e) = self.check_cip_error(&cip_data) {
return Err(crate::error::EtherNetIpError::Protocol(format!(
"Tag discovery failed: {}. Some PLCs may not support tag discovery. Try reading tags directly by name.",
e
)));
}
let tags = {
let tag_manager = self.tag_manager.lock().await;
tag_manager.parse_tag_list(&cip_data)?
};
tracing::debug!("Initial tag discovery found {} tags", tags.len());
let hierarchical_tags = {
let tag_manager = self.tag_manager.lock().await;
tag_manager.drill_down_tags(&tags).await?
};
tracing::debug!(
"After drill-down: {} total tags discovered",
hierarchical_tags.len()
);
{
let tag_manager = self.tag_manager.lock().await;
let mut cache = tag_manager.cache.write().unwrap();
for (name, metadata) in hierarchical_tags {
cache.insert(name, metadata);
}
}
Ok(())
}
pub async fn discover_udt_members(
&mut self,
udt_name: &str,
) -> crate::error::Result<Vec<(String, TagMetadata)>> {
let cip_request = {
let tag_manager = self.tag_manager.lock().await;
tag_manager.build_udt_definition_request(udt_name)?
};
let response = self.send_cip_request(&cip_request).await?;
let definition = {
let tag_manager = self.tag_manager.lock().await;
tag_manager.parse_udt_definition_response(&response, udt_name)?
};
{
let tag_manager = self.tag_manager.lock().await;
let mut definitions = tag_manager.udt_definitions.write().unwrap();
definitions.insert(udt_name.to_string(), definition.clone());
}
let mut members = Vec::new();
for member in &definition.members {
let member_name = member.name.clone();
let full_name = format!("{}.{}", udt_name, member_name);
let metadata = TagMetadata {
data_type: member.data_type,
scope: TagScope::Controller,
permissions: TagPermissions {
readable: true,
writable: true,
},
is_array: false,
dimensions: Vec::new(),
last_access: std::time::Instant::now(),
size: member.size,
array_info: None,
last_updated: std::time::Instant::now(),
};
members.push((full_name, metadata));
}
Ok(members)
}
pub async fn get_udt_definition_cached(&self, udt_name: &str) -> Option<UdtDefinition> {
let tag_manager = self.tag_manager.lock().await;
tag_manager.get_udt_definition_cached(udt_name)
}
pub async fn list_udt_definitions(&self) -> Vec<String> {
let tag_manager = self.tag_manager.lock().await;
tag_manager.list_udt_definitions()
}
pub async fn discover_tags_detailed(&mut self) -> crate::error::Result<Vec<TagAttributes>> {
let request = self.build_tag_list_request()?;
let response = self.send_cip_request(&request).await?;
let cip_data = self.extract_cip_from_response(&response)?;
if let Err(e) = self.check_cip_error(&cip_data) {
return Err(crate::error::EtherNetIpError::Protocol(format!(
"Tag discovery failed: {}. Some PLCs may not support tag discovery. Try reading tags directly by name.",
e
)));
}
self.parse_tag_list_response(&cip_data)
}
pub async fn discover_program_tags(
&mut self,
program_name: &str,
) -> crate::error::Result<Vec<TagAttributes>> {
let request = self.build_program_tag_list_request(program_name)?;
let response = self.send_cip_request(&request).await?;
let cip_data = self.extract_cip_from_response(&response)?;
if let Err(e) = self.check_cip_error(&cip_data) {
return Err(crate::error::EtherNetIpError::Protocol(format!(
"Program tag discovery failed for '{}': {}. Some PLCs may not support tag discovery. Try reading tags directly by name.",
program_name, e
)));
}
self.parse_tag_list_response(&cip_data)
}
pub async fn list_cached_tag_attributes(&self) -> Vec<String> {
self.udt_manager.lock().await.list_tag_attributes()
}
pub async fn clear_caches(&mut self) {
self.udt_manager.lock().await.clear_cache();
}
pub async fn with_route_path(addr: &str, route: RoutePath) -> crate::error::Result<Self> {
let mut client = Self::new(addr).await?;
client.set_route_path(route);
Ok(client)
}
pub async fn connect_with_stream<S>(stream: S, route: Option<RoutePath>) -> Result<Self>
where
S: EtherNetIpStream + 'static,
{
let mut client = Self::from_stream(stream).await?;
if let Some(route) = route {
client.set_route_path(route);
}
Ok(client)
}
pub fn set_route_path(&mut self, route: RoutePath) {
self.route_path = Some(route);
}
pub fn get_route_path(&self) -> Option<&RoutePath> {
self.route_path.as_ref()
}
pub fn clear_route_path(&mut self) {
self.route_path = None;
}
pub async fn get_tag_metadata(&self, tag_name: &str) -> Option<TagMetadata> {
let tag_manager = self.tag_manager.lock().await;
let cache = tag_manager.cache.read().unwrap();
let result = cache.get(tag_name).cloned();
result
}
pub async fn read_tag(&mut self, tag_name: &str) -> crate::error::Result<PlcValue> {
self.validate_session().await?;
if let Some((base_name, index)) = self.parse_array_element_access(tag_name) {
if let Some(bracket_start) = tag_name.find('[') {
if let Some(bracket_end_rel) = tag_name[bracket_start..].find(']') {
let bracket_end_abs = bracket_start + bracket_end_rel;
let after_bracket = &tag_name[bracket_end_abs + 1..];
tracing::debug!(
"Array element detected for '{}': base='{}', index={}, after_bracket='{}'",
tag_name,
base_name,
index,
after_bracket
);
if !after_bracket.starts_with('.') {
tracing::debug!(
"Detected simple array element access: {}[{}], using workaround",
base_name,
index
);
return self.read_array_element_workaround(&base_name, index).await;
} else {
tracing::debug!(
"Array element '{}[{}]' has member access after bracket ('{}'), using TagPath::parse()",
base_name,
index,
after_bracket
);
}
}
}
}
let response = self
.send_cip_request(&self.build_read_request(tag_name))
.await?;
let cip_data = self.extract_cip_from_response(&response)?;
self.parse_cip_response(&cip_data)
}
pub async fn read_bit(&mut self, tag_base: &str, bit_index: u8) -> crate::error::Result<bool> {
if bit_index >= 32 {
return Err(crate::error::EtherNetIpError::Protocol(
"bit_index must be 0..32 for DINT bit access".to_string(),
));
}
let path = format!("{}.{}", tag_base, bit_index);
match self.read_tag(&path).await? {
PlcValue::Bool(b) => Ok(b),
PlcValue::Dint(n) => {
Ok((n >> bit_index) & 1 != 0)
}
other => Err(crate::error::EtherNetIpError::DataTypeMismatch {
expected: "BOOL or DINT".to_string(),
actual: format!("{:?}", other),
}),
}
}
pub async fn write_bit(
&mut self,
tag_base: &str,
bit_index: u8,
value: bool,
) -> crate::error::Result<()> {
if bit_index >= 32 {
return Err(crate::error::EtherNetIpError::Protocol(
"bit_index must be 0..32 for DINT bit access".to_string(),
));
}
let path = format!("{}.{}", tag_base, bit_index);
self.write_tag(&path, PlcValue::Bool(value)).await
}
fn parse_array_element_access(&self, tag_name: &str) -> Option<(String, u32)> {
if let Some(bracket_pos) = tag_name.rfind('[') {
if let Some(close_bracket_pos) = tag_name.rfind(']') {
if close_bracket_pos > bracket_pos {
let base_name = tag_name[..bracket_pos].to_string();
let index_str = &tag_name[bracket_pos + 1..close_bracket_pos];
if let Ok(index) = index_str.parse::<u32>() {
if !tag_name[..bracket_pos].contains('[') {
return Some((base_name, index));
}
}
}
}
}
None
}
async fn read_array_element_workaround(
&mut self,
base_array_name: &str,
index: u32,
) -> crate::error::Result<PlcValue> {
tracing::debug!(
"Reading array element '{}[{}]' using element addressing",
base_array_name,
index
);
let test_response = self
.send_cip_request(&self.build_read_request_with_count(base_array_name, 1))
.await?;
let test_cip_data = self.extract_cip_from_response(&test_response)?;
self.check_cip_error(&test_cip_data)?;
if test_cip_data.len() >= 6 {
let test_data_type = u16::from_le_bytes([test_cip_data[4], test_cip_data[5]]);
if test_data_type == 0x00D3 {
return self
.read_bool_array_element_workaround(base_array_name, index)
.await;
}
}
let request = self.build_read_array_request(base_array_name, index, 1);
let response = self.send_cip_request(&request).await?;
let cip_data = self.extract_cip_from_response(&response)?;
self.check_cip_error(&cip_data)?;
self.parse_cip_response(&cip_data)
}
async fn read_bool_array_element_workaround(
&mut self,
base_array_name: &str,
index: u32,
) -> crate::error::Result<PlcValue> {
tracing::debug!(
"BOOL array detected - reading DWORD and extracting bit [{}]",
index
);
let response = self
.send_cip_request(&self.build_read_request_with_count(base_array_name, 1))
.await?;
let cip_data = self.extract_cip_from_response(&response)?;
if cip_data.len() < 6 {
return Err(EtherNetIpError::Protocol(
"BOOL array response too short".to_string(),
));
}
self.check_cip_error(&cip_data)?;
let service_reply = cip_data[0];
if service_reply != 0xCC {
return Err(EtherNetIpError::Protocol(format!(
"Unexpected service reply: 0x{service_reply:02X}"
)));
}
let data_type = u16::from_le_bytes([cip_data[4], cip_data[5]]);
let value_data = if cip_data.len() >= 8 && data_type == 0x00D3 {
if cip_data.len() >= 12 {
&cip_data[8..]
} else if cip_data.len() >= 10 {
&cip_data[6..]
} else {
return Err(EtherNetIpError::Protocol(
"BOOL array response too short for data".to_string(),
));
}
} else {
if cip_data.len() < 8 {
return Err(EtherNetIpError::Protocol(
"BOOL array response too short".to_string(),
));
}
&cip_data[8..]
};
if value_data.len() < 4 {
return Err(EtherNetIpError::Protocol(format!(
"BOOL array data too short: need 4 bytes (DWORD), got {} bytes",
value_data.len()
)));
}
let dword_value =
u32::from_le_bytes([value_data[0], value_data[1], value_data[2], value_data[3]]);
let bit_index = (index % 32) as u8;
let bool_value = (dword_value >> bit_index) & 1 != 0;
Ok(PlcValue::Bool(bool_value))
}
async fn read_array_in_chunks(
&mut self,
base_array_name: &str,
data_type: u16,
start_index: u32,
target_element_count: u32,
) -> crate::error::Result<Vec<u8>> {
let element_size = match data_type {
0x00C1 => 1, 0x00C2 => 1, 0x00C3 => 2, 0x00C4 => 4, 0x00C5 => 8, 0x00C6 => 1, 0x00C7 => 2, 0x00C8 => 4, 0x00C9 => 8, 0x00CA => 4, 0x00CB => 8, _ => {
return Err(EtherNetIpError::Protocol(format!(
"Unsupported array data type for chunked reading: 0x{:04X}",
data_type
)));
}
};
let elements_per_chunk = match element_size {
1 => 30, 2 => 15, 4 => 8, 8 => 4, _ => 8,
};
let end_index = start_index
.checked_add(target_element_count)
.ok_or_else(|| EtherNetIpError::Protocol("Array range overflow".to_string()))?;
let mut all_data = Vec::new();
let mut next_chunk_start = start_index;
tracing::debug!(
"Reading array '{}' in chunks: {} elements per chunk, target: {} elements",
base_array_name,
elements_per_chunk,
target_element_count
);
while next_chunk_start < end_index {
let chunk_end = (next_chunk_start + elements_per_chunk as u32).min(end_index);
let chunk_size = (chunk_end - next_chunk_start) as u16;
tracing::trace!(
"Reading chunk: elements {} to {} ({} elements) using element addressing",
next_chunk_start,
chunk_end - 1,
chunk_size
);
let response = self
.send_cip_request(&self.build_read_array_request(
base_array_name,
next_chunk_start,
chunk_size,
))
.await?;
let cip_data = self.extract_cip_from_response(&response)?;
if cip_data.len() < 8 {
if cip_data.len() >= 3 {
let general_status = cip_data[2];
if general_status != 0x00 {
let error_msg = self.get_cip_error_message(general_status);
return Err(EtherNetIpError::Protocol(format!(
"CIP Error {} when reading chunk (elements {} to {}): {}",
general_status,
next_chunk_start,
chunk_end - 1,
error_msg
)));
}
}
return Err(EtherNetIpError::Protocol(format!(
"Chunk response too short: got {} bytes, expected at least 8 (requested {} elements starting at {})",
cip_data.len(), chunk_size, next_chunk_start
)));
}
if cip_data.len() >= 3 {
let general_status = cip_data[2];
if general_status != 0x00 {
let error_msg = self.get_cip_error_message(general_status);
return Err(EtherNetIpError::Protocol(format!(
"CIP Error {} when reading chunk (elements {} to {}): {}",
general_status,
next_chunk_start,
chunk_end - 1,
error_msg
)));
}
}
if !cip_data.is_empty() && cip_data[0] != 0xCC {
return Err(EtherNetIpError::Protocol(format!(
"Unexpected service reply in chunk: 0x{:02X} (expected 0xCC)",
cip_data[0]
)));
}
if cip_data.len() < 6 {
return Err(EtherNetIpError::Protocol(format!(
"Chunk response too short for data type: got {} bytes, expected at least 6",
cip_data.len()
)));
}
let chunk_data_type = u16::from_le_bytes([cip_data[4], cip_data[5]]);
if chunk_data_type != data_type {
return Err(EtherNetIpError::Protocol(format!(
"Data type mismatch in chunk: expected 0x{:04X}, got 0x{:04X}",
data_type, chunk_data_type
)));
}
let value_data_start = if cip_data.len() >= 8 {
8
} else {
6
};
let chunk_value_data = &cip_data[value_data_start..];
let chunk_complete_bytes = (chunk_value_data.len() / element_size) * element_size;
let chunk_data = &chunk_value_data[..chunk_complete_bytes];
if !chunk_data.is_empty() {
all_data.extend_from_slice(chunk_data);
let elements_received = chunk_data.len() / element_size;
next_chunk_start += elements_received as u32;
tracing::trace!(
"Chunk read: {} elements ({} bytes) starting at index {}, total so far: {} elements",
elements_received,
chunk_data.len(),
next_chunk_start - elements_received as u32,
all_data.len() / element_size
);
if next_chunk_start >= end_index {
tracing::trace!(
"Reached target element count ({}), stopping chunked read",
target_element_count
);
break;
}
} else {
break;
}
}
let final_element_count = all_data.len() / element_size;
tracing::debug!(
"Chunked read complete: {} total elements ({} bytes), target was {} elements",
final_element_count,
all_data.len(),
target_element_count
);
if final_element_count < target_element_count as usize {
return Err(EtherNetIpError::Protocol(format!(
"Incomplete array read: requested {} elements, received {}",
target_element_count, final_element_count
)));
}
Ok(all_data)
}
fn array_element_size(data_type: u16) -> Option<usize> {
match data_type {
0x00C1 => Some(1), 0x00C2 => Some(1), 0x00C3 => Some(2), 0x00C4 => Some(4), 0x00C5 => Some(8), 0x00C6 => Some(1), 0x00C7 => Some(2), 0x00C8 => Some(4), 0x00C9 => Some(8), 0x00CA => Some(4), 0x00CB => Some(8), _ => None,
}
}
fn decode_array_bytes(
&self,
data_type: u16,
bytes: &[u8],
) -> crate::error::Result<Vec<PlcValue>> {
let Some(element_size) = Self::array_element_size(data_type) else {
return Err(EtherNetIpError::Protocol(format!(
"Unsupported data type for array decoding: 0x{:04X}",
data_type
)));
};
if bytes.len() % element_size != 0 {
return Err(EtherNetIpError::Protocol(format!(
"Array payload length {} is not aligned to element size {}",
bytes.len(),
element_size
)));
}
let mut values = Vec::with_capacity(bytes.len() / element_size);
for chunk in bytes.chunks_exact(element_size) {
let value = match data_type {
0x00C1 => PlcValue::Bool(chunk[0] != 0),
0x00C2 => PlcValue::Sint(chunk[0] as i8),
0x00C3 => PlcValue::Int(i16::from_le_bytes([chunk[0], chunk[1]])),
0x00C4 => {
PlcValue::Dint(i32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]))
}
0x00C5 => PlcValue::Lint(i64::from_le_bytes([
chunk[0], chunk[1], chunk[2], chunk[3], chunk[4], chunk[5], chunk[6], chunk[7],
])),
0x00C6 => PlcValue::Usint(chunk[0]),
0x00C7 => PlcValue::Uint(u16::from_le_bytes([chunk[0], chunk[1]])),
0x00C8 => {
PlcValue::Udint(u32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]))
}
0x00C9 => PlcValue::Ulint(u64::from_le_bytes([
chunk[0], chunk[1], chunk[2], chunk[3], chunk[4], chunk[5], chunk[6], chunk[7],
])),
0x00CA => {
PlcValue::Real(f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]))
}
0x00CB => PlcValue::Lreal(f64::from_le_bytes([
chunk[0], chunk[1], chunk[2], chunk[3], chunk[4], chunk[5], chunk[6], chunk[7],
])),
_ => unreachable!("validated by array_element_size"),
};
values.push(value);
}
Ok(values)
}
pub async fn read_array_range(
&mut self,
base_array_name: &str,
start_index: u32,
element_count: u32,
) -> crate::error::Result<Vec<PlcValue>> {
if element_count == 0 {
return Ok(Vec::new());
}
let probe_response = self
.send_cip_request(&self.build_read_array_request(base_array_name, start_index, 1))
.await?;
let probe_cip = self.extract_cip_from_response(&probe_response)?;
self.check_cip_error(&probe_cip)?;
if probe_cip.len() < 6 {
return Err(EtherNetIpError::Protocol(
"Array probe response too short".to_string(),
));
}
let data_type = u16::from_le_bytes([probe_cip[4], probe_cip[5]]);
let raw = self
.read_array_in_chunks(base_array_name, data_type, start_index, element_count)
.await?;
let values = self.decode_array_bytes(data_type, &raw)?;
if values.len() != element_count as usize {
return Err(EtherNetIpError::Protocol(format!(
"Array read count mismatch: requested {}, got {}",
element_count,
values.len()
)));
}
Ok(values)
}
async fn write_array_element_workaround(
&mut self,
base_array_name: &str,
index: u32,
value: PlcValue,
) -> crate::error::Result<()> {
tracing::debug!(
"Writing to array element '{}[{}]' using element addressing",
base_array_name,
index
);
let test_response = self
.send_cip_request(&self.build_read_request_with_count(base_array_name, 1))
.await?;
let test_cip_data = self.extract_cip_from_response(&test_response)?;
if test_cip_data.len() < 3 {
return Err(EtherNetIpError::Protocol(
"Test read response too short".to_string(),
));
}
if let Err(e) = self.check_cip_error(&test_cip_data) {
return Err(EtherNetIpError::Protocol(format!(
"Cannot write to array element: Test read failed: {}",
e
)));
}
if test_cip_data.len() < 6 {
return Err(EtherNetIpError::Protocol(
"Test read response too short to determine data type".to_string(),
));
}
let test_data_type = u16::from_le_bytes([test_cip_data[4], test_cip_data[5]]);
if test_data_type == 0x00D3 {
return self
.write_bool_array_element_workaround(base_array_name, index, value)
.await;
}
let data_type = test_data_type;
let value_bytes = value.to_bytes();
let request = self.build_write_array_request_with_index(
base_array_name,
index,
1, data_type,
&value_bytes,
)?;
let response = self.send_cip_request(&request).await?;
let cip_data = self.extract_cip_from_response(&response)?;
self.check_cip_error(&cip_data)?;
tracing::info!("Array element write completed successfully");
Ok(())
}
async fn write_bool_array_element_workaround(
&mut self,
base_array_name: &str,
index: u32,
value: PlcValue,
) -> crate::error::Result<()> {
tracing::debug!(
"BOOL array element write - reading DWORD, modifying bit [{}], writing back",
index
);
let response = self
.send_cip_request(&self.build_read_request_with_count(base_array_name, 1))
.await?;
let cip_data = self.extract_cip_from_response(&response)?;
if cip_data.len() < 10 {
return Err(EtherNetIpError::Protocol(
"BOOL array response too short".to_string(),
));
}
self.check_cip_error(&cip_data)?;
let service_reply = cip_data[0];
if service_reply != 0xCC {
return Err(EtherNetIpError::Protocol(format!(
"Unexpected service reply: 0x{service_reply:02X}"
)));
}
let data_type = u16::from_le_bytes([cip_data[4], cip_data[5]]);
let value_data = if cip_data.len() >= 10 {
&cip_data[6..10]
} else {
return Err(EtherNetIpError::Protocol(
"BOOL array data too short".to_string(),
));
};
let bool_value = match value {
PlcValue::Bool(b) => b,
_ => {
return Err(EtherNetIpError::Protocol(
"Expected BOOL value for BOOL array element".to_string(),
))
}
};
let mut dword_value =
u32::from_le_bytes([value_data[0], value_data[1], value_data[2], value_data[3]]);
let bit_index = (index % 32) as u8;
if bool_value {
dword_value |= 1u32 << bit_index;
} else {
dword_value &= !(1u32 << bit_index);
}
tracing::trace!(
"Modified BOOL[{}] in DWORD: 0x{:08X} -> 0x{:08X} (bit {} = {})",
index,
u32::from_le_bytes([value_data[0], value_data[1], value_data[2], value_data[3]]),
dword_value,
bit_index,
bool_value
);
let write_request = self.build_write_request_with_data(
base_array_name,
data_type,
1,
&dword_value.to_le_bytes(),
)?;
let write_response = self.send_cip_request(&write_request).await?;
let write_cip_data = self.extract_cip_from_response(&write_response)?;
self.check_cip_error(&write_cip_data)?;
tracing::info!("BOOL array element write completed successfully");
Ok(())
}
#[allow(dead_code)]
fn build_write_array_request(
&self,
tag_name: &str,
data_type: u16,
element_count: u16,
data: &[u8],
) -> crate::error::Result<Vec<u8>> {
let mut cip_request = Vec::new();
cip_request.push(0x4D);
let path = self.build_tag_path(tag_name);
cip_request.push((path.len() / 2) as u8);
cip_request.extend_from_slice(&path);
cip_request.extend_from_slice(&data_type.to_le_bytes());
cip_request.extend_from_slice(&element_count.to_le_bytes());
cip_request.extend_from_slice(data);
Ok(cip_request)
}
#[cfg_attr(not(test), allow(dead_code))]
pub fn build_write_array_request_with_index(
&self,
base_array_name: &str,
start_index: u32,
element_count: u16,
data_type: u16,
data: &[u8],
) -> crate::error::Result<Vec<u8>> {
let mut cip_request = Vec::new();
cip_request.push(0x4D);
let mut full_path = self.build_base_tag_path(base_array_name);
full_path.extend_from_slice(&self.build_element_id_segment(start_index));
if full_path.len() % 2 != 0 {
full_path.push(0x00);
}
let path_size = (full_path.len() / 2) as u8;
cip_request.push(path_size);
cip_request.extend_from_slice(&full_path);
cip_request.extend_from_slice(&data_type.to_le_bytes());
cip_request.extend_from_slice(&element_count.to_le_bytes());
cip_request.extend_from_slice(data);
Ok(cip_request)
}
fn build_write_request_with_data(
&self,
tag_name: &str,
data_type: u16,
element_count: u16,
data: &[u8],
) -> crate::error::Result<Vec<u8>> {
let mut cip_request = Vec::new();
cip_request.push(0x4D);
let path = self.build_tag_path(tag_name);
cip_request.push((path.len() / 2) as u8);
cip_request.extend_from_slice(&path);
cip_request.extend_from_slice(&data_type.to_le_bytes());
cip_request.extend_from_slice(&element_count.to_le_bytes());
cip_request.extend_from_slice(data);
Ok(cip_request)
}
pub async fn read_udt_chunked(&mut self, tag_name: &str) -> crate::error::Result<PlcValue> {
self.validate_session().await?;
tracing::debug!("[CHUNKED] Starting advanced UDT reading for: {}", tag_name);
match self.read_tag(tag_name).await {
Ok(value) => {
tracing::debug!("[CHUNKED] Normal read successful");
return Ok(value);
}
Err(crate::error::EtherNetIpError::Protocol(msg))
if msg.contains("Partial transfer") =>
{
tracing::debug!("[CHUNKED] Partial transfer detected, using advanced chunking");
}
Err(e) => {
tracing::warn!("[CHUNKED] Normal read failed: {}", e);
return Err(e);
}
}
self.read_udt_advanced_chunked(tag_name).await
}
async fn read_udt_advanced_chunked(
&mut self,
tag_name: &str,
) -> crate::error::Result<PlcValue> {
tracing::debug!("[ADVANCED] Using multiple strategies for large UDT");
let chunk_sizes = vec![512, 256, 128, 64, 32, 16, 8, 4];
for chunk_size in chunk_sizes {
tracing::trace!("[ADVANCED] Trying chunk size: {}", chunk_size);
match self.read_udt_with_chunk_size(tag_name, chunk_size).await {
Ok(udt_value) => {
tracing::debug!("[ADVANCED] Success with chunk size {}", chunk_size);
return Ok(udt_value);
}
Err(e) => {
tracing::trace!("[ADVANCED] Chunk size {} failed: {}", chunk_size, e);
continue;
}
}
}
tracing::debug!("[ADVANCED] Trying member-by-member discovery");
match self.read_udt_member_discovery(tag_name).await {
Ok(udt_value) => {
tracing::debug!("[ADVANCED] Member discovery successful");
return Ok(udt_value);
}
Err(e) => {
tracing::warn!("[ADVANCED] Member discovery failed: {}", e);
}
}
tracing::debug!("[ADVANCED] Trying progressive reading");
match self.read_udt_progressive(tag_name).await {
Ok(udt_value) => {
tracing::debug!("[ADVANCED] Progressive reading successful");
return Ok(udt_value);
}
Err(e) => {
tracing::warn!("[ADVANCED] Progressive reading failed: {}", e);
}
}
tracing::warn!("[ADVANCED] All strategies failed, using fallback");
let symbol_id = self
.get_tag_attributes(tag_name)
.await
.ok()
.and_then(|attr| attr.template_instance_id)
.unwrap_or(0) as i32;
Ok(PlcValue::Udt(UdtData {
symbol_id,
data: vec![], }))
}
async fn read_udt_with_chunk_size(
&mut self,
tag_name: &str,
mut chunk_size: usize,
) -> crate::error::Result<PlcValue> {
let mut all_data = Vec::new();
let mut offset = 0;
let mut consecutive_failures = 0;
const MAX_FAILURES: usize = 3;
loop {
match self
.read_udt_chunk_advanced(tag_name, offset, chunk_size)
.await
{
Ok(chunk_data) => {
if chunk_data.is_empty() {
break; }
all_data.extend_from_slice(&chunk_data);
offset += chunk_data.len();
consecutive_failures = 0;
tracing::trace!(
"[CHUNK] Read {} bytes at offset {}, total: {}",
chunk_data.len(),
offset - chunk_data.len(),
all_data.len()
);
if chunk_data.len() < chunk_size {
break;
}
}
Err(e) => {
consecutive_failures += 1;
tracing::warn!(
"[CHUNK] Chunk read failed (attempt {}): {}",
consecutive_failures,
e
);
if consecutive_failures >= MAX_FAILURES {
break;
}
if chunk_size > 4 {
chunk_size /= 2;
continue;
}
}
}
}
if all_data.is_empty() {
return Err(crate::error::EtherNetIpError::Protocol(
"No data read from UDT".to_string(),
));
}
tracing::debug!("[CHUNK] Total data collected: {} bytes", all_data.len());
let symbol_id = self
.get_tag_attributes(tag_name)
.await
.ok()
.and_then(|attr| attr.template_instance_id)
.unwrap_or(0) as i32;
Ok(PlcValue::Udt(UdtData {
symbol_id,
data: all_data,
}))
}
async fn read_udt_chunk_advanced(
&mut self,
tag_name: &str,
offset: usize,
size: usize,
) -> crate::error::Result<Vec<u8>> {
let mut request = Vec::new();
request.push(0x4C);
let tag_path = self.build_tag_path(tag_name);
let path_size = (tag_path.len() / 2) as u8;
request.push(path_size);
request.extend_from_slice(&tag_path);
if offset > 0 {
request.push(0x28); request.push(0x02); request.extend_from_slice(&(offset as u16).to_le_bytes());
}
request.push(0x28); request.push(0x02); request.extend_from_slice(&(size as u16).to_le_bytes());
request.push(0x00);
request.push(0x01);
let response = self.send_cip_request(&request).await?;
let cip_data = self.extract_cip_from_response(&response)?;
if cip_data.len() < 2 {
return Ok(Vec::new()); }
let _data_type = u16::from_le_bytes([cip_data[0], cip_data[1]]);
let data = &cip_data[2..];
Ok(data.to_vec())
}
async fn read_udt_member_discovery(
&mut self,
tag_name: &str,
) -> crate::error::Result<PlcValue> {
tracing::debug!("[DISCOVERY] Reading UDT as raw data for: {}", tag_name);
let attributes = self.get_tag_attributes(tag_name).await?;
let symbol_id = attributes.template_instance_id.ok_or_else(|| {
crate::error::EtherNetIpError::Protocol(
"UDT template instance ID not found in tag attributes".to_string(),
)
})?;
let raw_data = self.read_tag_raw(tag_name).await?;
tracing::debug!(
"[DISCOVERY] Read {} bytes of UDT data with symbol_id: {}",
raw_data.len(),
symbol_id
);
Ok(PlcValue::Udt(UdtData {
symbol_id: symbol_id as i32,
data: raw_data,
}))
}
async fn read_udt_progressive(&mut self, tag_name: &str) -> crate::error::Result<PlcValue> {
tracing::debug!("[PROGRESSIVE] Starting progressive reading");
let mut chunk_size = 4;
let mut all_data = Vec::new();
let mut offset = 0;
while chunk_size <= 512 {
match self
.read_udt_chunk_advanced(tag_name, offset, chunk_size)
.await
{
Ok(chunk_data) => {
if chunk_data.is_empty() {
break;
}
all_data.extend_from_slice(&chunk_data);
offset += chunk_data.len();
tracing::trace!(
"[PROGRESSIVE] Read {} bytes with chunk size {}",
chunk_data.len(),
chunk_size
);
if chunk_data.len() == chunk_size {
chunk_size = (chunk_size * 2).min(512);
}
}
Err(_) => {
chunk_size /= 2;
if chunk_size < 4 {
break;
}
}
}
}
if all_data.is_empty() {
return Err(crate::error::EtherNetIpError::Protocol(
"Progressive reading failed".to_string(),
));
}
tracing::debug!("[PROGRESSIVE] Collected {} bytes total", all_data.len());
let symbol_id = self
.get_tag_attributes(tag_name)
.await
.ok()
.and_then(|attr| attr.template_instance_id)
.unwrap_or(0) as i32;
Ok(PlcValue::Udt(UdtData {
symbol_id,
data: all_data,
}))
}
#[allow(dead_code)]
async fn read_udt_in_chunks(&mut self, tag_name: &str) -> crate::error::Result<PlcValue> {
const MAX_CHUNK_SIZE: usize = 1000; let mut all_data = Vec::new();
let mut offset = 0;
let mut chunk_size = MAX_CHUNK_SIZE;
loop {
match self.read_udt_chunk(tag_name, offset, chunk_size).await {
Ok(chunk_data) => {
all_data.extend_from_slice(&chunk_data);
offset += chunk_data.len();
if chunk_data.len() < chunk_size {
break;
}
}
Err(crate::error::EtherNetIpError::Protocol(msg))
if msg.contains("Partial transfer") =>
{
chunk_size /= 2;
if chunk_size < 100 {
return Err(crate::error::EtherNetIpError::Protocol(
"UDT too large even for smallest chunk size".to_string(),
));
}
continue;
}
Err(e) => return Err(e),
}
}
let symbol_id = self
.get_tag_attributes(tag_name)
.await
.ok()
.and_then(|attr| attr.template_instance_id)
.unwrap_or(0) as i32;
Ok(PlcValue::Udt(UdtData {
symbol_id,
data: all_data,
}))
}
#[allow(dead_code)]
async fn read_udt_chunk(
&mut self,
tag_name: &str,
offset: usize,
size: usize,
) -> crate::error::Result<Vec<u8>> {
let mut request = Vec::new();
request.push(0x4C);
let path_size = 2 + (tag_name.len() + 1) / 2; request.push(path_size as u8);
request.extend_from_slice(tag_name.as_bytes());
if tag_name.len() % 2 != 0 {
request.push(0); }
request.push(0x28); request.push(0x02); request.extend_from_slice(&(offset as u16).to_le_bytes());
request.push(0x28); request.push(0x02); request.extend_from_slice(&(size as u16).to_le_bytes());
request.push(0x00);
request.push(0x01);
let response = self.send_cip_request(&request).await?;
let cip_data = self.extract_cip_from_response(&response)?;
if cip_data.len() < 2 {
return Err(crate::error::EtherNetIpError::Protocol(
"Response too short".to_string(),
));
}
let _data_type = u16::from_le_bytes([cip_data[0], cip_data[1]]);
let data = &cip_data[2..];
Ok(data.to_vec())
}
pub async fn read_udt_member_by_offset(
&mut self,
udt_name: &str,
member_offset: usize,
member_size: usize,
data_type: u16,
) -> crate::error::Result<PlcValue> {
self.validate_session().await?;
let udt_data = self.read_tag_raw(udt_name).await?;
if member_offset + member_size > udt_data.len() {
return Err(crate::error::EtherNetIpError::Protocol(format!(
"Member data incomplete: offset {} + size {} > UDT size {}",
member_offset,
member_size,
udt_data.len()
)));
}
let member_data = &udt_data[member_offset..member_offset + member_size];
let member = crate::udt::UdtMember {
name: "temp".to_string(),
data_type,
offset: member_offset as u32,
size: member_size as u32,
};
let udt = crate::udt::UserDefinedType::new(udt_name.to_string());
udt.parse_member_value(&member, member_data)
}
pub async fn write_udt_member_by_offset(
&mut self,
udt_name: &str,
member_offset: usize,
member_size: usize,
data_type: u16,
value: PlcValue,
) -> crate::error::Result<()> {
self.validate_session().await?;
let mut udt_data = self.read_tag_raw(udt_name).await?;
if member_offset + member_size > udt_data.len() {
return Err(crate::error::EtherNetIpError::Protocol(format!(
"Member data incomplete: offset {} + size {} > UDT size {}",
member_offset,
member_size,
udt_data.len()
)));
}
let member = crate::udt::UdtMember {
name: "temp".to_string(),
data_type,
offset: member_offset as u32,
size: member_size as u32,
};
let udt = crate::udt::UserDefinedType::new(udt_name.to_string());
let member_data = udt.serialize_member_value(&member, &value)?;
let end_offset = member_offset + member_data.len();
if end_offset <= udt_data.len() {
udt_data[member_offset..end_offset].copy_from_slice(&member_data);
} else {
return Err(crate::error::EtherNetIpError::Protocol(format!(
"Member data exceeds UDT size: {} > {}",
end_offset,
udt_data.len()
)));
}
self.write_tag_raw(udt_name, &udt_data).await
}
pub async fn get_udt_definition(
&mut self,
udt_name: &str,
) -> crate::error::Result<UdtDefinition> {
if let Some(cached) = self.udt_manager.lock().await.get_definition(udt_name) {
return Ok(cached.clone());
}
let attributes = self.get_tag_attributes(udt_name).await?;
if attributes.data_type != 0x00A0 {
return Err(crate::error::EtherNetIpError::Protocol(format!(
"Tag '{}' is not a UDT (type: {})",
udt_name, attributes.data_type_name
)));
}
let template_id = attributes.template_instance_id.ok_or_else(|| {
crate::error::EtherNetIpError::Protocol(
"UDT template instance ID not found".to_string(),
)
})?;
let template_data = self.read_udt_template(template_id).await?;
let template = self
.udt_manager
.lock()
.await
.parse_udt_template(template_id, &template_data)?;
let definition = UdtDefinition {
name: udt_name.to_string(),
members: template.members,
};
self.udt_manager
.lock()
.await
.add_definition(definition.clone());
Ok(definition)
}
pub async fn get_tag_attributes(
&mut self,
tag_name: &str,
) -> crate::error::Result<TagAttributes> {
if let Some(cached) = self.udt_manager.lock().await.get_tag_attributes(tag_name) {
return Ok(cached.clone());
}
let request = self.build_get_attributes_request(tag_name)?;
let response = self.send_cip_request(&request).await?;
let attributes = self.parse_attributes_response(tag_name, &response)?;
self.udt_manager
.lock()
.await
.add_tag_attributes(attributes.clone());
Ok(attributes)
}
async fn read_udt_template(&mut self, template_id: u32) -> crate::error::Result<Vec<u8>> {
let request = self.build_read_template_request(template_id)?;
let response = self.send_cip_request(&request).await?;
self.parse_template_response(&response)
}
fn build_get_attributes_request(&self, tag_name: &str) -> crate::error::Result<Vec<u8>> {
let mut request = Vec::new();
request.push(0x03);
let tag_bytes = tag_name.as_bytes();
request.push(0x91); request.push(tag_bytes.len() as u8);
request.extend_from_slice(tag_bytes);
request.extend_from_slice(&[0x02, 0x00]);
request.extend_from_slice(&[0x01, 0x00]);
request.extend_from_slice(&[0x02, 0x00]);
Ok(request)
}
fn build_read_template_request(&self, template_id: u32) -> crate::error::Result<Vec<u8>> {
let mut request = Vec::new();
request.push(0x4C);
request.push(0x20); request.extend_from_slice(&[0x02, 0x00]); request.push(0x24); request.extend_from_slice(&template_id.to_le_bytes());
request.extend_from_slice(&[0x00, 0x00, 0x00, 0x00]); request.extend_from_slice(&[0xFF, 0xFF, 0x00, 0x00]);
Ok(request)
}
fn parse_attributes_response(
&self,
tag_name: &str,
response: &[u8],
) -> crate::error::Result<TagAttributes> {
if response.len() < 8 {
return Err(crate::error::EtherNetIpError::Protocol(
"Attributes response too short".to_string(),
));
}
let mut offset = 0;
let data_type = u16::from_le_bytes([response[offset], response[offset + 1]]);
offset += 2;
let size = u32::from_le_bytes([
response[offset],
response[offset + 1],
response[offset + 2],
response[offset + 3],
]);
offset += 4;
let template_instance_id = if response.len() > offset + 4 {
Some(u32::from_le_bytes([
response[offset],
response[offset + 1],
response[offset + 2],
response[offset + 3],
]))
} else {
None
};
let attributes = TagAttributes {
name: tag_name.to_string(),
data_type,
data_type_name: self.get_data_type_name(data_type),
dimensions: Vec::new(), permissions: udt::TagPermissions::ReadWrite, scope: if tag_name.contains(':') {
let parts: Vec<&str> = tag_name.split(':').collect();
if parts.len() >= 2 {
udt::TagScope::Program(parts[0].to_string())
} else {
udt::TagScope::Controller
}
} else {
udt::TagScope::Controller
},
template_instance_id,
size,
};
Ok(attributes)
}
fn parse_template_response(&self, response: &[u8]) -> crate::error::Result<Vec<u8>> {
if response.len() < 4 {
return Err(crate::error::EtherNetIpError::Protocol(
"Template response too short".to_string(),
));
}
let data_start = 4; Ok(response[data_start..].to_vec())
}
fn get_data_type_name(&self, data_type: u16) -> String {
match data_type {
0x00C1 => "BOOL".to_string(),
0x00C2 => "SINT".to_string(),
0x00C3 => "INT".to_string(),
0x00C4 => "DINT".to_string(),
0x00C5 => "LINT".to_string(),
0x00C6 => "USINT".to_string(),
0x00C7 => "UINT".to_string(),
0x00C8 => "UDINT".to_string(),
0x00C9 => "ULINT".to_string(),
0x00CA => "REAL".to_string(),
0x00CB => "LREAL".to_string(),
0x00CE => "STRING".to_string(),
0x00A0 => "UDT".to_string(),
_ => format!("UNKNOWN(0x{:04X})", data_type),
}
}
fn build_tag_list_request(&self) -> crate::error::Result<Vec<u8>> {
let mut request = Vec::new();
request.push(0x55);
request.push(0x20); request.extend_from_slice(&[0x6B, 0x00]); request.push(0x25); request.extend_from_slice(&[0x00, 0x00]);
request.extend_from_slice(&[0x02, 0x00]);
request.extend_from_slice(&[0x01, 0x00]);
request.extend_from_slice(&[0x02, 0x00]);
Ok(request)
}
fn build_program_tag_list_request(&self, _program_name: &str) -> crate::error::Result<Vec<u8>> {
let mut request = Vec::new();
request.push(0x55);
request.push(0x20); request.extend_from_slice(&[0x6C, 0x00]); request.push(0x24); request.extend_from_slice(&[0x00, 0x00]);
request.extend_from_slice(&[0x02, 0x00]);
request.extend_from_slice(&[0x01, 0x00]);
request.extend_from_slice(&[0x02, 0x00]);
Ok(request)
}
fn parse_tag_list_response(&self, response: &[u8]) -> crate::error::Result<Vec<TagAttributes>> {
if response.len() < 4 {
return Err(crate::error::EtherNetIpError::Protocol(
"Tag list response too short".to_string(),
));
}
let mut offset = 0;
let mut tags = Vec::new();
offset += 4;
while offset < response.len() {
if offset + 8 > response.len() {
break; }
let name_length = u16::from_le_bytes([response[offset], response[offset + 1]]) as usize;
offset += 2;
if offset
.checked_add(name_length)
.map_or(true, |end| end > response.len())
{
break; }
let name_bytes = &response[offset..offset + name_length];
let tag_name = String::from_utf8_lossy(name_bytes).to_string();
offset += name_length;
offset = (offset + 3) & !3;
if offset + 2 > response.len() {
break; }
let data_type = u16::from_le_bytes([response[offset], response[offset + 1]]);
offset += 2;
let attributes = TagAttributes {
name: tag_name,
data_type,
data_type_name: self.get_data_type_name(data_type),
dimensions: Vec::new(), permissions: udt::TagPermissions::ReadWrite, scope: udt::TagScope::Controller, template_instance_id: if data_type == 0x00A0 { Some(0) } else { None },
size: 0, };
tags.push(attributes);
}
Ok(tags)
}
async fn negotiate_packet_size(&mut self) -> crate::error::Result<()> {
let mut request = vec![
0x03, 0x02, 0x20, 0x02, 0x24, 0x01, ];
request.extend_from_slice(&[0x01, 0x00]); request.extend_from_slice(&[0x04, 0x00]);
let response = self.send_cip_request(&request).await?;
let cip_data = self.extract_cip_from_response(&response)?;
if cip_data.len() >= 12 && cip_data[2] == 0x00 {
let max_packet_size = u16::from_le_bytes([cip_data[10], cip_data[11]]) as u32;
self.max_packet_size = max_packet_size.clamp(504, 4000);
tracing::debug!("Negotiated packet size: {} bytes", self.max_packet_size);
} else {
self.max_packet_size = 4000;
tracing::debug!("Using default packet size: {} bytes", self.max_packet_size);
}
Ok(())
}
pub async fn write_tag(&mut self, tag_name: &str, value: PlcValue) -> crate::error::Result<()> {
tracing::debug!(
"Writing '{}' to tag '{}'",
match &value {
PlcValue::String(s) => format!("\"{s}\""),
_ => format!("{value:?}"),
},
tag_name
);
let value = if let PlcValue::Udt(udt_data) = &value {
if udt_data.symbol_id == 0 {
tracing::debug!("[UDT WRITE] symbol_id is 0, reading tag to get symbol_id");
let attributes = self.get_tag_attributes(tag_name).await?;
let symbol_id = attributes.template_instance_id.ok_or_else(|| {
crate::error::EtherNetIpError::Protocol(
"UDT template instance ID not found. Cannot write UDT without symbol_id."
.to_string(),
)
})? as i32;
PlcValue::Udt(UdtData {
symbol_id,
data: udt_data.data.clone(),
})
} else {
value
}
} else {
value
};
if let Some((base_name, index)) = self.parse_array_element_access(tag_name) {
tracing::debug!(
"Detected array element write: {}[{}], using workaround",
base_name,
index
);
return self
.write_array_element_workaround(&base_name, index, value)
.await;
}
let cip_request = self.build_write_request(tag_name, &value)?;
let response = self.send_cip_request(&cip_request).await?;
let cip_response = self.extract_cip_from_response(&response)?;
if cip_response.len() < 3 {
return Err(EtherNetIpError::Protocol(
"Write response too short".to_string(),
));
}
let service_reply = cip_response[0]; let general_status = cip_response[2];
tracing::trace!(
"Write response - Service: 0x{:02X}, Status: 0x{:02X}",
service_reply,
general_status
);
if let Err(e) = self.check_cip_error(&cip_response) {
tracing::error!("[WRITE] CIP Error: {}", e);
return Err(e);
}
tracing::info!("Write operation completed successfully");
Ok(())
}
fn _build_ab_string_write_request(
&self,
tag_name: &str,
value: &PlcValue,
) -> crate::error::Result<Vec<u8>> {
if let PlcValue::String(string_value) = value {
tracing::debug!(
"Building correct Allen-Bradley string write request for tag: '{}'",
tag_name
);
let mut cip_request = Vec::new();
cip_request.push(0x4D);
let tag_bytes = tag_name.as_bytes();
let path_len = if tag_bytes.len() % 2 == 0 {
tag_bytes.len() + 2
} else {
tag_bytes.len() + 3
} / 2;
cip_request.push(path_len as u8);
cip_request.push(0x91); cip_request.push(tag_bytes.len() as u8);
cip_request.extend_from_slice(tag_bytes);
if tag_bytes.len() % 2 != 0 {
cip_request.push(0x00);
}
cip_request.extend_from_slice(&[0xA0, 0x02]);
cip_request.extend_from_slice(&[0x01, 0x00]);
let string_bytes = string_value.as_bytes();
let max_len: u16 = 82; let current_len = string_bytes.len().min(max_len as usize) as u16;
cip_request.extend_from_slice(¤t_len.to_le_bytes());
cip_request.extend_from_slice(&max_len.to_le_bytes());
let mut data_array = vec![0u8; max_len as usize];
data_array[..current_len as usize]
.copy_from_slice(&string_bytes[..current_len as usize]);
cip_request.extend_from_slice(&data_array);
tracing::trace!(
"Built correct AB string write request ({} bytes): len={}, maxlen={}, data_len={}",
cip_request.len(),
current_len,
max_len,
string_bytes.len()
);
tracing::trace!(
"First 32 bytes: {:02X?}",
&cip_request[..std::cmp::min(32, cip_request.len())]
);
Ok(cip_request)
} else {
Err(EtherNetIpError::Protocol(
"Expected string value for Allen-Bradley string write".to_string(),
))
}
}
fn build_write_request(
&self,
tag_name: &str,
value: &PlcValue,
) -> crate::error::Result<Vec<u8>> {
tracing::debug!("Building write request for tag: '{}'", tag_name);
let mut cip_request = Vec::new();
cip_request.push(0x4D);
let path = self.build_tag_path(tag_name);
cip_request.push((path.len() / 2) as u8);
cip_request.extend_from_slice(&path);
let data_type = if let PlcValue::Udt(udt_data) = value {
0x02A0u16.wrapping_add(udt_data.symbol_id as u16)
} else {
value.get_data_type()
};
let value_bytes = value.to_bytes();
cip_request.extend_from_slice(&data_type.to_le_bytes()); cip_request.extend_from_slice(&[0x01, 0x00]); cip_request.extend_from_slice(&value_bytes);
tracing::trace!(
"Built CIP write request ({} bytes): {:02X?}",
cip_request.len(),
cip_request
);
Ok(cip_request)
}
fn build_write_request_raw(
&self,
tag_name: &str,
data: &[u8],
) -> crate::error::Result<Vec<u8>> {
let mut request = Vec::new();
request.push(0x4D);
request.push(0x00);
let tag_path = self.build_tag_path(tag_name);
request.extend(tag_path);
request.extend(data);
Ok(request)
}
#[allow(dead_code)]
fn serialize_value(&self, value: &PlcValue) -> crate::error::Result<Vec<u8>> {
let mut data = Vec::new();
match value {
PlcValue::Bool(v) => {
data.extend(&0x00C1u16.to_le_bytes()); data.push(if *v { 0xFF } else { 0x00 });
}
PlcValue::Sint(v) => {
data.extend(&0x00C2u16.to_le_bytes()); data.extend(&v.to_le_bytes());
}
PlcValue::Int(v) => {
data.extend(&0x00C3u16.to_le_bytes()); data.extend(&v.to_le_bytes());
}
PlcValue::Dint(v) => {
data.extend(&0x00C4u16.to_le_bytes()); data.extend(&v.to_le_bytes());
}
PlcValue::Lint(v) => {
data.extend(&0x00C5u16.to_le_bytes()); data.extend(&v.to_le_bytes());
}
PlcValue::Usint(v) => {
data.extend(&0x00C6u16.to_le_bytes()); data.extend(&v.to_le_bytes());
}
PlcValue::Uint(v) => {
data.extend(&0x00C7u16.to_le_bytes()); data.extend(&v.to_le_bytes());
}
PlcValue::Udint(v) => {
data.extend(&0x00C8u16.to_le_bytes()); data.extend(&v.to_le_bytes());
}
PlcValue::Ulint(v) => {
data.extend(&0x00C9u16.to_le_bytes()); data.extend(&v.to_le_bytes());
}
PlcValue::Real(v) => {
data.extend(&0x00CAu16.to_le_bytes()); data.extend(&v.to_le_bytes());
}
PlcValue::Lreal(v) => {
data.extend(&0x00CBu16.to_le_bytes()); data.extend(&v.to_le_bytes());
}
PlcValue::String(v) => {
data.extend(&0x00CEu16.to_le_bytes());
let length = v.len().min(82) as u32;
data.extend_from_slice(&length.to_le_bytes());
let string_bytes = v.as_bytes();
let data_len = string_bytes.len().min(82);
data.extend_from_slice(&string_bytes[..data_len]);
let remaining_chars = 82 - data_len;
data.extend(vec![0u8; remaining_chars]);
}
PlcValue::Udt(_) => {
data.extend(&0x00A0u16.to_le_bytes()); }
}
Ok(data)
}
pub fn build_list_tags_request(&self) -> Vec<u8> {
tracing::debug!("Building list tags request");
let path_array = vec![
0x20, 0x6B, 0x25, 0x00, 0x00, 0x00,
];
let request_data = vec![0x02, 0x00, 0x01, 0x00, 0x02, 0x00];
let mut cip_request = Vec::new();
cip_request.push(0x55);
cip_request.push((path_array.len() / 2) as u8);
cip_request.extend_from_slice(&path_array);
cip_request.extend_from_slice(&request_data);
tracing::trace!(
"Built CIP list tags request ({} bytes): {:02X?}",
cip_request.len(),
cip_request
);
cip_request
}
fn parse_extended_error(&self, cip_data: &[u8]) -> crate::error::Result<String> {
if cip_data.len() < 6 {
return Err(EtherNetIpError::Protocol(
"Extended error response too short".to_string(),
));
}
let additional_status_size = cip_data[3] as usize; if additional_status_size == 0 || cip_data.len() < 4 + (additional_status_size * 2) {
return Ok("Extended error (no additional status)".to_string());
}
let extended_error_code_le = u16::from_le_bytes([cip_data[4], cip_data[5]]);
let extended_error_code_be = u16::from_be_bytes([cip_data[4], cip_data[5]]);
let error_msg = match extended_error_code_le {
0x0001 => "Connection failure (extended)".to_string(),
0x0002 => "Resource unavailable (extended)".to_string(),
0x0003 => "Invalid parameter value (extended)".to_string(),
0x0004 => "Path segment error (extended)".to_string(),
0x0005 => "Path destination unknown (extended)".to_string(),
0x0006 => "Partial transfer (extended)".to_string(),
0x0007 => "Connection lost (extended)".to_string(),
0x0008 => "Service not supported (extended)".to_string(),
0x0009 => "Invalid attribute value (extended)".to_string(),
0x000A => "Attribute list error (extended)".to_string(),
0x000B => "Already in requested mode/state (extended)".to_string(),
0x000C => "Object state conflict (extended)".to_string(),
0x000D => "Object already exists (extended)".to_string(),
0x000E => "Attribute not settable (extended)".to_string(),
0x000F => "Privilege violation (extended)".to_string(),
0x0010 => "Device state conflict (extended)".to_string(),
0x0011 => "Reply data too large (extended)".to_string(),
0x0012 => "Fragmentation of a primitive value (extended)".to_string(),
0x0013 => "Not enough data (extended)".to_string(),
0x0014 => "Attribute not supported (extended)".to_string(),
0x0015 => "Too much data (extended)".to_string(),
0x0016 => "Object does not exist (extended)".to_string(),
0x0017 => "Service fragmentation sequence not in progress (extended)".to_string(),
0x0018 => "No stored attribute data (extended)".to_string(),
0x0019 => "Store operation failure (extended)".to_string(),
0x001A => "Routing failure, request packet too large (extended)".to_string(),
0x001B => "Routing failure, response packet too large (extended)".to_string(),
0x001C => "Missing attribute list entry data (extended)".to_string(),
0x001D => "Invalid attribute value list (extended)".to_string(),
0x001E => "Embedded service error (extended)".to_string(),
0x001F => "Vendor specific error (extended)".to_string(),
0x0020 => "Invalid parameter (extended)".to_string(),
0x0021 => "Write-once value or medium already written (extended)".to_string(),
0x0022 => "Invalid reply received (extended)".to_string(),
0x0023 => "Buffer overflow (extended)".to_string(),
0x0024 => "Invalid message format (extended)".to_string(),
0x0025 => "Key failure in path (extended)".to_string(),
0x0026 => "Path size invalid (extended)".to_string(),
0x0027 => "Unexpected attribute in list (extended)".to_string(),
0x0028 => "Invalid member ID (extended)".to_string(),
0x0029 => "Member not settable (extended)".to_string(),
0x002A => "Group 2 only server general failure (extended)".to_string(),
0x002B => "Unknown Modbus error (extended)".to_string(),
0x002C => "Attribute not gettable (extended)".to_string(),
_ => {
match extended_error_code_be {
0x0001 => "Connection failure (extended, BE)".to_string(),
0x0002 => "Resource unavailable (extended, BE)".to_string(),
0x0003 => "Invalid parameter value (extended, BE)".to_string(),
0x0004 => "Path segment error (extended, BE)".to_string(),
0x0005 => "Path destination unknown (extended, BE)".to_string(),
0x0006 => "Partial transfer (extended, BE)".to_string(),
0x0007 => "Connection lost (extended, BE)".to_string(),
0x0008 => "Service not supported (extended, BE)".to_string(),
0x0009 => "Invalid attribute value (extended, BE)".to_string(),
0x000A => "Attribute list error (extended, BE)".to_string(),
0x000B => "Already in requested mode/state (extended, BE)".to_string(),
0x000C => "Object state conflict (extended, BE)".to_string(),
0x000D => "Object already exists (extended, BE)".to_string(),
0x000E => "Attribute not settable (extended, BE)".to_string(),
0x000F => "Privilege violation (extended, BE)".to_string(),
0x0010 => "Device state conflict (extended, BE)".to_string(),
0x0011 => "Reply data too large (extended, BE)".to_string(),
0x0012 => "Fragmentation of a primitive value (extended, BE)".to_string(),
0x0013 => "Not enough data (extended, BE)".to_string(),
0x0014 => "Attribute not supported (extended, BE)".to_string(),
0x0015 => "Too much data (extended, BE)".to_string(),
0x0016 => "Object does not exist (extended, BE)".to_string(),
0x0017 => "Service fragmentation sequence not in progress (extended, BE)".to_string(),
0x0018 => "No stored attribute data (extended, BE)".to_string(),
0x0019 => "Store operation failure (extended, BE)".to_string(),
0x001A => "Routing failure, request packet too large (extended, BE)".to_string(),
0x001B => "Routing failure, response packet too large (extended, BE)".to_string(),
0x001C => "Missing attribute list entry data (extended, BE)".to_string(),
0x001D => "Invalid attribute value list (extended, BE)".to_string(),
0x001E => "Embedded service error (extended, BE)".to_string(),
0x001F => "Vendor specific error (extended, BE)".to_string(),
0x0020 => "Invalid parameter (extended, BE)".to_string(),
0x0021 => "Write-once value or medium already written (extended, BE)".to_string(),
0x0022 => "Invalid reply received (extended, BE)".to_string(),
0x0023 => "Buffer overflow (extended, BE)".to_string(),
0x0024 => "Invalid message format (extended, BE)".to_string(),
0x0025 => "Key failure in path (extended, BE)".to_string(),
0x0026 => "Path size invalid (extended, BE)".to_string(),
0x0027 => "Unexpected attribute in list (extended, BE)".to_string(),
0x0028 => "Invalid member ID (extended, BE)".to_string(),
0x0029 => "Member not settable (extended, BE)".to_string(),
0x002A => "Group 2 only server general failure (extended, BE)".to_string(),
0x002B => "Unknown Modbus error (extended, BE)".to_string(),
0x002C => "Attribute not gettable (extended, BE)".to_string(),
_ if extended_error_code_le == 0x2107 || extended_error_code_be == 0x2107 => {
format!(
"Vendor-specific or composite extended error: 0x{extended_error_code_le:04X} (LE) / 0x{extended_error_code_be:04X} (BE). Raw bytes: [0x{:02X}, 0x{:02X}]. This may indicate the PLC does not support writing to UDT array element members directly.",
cip_data[4], cip_data[5]
)
}
_ => format!(
"Unknown extended CIP error code: 0x{extended_error_code_le:04X} (LE) / 0x{extended_error_code_be:04X} (BE). Raw bytes: [0x{:02X}, 0x{:02X}]",
cip_data[4], cip_data[5]
),
}
}
};
Ok(error_msg)
}
fn check_cip_error(&self, cip_data: &[u8]) -> crate::error::Result<()> {
if cip_data.len() < 3 {
return Err(EtherNetIpError::Protocol(
"CIP response too short for status check".to_string(),
));
}
let general_status = cip_data[2];
if general_status == 0x00 {
return Ok(());
}
if general_status == 0xFF {
let error_msg = self.parse_extended_error(cip_data)?;
return Err(EtherNetIpError::Protocol(format!(
"CIP Extended Error: {error_msg}"
)));
}
let error_msg = self.get_cip_error_message(general_status);
Err(EtherNetIpError::Protocol(format!(
"CIP Error 0x{general_status:02X}: {error_msg}"
)))
}
fn get_cip_error_message(&self, status: u8) -> String {
match status {
0x00 => "Success".to_string(),
0x01 => "Connection failure".to_string(),
0x02 => "Resource unavailable".to_string(),
0x03 => "Invalid parameter value".to_string(),
0x04 => "Path segment error".to_string(),
0x05 => "Path destination unknown".to_string(),
0x06 => "Partial transfer".to_string(),
0x07 => "Connection lost".to_string(),
0x08 => "Service not supported".to_string(),
0x09 => "Invalid attribute value".to_string(),
0x0A => "Attribute list error".to_string(),
0x0B => "Already in requested mode/state".to_string(),
0x0C => "Object state conflict".to_string(),
0x0D => "Object already exists".to_string(),
0x0E => "Attribute not settable".to_string(),
0x0F => "Privilege violation".to_string(),
0x10 => "Device state conflict".to_string(),
0x11 => "Reply data too large".to_string(),
0x12 => "Fragmentation of a primitive value".to_string(),
0x13 => "Not enough data".to_string(),
0x14 => "Attribute not supported".to_string(),
0x15 => "Too much data".to_string(),
0x16 => "Object does not exist".to_string(),
0x17 => "Service fragmentation sequence not in progress".to_string(),
0x18 => "No stored attribute data".to_string(),
0x19 => "Store operation failure".to_string(),
0x1A => "Routing failure, request packet too large".to_string(),
0x1B => "Routing failure, response packet too large".to_string(),
0x1C => "Missing attribute list entry data".to_string(),
0x1D => "Invalid attribute value list".to_string(),
0x1E => "Embedded service error".to_string(),
0x1F => "Vendor specific error".to_string(),
0x20 => "Invalid parameter".to_string(),
0x21 => "Write-once value or medium already written".to_string(),
0x22 => "Invalid reply received".to_string(),
0x23 => "Buffer overflow".to_string(),
0x24 => "Invalid message format".to_string(),
0x25 => "Key failure in path".to_string(),
0x26 => "Path size invalid".to_string(),
0x27 => "Unexpected attribute in list".to_string(),
0x28 => "Invalid member ID".to_string(),
0x29 => "Member not settable".to_string(),
0x2A => "Group 2 only server general failure".to_string(),
0x2B => "Unknown Modbus error".to_string(),
0x2C => "Attribute not gettable".to_string(),
_ => format!("Unknown CIP error code: 0x{status:02X}"),
}
}
fn describe_multiple_service_error(
&self,
general_status: u8,
operations: &[BatchOperation],
) -> String {
if general_status == 0x1E
&& operations.iter().any(|op| {
matches!(
op,
BatchOperation::Write {
value: PlcValue::String(_),
..
}
)
})
{
return "Multiple Service Response error: 0x1E (Embedded service error). On CompactLogix/ControlLogix this commonly indicates the controller rejected a direct STRING write in the batch request; treat it as a PLC firmware limitation, not a protocol bug.".to_string();
}
format!("Multiple Service Response error: 0x{general_status:02X}")
}
async fn validate_session(&mut self) -> crate::error::Result<()> {
let time_since_activity = self.last_activity.lock().await.elapsed();
if time_since_activity > Duration::from_secs(30) {
self.send_keep_alive().await?;
}
Ok(())
}
async fn send_keep_alive(&mut self) -> crate::error::Result<()> {
let packet = vec![0u8; 24];
let mut stream = self.stream.lock().await;
stream.write_all(&packet).await?;
*self.last_activity.lock().await = Instant::now();
Ok(())
}
pub async fn check_health(&self) -> bool {
self.session_handle != 0
&& self.last_activity.lock().await.elapsed() < Duration::from_secs(150)
}
pub async fn check_health_detailed(&mut self) -> crate::error::Result<bool> {
if self.session_handle == 0 {
return Ok(false);
}
match self.send_keep_alive().await {
Ok(()) => Ok(true),
Err(_) => {
match self.register_session().await {
Ok(()) => Ok(true),
Err(_) => Ok(false),
}
}
}
}
async fn read_tag_raw(&mut self, tag_name: &str) -> crate::error::Result<Vec<u8>> {
let response = self
.send_cip_request(&self.build_read_request(tag_name))
.await?;
self.extract_cip_from_response(&response)
}
#[allow(dead_code)]
async fn write_tag_raw(&mut self, tag_name: &str, data: &[u8]) -> crate::error::Result<()> {
let request = self.build_write_request_raw(tag_name, data)?;
let response = self.send_cip_request(&request).await?;
let cip_response = self.extract_cip_from_response(&response)?;
if cip_response.len() < 3 {
return Err(EtherNetIpError::Protocol(
"Write response too short".to_string(),
));
}
let service_reply = cip_response[0]; let general_status = cip_response[2];
tracing::trace!(
"Write response - Service: 0x{:02X}, Status: 0x{:02X}",
service_reply,
general_status
);
if let Err(e) = self.check_cip_error(&cip_response) {
tracing::error!("[WRITE] CIP Error: {}", e);
return Err(e);
}
tracing::info!("Write completed successfully");
Ok(())
}
fn build_unconnected_send(&self, embedded_message: &[u8]) -> Vec<u8> {
let mut ucmm = vec![
0x52, 0x02,
0x20, 0x06, 0x24, 0x01, 0x0A, 0xF0,
];
let msg_len = embedded_message.len() as u16;
ucmm.extend_from_slice(&msg_len.to_le_bytes());
ucmm.extend_from_slice(embedded_message);
if embedded_message.len() % 2 == 1 {
ucmm.push(0x00);
}
let route_path_bytes = if let Some(route_path) = &self.route_path {
route_path.to_cip_bytes()
} else {
Vec::new()
};
let route_path_words = if route_path_bytes.is_empty() {
0
} else {
(route_path_bytes.len() / 2) as u8
};
ucmm.push(route_path_words);
ucmm.push(0x00);
if !route_path_bytes.is_empty() {
tracing::trace!(
"Adding route path to Unconnected Send: {:02X?} ({} bytes, {} words)",
route_path_bytes,
route_path_bytes.len(),
route_path_words
);
ucmm.extend_from_slice(&route_path_bytes);
}
ucmm
}
pub async fn send_cip_request(&self, cip_request: &[u8]) -> Result<Vec<u8>> {
tracing::trace!(
"Sending CIP request ({} bytes): {:02X?}",
cip_request.len(),
cip_request
);
let ucmm_message = self.build_unconnected_send(cip_request);
tracing::trace!(
"Unconnected Send message ({} bytes): {:02X?}",
ucmm_message.len(),
&ucmm_message[..std::cmp::min(64, ucmm_message.len())]
);
let response_data = self.send_rr_data_item(&ucmm_message).await?;
if let Ok(raw_cip_data) = self.extract_unconnected_data_item(&response_data) {
let use_direct_fallback = raw_cip_data.len() >= 3
&& raw_cip_data[0] == 0xD2
&& raw_cip_data[2] != 0x00
&& self.route_path.is_none();
if use_direct_fallback {
tracing::warn!(
"Unconnected Send returned 0xD2 status 0x{:02X}; retrying with direct CIP SendRRData fallback",
raw_cip_data[2]
);
return self.send_rr_data_item(cip_request).await;
}
}
Ok(response_data)
}
async fn send_rr_data_item(&self, item_data: &[u8]) -> Result<Vec<u8>> {
let item_data_size = item_data.len();
let total_data_len = 4 + 2 + 2 + 8 + item_data_size;
let mut packet = Vec::new();
packet.extend_from_slice(&[0x6F, 0x00]); packet.extend_from_slice(&(total_data_len as u16).to_le_bytes()); packet.extend_from_slice(&self.session_handle.to_le_bytes()); packet.extend_from_slice(&[0x00, 0x00, 0x00, 0x00]); packet.extend_from_slice(&[0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08]); packet.extend_from_slice(&[0x00, 0x00, 0x00, 0x00]);
packet.extend_from_slice(&[0x00, 0x00, 0x00, 0x00]); packet.extend_from_slice(&[0x05, 0x00]); packet.extend_from_slice(&[0x02, 0x00]);
packet.extend_from_slice(&[0x00, 0x00]); packet.extend_from_slice(&[0x00, 0x00]);
packet.extend_from_slice(&[0xB2, 0x00]); packet.extend_from_slice(&(item_data_size as u16).to_le_bytes()); packet.extend_from_slice(item_data);
tracing::trace!(
"Built packet ({} bytes): {:02X?}",
packet.len(),
&packet[..std::cmp::min(64, packet.len())]
);
let mut stream = self.stream.lock().await;
stream
.write_all(&packet)
.await
.map_err(EtherNetIpError::Io)?;
let mut header = [0u8; 24];
match timeout(Duration::from_secs(10), stream.read_exact(&mut header)).await {
Ok(Ok(_)) => {}
Ok(Err(e)) => return Err(EtherNetIpError::Io(e)),
Err(_) => return Err(EtherNetIpError::Timeout(Duration::from_secs(10))),
}
let cmd_status = u32::from_le_bytes([header[8], header[9], header[10], header[11]]);
if cmd_status != 0 {
return Err(EtherNetIpError::Protocol(format!(
"EIP Command failed. Status: 0x{cmd_status:08X}"
)));
}
let response_length = u16::from_le_bytes([header[2], header[3]]) as usize;
if response_length == 0 {
return Ok(Vec::new());
}
let mut response_data = vec![0u8; response_length];
match timeout(
Duration::from_secs(10),
stream.read_exact(&mut response_data),
)
.await
{
Ok(Ok(_)) => {}
Ok(Err(e)) => return Err(EtherNetIpError::Io(e)),
Err(_) => return Err(EtherNetIpError::Timeout(Duration::from_secs(10))),
}
*self.last_activity.lock().await = Instant::now();
tracing::trace!(
"Received response ({} bytes): {:02X?}",
response_data.len(),
&response_data[..std::cmp::min(32, response_data.len())]
);
Ok(response_data)
}
fn extract_unconnected_data_item(&self, response: &[u8]) -> crate::error::Result<Vec<u8>> {
if response.len() < 8 {
return Err(EtherNetIpError::Protocol(
"Response too short for CPF header".to_string(),
));
}
let mut pos = 6;
let item_count = u16::from_le_bytes([response[pos], response[pos + 1]]);
pos += 2;
for _ in 0..item_count {
if pos + 4 > response.len() {
return Err(EtherNetIpError::Protocol(
"Response truncated while parsing items".to_string(),
));
}
let item_type = u16::from_le_bytes([response[pos], response[pos + 1]]);
let item_length = u16::from_le_bytes([response[pos + 2], response[pos + 3]]) as usize;
pos += 4;
if pos
.checked_add(item_length)
.map_or(true, |end| end > response.len())
{
return Err(EtherNetIpError::Protocol("Data item truncated".to_string()));
}
if item_type == 0x00B2 {
return Ok(response[pos..pos + item_length].to_vec());
}
pos += item_length;
}
Err(EtherNetIpError::Protocol(
"No Unconnected Data Item (0x00B2) found in response".to_string(),
))
}
fn unwrap_unconnected_send_reply(&self, cip_data: &[u8]) -> crate::error::Result<Vec<u8>> {
if cip_data.is_empty() || cip_data[0] != 0xD2 {
return Ok(cip_data.to_vec());
}
if cip_data.len() < 4 {
return Err(EtherNetIpError::Protocol(
"Unconnected Send reply too short".to_string(),
));
}
let general_status = cip_data[2];
let additional_status_words = cip_data[3] as usize;
let embedded_offset = 4 + (additional_status_words * 2);
if general_status != 0x00 {
let error_msg = self.get_cip_error_message(general_status);
return Err(EtherNetIpError::Protocol(format!(
"Unconnected Send failed (0xD2): CIP Error 0x{general_status:02X}: {error_msg}"
)));
}
if embedded_offset >= cip_data.len() {
return Err(EtherNetIpError::Protocol(
"Unconnected Send succeeded but no embedded response payload was returned"
.to_string(),
));
}
Ok(cip_data[embedded_offset..].to_vec())
}
fn extract_cip_from_response(&self, response: &[u8]) -> crate::error::Result<Vec<u8>> {
tracing::trace!(
"Extracting CIP from response ({} bytes): {:02X?}",
response.len(),
&response[..std::cmp::min(32, response.len())]
);
let cip_data = self.extract_unconnected_data_item(response)?;
tracing::trace!(
"Found Unconnected Data Item, extracted CIP data ({} bytes)",
cip_data.len()
);
tracing::trace!(
"CIP data bytes: {:02X?}",
&cip_data[..std::cmp::min(16, cip_data.len())]
);
self.unwrap_unconnected_send_reply(&cip_data)
}
fn parse_cip_response(&self, cip_response: &[u8]) -> crate::error::Result<PlcValue> {
tracing::trace!(
"Parsing CIP response ({} bytes): {:02X?}",
cip_response.len(),
cip_response
);
if cip_response.len() < 4 {
return Err(EtherNetIpError::Protocol(
"CIP response too short".to_string(),
));
}
let service_reply = cip_response[0]; let general_status = cip_response[2];
tracing::trace!(
"Service reply: 0x{:02X}, Status: 0x{:02X}",
service_reply,
general_status
);
if let Err(e) = self.check_cip_error(cip_response) {
tracing::error!("CIP Error: {}", e);
return Err(e);
}
if service_reply == 0xCC {
if cip_response.len() < 6 {
return Err(EtherNetIpError::Protocol(
"Read response too short for data".to_string(),
));
}
let data_type = u16::from_le_bytes([cip_response[4], cip_response[5]]);
let value_data = &cip_response[6..];
tracing::trace!(
"Data type: 0x{:04X}, Value data ({} bytes): {:02X?}",
data_type,
value_data.len(),
value_data
);
match data_type {
0x00C1 => {
if value_data.is_empty() {
return Err(EtherNetIpError::Protocol(
"No data for BOOL value".to_string(),
));
}
let value = value_data[0] != 0;
tracing::trace!("Parsed BOOL: {}", value);
Ok(PlcValue::Bool(value))
}
0x00C2 => {
if value_data.is_empty() {
return Err(EtherNetIpError::Protocol(
"No data for SINT value".to_string(),
));
}
let value = value_data[0] as i8;
tracing::trace!("Parsed SINT: {}", value);
Ok(PlcValue::Sint(value))
}
0x00C3 => {
if value_data.len() < 2 {
return Err(EtherNetIpError::Protocol(
"Insufficient data for INT value".to_string(),
));
}
let value = i16::from_le_bytes([value_data[0], value_data[1]]);
tracing::trace!("Parsed INT: {}", value);
Ok(PlcValue::Int(value))
}
0x00C4 => {
if value_data.len() < 4 {
return Err(EtherNetIpError::Protocol(
"Insufficient data for DINT value".to_string(),
));
}
let value = i32::from_le_bytes([
value_data[0],
value_data[1],
value_data[2],
value_data[3],
]);
tracing::trace!("Parsed DINT: {}", value);
Ok(PlcValue::Dint(value))
}
0x00C5 => {
if value_data.len() < 8 {
return Err(EtherNetIpError::Protocol(
"Insufficient data for LINT value".to_string(),
));
}
let value = i64::from_le_bytes([
value_data[0],
value_data[1],
value_data[2],
value_data[3],
value_data[4],
value_data[5],
value_data[6],
value_data[7],
]);
tracing::trace!("Parsed LINT: {}", value);
Ok(PlcValue::Lint(value))
}
0x00C6 => {
if value_data.is_empty() {
return Err(EtherNetIpError::Protocol(
"No data for USINT value".to_string(),
));
}
let value = value_data[0];
tracing::trace!("Parsed USINT: {}", value);
Ok(PlcValue::Usint(value))
}
0x00C7 => {
if value_data.len() < 2 {
return Err(EtherNetIpError::Protocol(
"Insufficient data for UINT value".to_string(),
));
}
let value = u16::from_le_bytes([value_data[0], value_data[1]]);
tracing::trace!("Parsed UINT: {}", value);
Ok(PlcValue::Uint(value))
}
0x00C8 => {
if value_data.len() < 4 {
return Err(EtherNetIpError::Protocol(
"Insufficient data for UDINT value".to_string(),
));
}
let value = u32::from_le_bytes([
value_data[0],
value_data[1],
value_data[2],
value_data[3],
]);
tracing::trace!("Parsed UDINT: {}", value);
Ok(PlcValue::Udint(value))
}
0x00C9 => {
if value_data.len() < 8 {
return Err(EtherNetIpError::Protocol(
"Insufficient data for ULINT value".to_string(),
));
}
let value = u64::from_le_bytes([
value_data[0],
value_data[1],
value_data[2],
value_data[3],
value_data[4],
value_data[5],
value_data[6],
value_data[7],
]);
tracing::trace!("Parsed ULINT: {}", value);
Ok(PlcValue::Ulint(value))
}
0x00CA => {
if value_data.len() < 4 {
return Err(EtherNetIpError::Protocol(
"Insufficient data for REAL value".to_string(),
));
}
let value = f32::from_le_bytes([
value_data[0],
value_data[1],
value_data[2],
value_data[3],
]);
tracing::trace!("Parsed REAL: {}", value);
Ok(PlcValue::Real(value))
}
0x00CB => {
if value_data.len() < 8 {
return Err(EtherNetIpError::Protocol(
"Insufficient data for LREAL value".to_string(),
));
}
let value = f64::from_le_bytes([
value_data[0],
value_data[1],
value_data[2],
value_data[3],
value_data[4],
value_data[5],
value_data[6],
value_data[7],
]);
tracing::trace!("Parsed LREAL: {}", value);
Ok(PlcValue::Lreal(value))
}
0x00CE => {
if value_data.len() < 4 {
return Err(EtherNetIpError::Protocol(
"Insufficient data for STRING length field".to_string(),
));
}
let length = u32::from_le_bytes([
value_data[0],
value_data[1],
value_data[2],
value_data[3],
]) as usize;
if value_data.len() < 4 || value_data.len() - 4 < length {
return Err(EtherNetIpError::Protocol(format!(
"Insufficient data for STRING value: need {} bytes, have {} bytes",
4 + length,
value_data.len()
)));
}
let string_data = &value_data[4..4 + length];
let value = String::from_utf8_lossy(string_data).to_string();
tracing::trace!(
"Parsed STRING (0x00CE): length={}, value='{}'",
length,
value
);
Ok(PlcValue::String(value))
}
0x00DA => {
if value_data.is_empty() {
return Ok(PlcValue::String(String::new()));
}
let length = value_data[0] as usize;
if value_data.len() < 1 + length {
return Err(EtherNetIpError::Protocol(
"Insufficient data for STRING value".to_string(),
));
}
let string_data = &value_data[1..1 + length];
let value = String::from_utf8_lossy(string_data).to_string();
tracing::trace!("Parsed STRING (0x00DA): '{}'", value);
Ok(PlcValue::String(value))
}
0x02A0 => {
tracing::trace!(
"Detected UDT structure (0x02A0) with {} bytes",
value_data.len()
);
Ok(PlcValue::Udt(UdtData {
symbol_id: 0, data: value_data.to_vec(),
}))
}
0x00D3 => {
if value_data.len() >= 4 {
let dword_value = u32::from_le_bytes([
value_data[0],
value_data[1],
value_data[2],
value_data[3],
]);
tracing::trace!(
"Parsed 0x00D3 as DWORD (BOOL array): {} (0x{:08X})",
dword_value,
dword_value
);
Ok(PlcValue::Udint(dword_value))
} else if value_data.len() >= 8 {
let value = u64::from_le_bytes([
value_data[0],
value_data[1],
value_data[2],
value_data[3],
value_data[4],
value_data[5],
value_data[6],
value_data[7],
]);
tracing::trace!("Parsed ULINT: {}", value);
Ok(PlcValue::Ulint(value))
} else {
Err(EtherNetIpError::Protocol(
"Insufficient data for ULINT/DWORD value".to_string(),
))
}
}
0x00A0 => {
tracing::trace!(
"Parsed UDT ({} bytes) - note: symbol_id not available in this context",
value_data.len()
);
Ok(PlcValue::Udt(UdtData {
symbol_id: 0, data: value_data.to_vec(),
}))
}
_ => {
tracing::warn!("Unknown data type: 0x{:04X}", data_type);
Err(EtherNetIpError::Protocol(format!(
"Unsupported data type: 0x{data_type:04X}"
)))
}
}
} else if service_reply == 0xCD {
tracing::debug!("Write operation successful");
Ok(PlcValue::Bool(true)) } else {
Err(EtherNetIpError::Protocol(format!(
"Unknown service reply: 0x{service_reply:02X}"
)))
}
}
pub async fn unregister_session(&mut self) -> crate::error::Result<()> {
tracing::info!("Unregistering session and cleaning up connections...");
let _ = self.close_all_connected_sessions().await;
let mut packet = Vec::new();
packet.extend_from_slice(&[0x66, 0x00]); packet.extend_from_slice(&[0x00, 0x00]); packet.extend_from_slice(&self.session_handle.to_le_bytes()); packet.extend_from_slice(&[0x00, 0x00, 0x00, 0x00]); packet.extend_from_slice(&[0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]); packet.extend_from_slice(&[0x00, 0x00, 0x00, 0x00]);
self.stream
.lock()
.await
.write_all(&packet)
.await
.map_err(EtherNetIpError::Io)?;
tracing::info!("Session unregistered and all connections closed");
Ok(())
}
fn build_read_request(&self, tag_name: &str) -> Vec<u8> {
self.build_read_request_with_count(tag_name, 1)
}
fn build_read_request_with_count(&self, tag_name: &str, element_count: u16) -> Vec<u8> {
tracing::debug!(
"Building read request for tag: '{}' with count: {}",
tag_name,
element_count
);
let mut cip_request = Vec::new();
cip_request.push(0x4C);
let path = self.build_tag_path(tag_name);
let path_size_words = (path.len() / 2) as u8;
tracing::debug!(
"Path size calculation: {} bytes / 2 = {} words for tag '{}'",
path.len(),
path_size_words,
tag_name
);
cip_request.push(path_size_words);
cip_request.extend_from_slice(&path);
cip_request.extend_from_slice(&element_count.to_le_bytes());
tracing::debug!(
"Built CIP read request ({} bytes) for tag '{}': {:02X?}",
cip_request.len(),
tag_name,
cip_request
);
tracing::debug!(
"Path bytes ({} bytes, {} words) for tag '{}': {:02X?}",
path.len(),
path_size_words,
tag_name,
path
);
cip_request
}
#[cfg_attr(not(test), allow(dead_code))]
pub fn build_element_id_segment(&self, index: u32) -> Vec<u8> {
let mut segment = Vec::new();
if index <= 255 {
segment.push(0x28);
segment.push(index as u8);
} else if index <= 65535 {
segment.push(0x29);
segment.push(0x00); segment.extend_from_slice(&(index as u16).to_le_bytes());
} else {
segment.push(0x2A);
segment.push(0x00); segment.extend_from_slice(&index.to_le_bytes());
}
segment
}
#[cfg_attr(not(test), allow(dead_code))]
pub fn build_base_tag_path(&self, tag_name: &str) -> Vec<u8> {
match TagPath::parse(tag_name) {
Ok(path) => {
let base_path = match &path {
TagPath::Array { base_path, .. } => base_path.as_ref(),
_ => &path,
};
base_path.to_cip_path().unwrap_or_else(|_| {
let mut path = Vec::new();
path.push(0x91); let name_bytes = tag_name.as_bytes();
path.push(name_bytes.len() as u8);
path.extend_from_slice(name_bytes);
if path.len() % 2 != 0 {
path.push(0x00);
}
path
})
}
Err(_) => {
let mut path = Vec::new();
path.push(0x91); let name_bytes = tag_name.as_bytes();
path.push(name_bytes.len() as u8);
path.extend_from_slice(name_bytes);
if path.len() % 2 != 0 {
path.push(0x00);
}
path
}
}
}
#[cfg_attr(not(test), allow(dead_code))]
pub fn build_read_array_request(
&self,
base_array_name: &str,
start_index: u32,
element_count: u16,
) -> Vec<u8> {
let mut cip_request = Vec::new();
cip_request.push(0x4C);
let mut full_path = self.build_base_tag_path(base_array_name);
tracing::trace!(
"build_read_array_request: base_path for '{}' = {:02X?} ({} bytes)",
base_array_name,
full_path,
full_path.len()
);
let element_segment = self.build_element_id_segment(start_index);
tracing::trace!(
"build_read_array_request: element_segment for index {} = {:02X?} ({} bytes)",
start_index,
element_segment,
element_segment.len()
);
full_path.extend_from_slice(&element_segment);
if full_path.len() % 2 != 0 {
full_path.push(0x00);
}
let path_size = (full_path.len() / 2) as u8;
cip_request.push(path_size);
cip_request.extend_from_slice(&full_path);
cip_request.extend_from_slice(&element_count.to_le_bytes());
tracing::trace!(
"build_read_array_request: final request = {:02X?} ({} bytes), path_size = {} words ({} bytes)",
cip_request, cip_request.len(), path_size, full_path.len()
);
cip_request
}
fn build_tag_path(&self, tag_name: &str) -> Vec<u8> {
let app_path = match TagPath::parse(tag_name) {
Ok(tag_path) => {
tracing::debug!("Parsed tag path for '{}': {:?}", tag_name, tag_path);
match tag_path.to_cip_path() {
Ok(path) => {
tracing::debug!(
"TagPath generated {} bytes ({} words) for '{}': {:02X?}",
path.len(),
path.len() / 2,
tag_name,
path
);
path
}
Err(e) => {
tracing::warn!("TagPath.to_cip_path() failed for '{}': {}", tag_name, e);
self.build_simple_tag_path_legacy(tag_name)
}
}
}
Err(e) => {
tracing::warn!("TagPath::parse() failed for '{}': {}", tag_name, e);
self.build_simple_tag_path_legacy(tag_name)
}
};
app_path
}
fn build_simple_tag_path_legacy(&self, tag_name: &str) -> Vec<u8> {
let mut path = Vec::new();
path.push(0x91); path.push(tag_name.len() as u8);
path.extend_from_slice(tag_name.as_bytes());
if tag_name.len() % 2 != 0 {
path.push(0x00);
}
path
}
pub async fn execute_batch(
&mut self,
operations: &[BatchOperation],
) -> crate::error::Result<Vec<BatchResult>> {
if operations.is_empty() {
return Ok(Vec::new());
}
let start_time = Instant::now();
tracing::debug!(
"[BATCH] Starting batch execution with {} operations",
operations.len()
);
let operation_groups = if self.batch_config.optimize_packet_packing {
self.optimize_operation_groups(operations)
} else {
self.sequential_operation_groups(operations)
};
let mut all_results = Vec::with_capacity(operations.len());
for (group_index, group) in operation_groups.iter().enumerate() {
tracing::debug!(
"[BATCH] Processing group {} with {} operations",
group_index + 1,
group.len()
);
match self.execute_operation_group(group).await {
Ok(mut group_results) => {
all_results.append(&mut group_results);
}
Err(e) => {
if !self.batch_config.continue_on_error {
return Err(e);
}
for op in group {
let error_result = BatchResult {
operation: op.clone(),
result: Err(BatchError::NetworkError(e.to_string())),
execution_time_us: 0,
};
all_results.push(error_result);
}
}
}
}
let total_time = start_time.elapsed();
tracing::info!(
"[BATCH] Completed batch execution in {:?} - {} operations processed",
total_time,
all_results.len()
);
Ok(all_results)
}
pub async fn read_tags_batch(
&mut self,
tag_names: &[&str],
) -> crate::error::Result<Vec<(String, std::result::Result<PlcValue, BatchError>)>> {
let operations: Vec<BatchOperation> = tag_names
.iter()
.map(|&name| BatchOperation::Read {
tag_name: name.to_string(),
})
.collect();
let results = self.execute_batch(&operations).await?;
Ok(results
.into_iter()
.map(|result| {
let tag_name = match &result.operation {
BatchOperation::Read { tag_name } => tag_name.clone(),
BatchOperation::Write { .. } => {
unreachable!("Should only have read operations")
}
};
let value_result = match result.result {
Ok(Some(value)) => Ok(value),
Ok(None) => Err(BatchError::Other(
"Unexpected None result for read operation".to_string(),
)),
Err(e) => Err(e),
};
(tag_name, value_result)
})
.collect())
}
pub async fn write_tags_batch(
&mut self,
tag_values: &[(&str, PlcValue)],
) -> crate::error::Result<Vec<(String, std::result::Result<(), BatchError>)>> {
let operations: Vec<BatchOperation> = tag_values
.iter()
.map(|(name, value)| BatchOperation::Write {
tag_name: name.to_string(),
value: value.clone(),
})
.collect();
let results = self.execute_batch(&operations).await?;
Ok(results
.into_iter()
.map(|result| {
let tag_name = match &result.operation {
BatchOperation::Write { tag_name, .. } => tag_name.clone(),
BatchOperation::Read { .. } => {
unreachable!("Should only have write operations")
}
};
let write_result = match result.result {
Ok(None) => Ok(()),
Ok(Some(_)) => Err(BatchError::Other(
"Unexpected value result for write operation".to_string(),
)),
Err(e) => Err(e),
};
(tag_name, write_result)
})
.collect())
}
pub fn configure_batch_operations(&mut self, config: BatchConfig) {
self.batch_config = config;
tracing::debug!(
"[BATCH] Updated batch configuration: max_ops={}, max_size={}, timeout={}ms",
self.batch_config.max_operations_per_packet,
self.batch_config.max_packet_size,
self.batch_config.packet_timeout_ms
);
}
pub fn get_batch_config(&self) -> &BatchConfig {
&self.batch_config
}
fn optimize_operation_groups(&self, operations: &[BatchOperation]) -> Vec<Vec<BatchOperation>> {
let mut groups = Vec::new();
let mut reads = Vec::new();
let mut writes = Vec::new();
for op in operations {
match op {
BatchOperation::Read { .. } => reads.push(op.clone()),
BatchOperation::Write { .. } => writes.push(op.clone()),
}
}
for chunk in reads.chunks(self.batch_config.max_operations_per_packet) {
groups.push(chunk.to_vec());
}
for chunk in writes.chunks(self.batch_config.max_operations_per_packet) {
groups.push(chunk.to_vec());
}
groups
}
fn sequential_operation_groups(
&self,
operations: &[BatchOperation],
) -> Vec<Vec<BatchOperation>> {
operations
.chunks(self.batch_config.max_operations_per_packet)
.map(|chunk| chunk.to_vec())
.collect()
}
async fn execute_operation_group(
&mut self,
operations: &[BatchOperation],
) -> crate::error::Result<Vec<BatchResult>> {
let start_time = Instant::now();
let mut results = Vec::with_capacity(operations.len());
let cip_request = self.build_multiple_service_packet(operations)?;
let response = self.send_cip_request(&cip_request).await?;
let parsed_results = self.parse_multiple_service_response(&response, operations)?;
let execution_time = start_time.elapsed();
for (i, operation) in operations.iter().enumerate() {
let op_execution_time = execution_time.as_micros() as u64 / operations.len() as u64;
let result = if i < parsed_results.len() {
match &parsed_results[i] {
Ok(value) => Ok(value.clone()),
Err(e) => Err(e.clone()),
}
} else {
Err(BatchError::Other(
"Missing result from response".to_string(),
))
};
results.push(BatchResult {
operation: operation.clone(),
result,
execution_time_us: op_execution_time,
});
}
Ok(results)
}
fn build_multiple_service_packet(
&self,
operations: &[BatchOperation],
) -> crate::error::Result<Vec<u8>> {
let mut packet = Vec::with_capacity(8 + (operations.len() * 2));
packet.push(0x0A);
packet.push(0x02); packet.push(0x20); packet.push(0x02); packet.push(0x24); packet.push(0x01);
packet.extend_from_slice(&(operations.len() as u16).to_le_bytes());
let mut service_requests = Vec::with_capacity(operations.len());
let mut current_offset = 2 + (operations.len() * 2);
for operation in operations {
let service_request = match operation {
BatchOperation::Read { tag_name } => self.build_read_request(tag_name),
BatchOperation::Write { tag_name, value } => {
self.build_write_request(tag_name, value)?
}
};
service_requests.push(service_request);
}
for service_request in &service_requests {
packet.extend_from_slice(&(current_offset as u16).to_le_bytes());
current_offset += service_request.len();
}
for service_request in service_requests {
packet.extend_from_slice(&service_request);
}
tracing::trace!(
"[BATCH] Built Multiple Service Packet ({} bytes, {} services)",
packet.len(),
operations.len()
);
Ok(packet)
}
fn parse_multiple_service_response(
&self,
response: &[u8],
operations: &[BatchOperation],
) -> crate::error::Result<Vec<std::result::Result<Option<PlcValue>, BatchError>>> {
if response.len() < 6 {
return Err(crate::error::EtherNetIpError::Protocol(
"Response too short for Multiple Service Packet".to_string(),
));
}
let mut results = Vec::new();
tracing::trace!(
"Raw Multiple Service Response ({} bytes): {:02X?}",
response.len(),
response
);
let cip_data = match self.extract_cip_from_response(response) {
Ok(data) => data,
Err(e) => {
tracing::error!("Failed to extract CIP data: {}", e);
return Err(e);
}
};
tracing::trace!(
"Extracted CIP data ({} bytes): {:02X?}",
cip_data.len(),
cip_data
);
if cip_data.len() < 6 {
return Err(crate::error::EtherNetIpError::Protocol(
"CIP data too short for Multiple Service Response".to_string(),
));
}
let service_code = cip_data[0];
let general_status = cip_data[2];
let num_replies = u16::from_le_bytes([cip_data[4], cip_data[5]]) as usize;
tracing::debug!(
"Multiple Service Response: service=0x{:02X}, status=0x{:02X}, replies={}",
service_code,
general_status,
num_replies
);
if general_status != 0x00 {
return Err(crate::error::EtherNetIpError::Protocol(
self.describe_multiple_service_error(general_status, operations),
));
}
if num_replies != operations.len() {
return Err(crate::error::EtherNetIpError::Protocol(format!(
"Reply count mismatch: expected {}, got {}",
operations.len(),
num_replies
)));
}
let mut reply_offsets = Vec::new();
let mut offset = 6;
for _i in 0..num_replies {
if offset + 2 > cip_data.len() {
return Err(crate::error::EtherNetIpError::Protocol(
"CIP data too short for reply offsets".to_string(),
));
}
let reply_offset =
u16::from_le_bytes([cip_data[offset], cip_data[offset + 1]]) as usize;
reply_offsets.push(reply_offset);
offset += 2;
}
tracing::trace!("Reply offsets: {:?}", reply_offsets);
let reply_base_offset = 6 + (num_replies * 2);
tracing::trace!("Reply base offset: {}", reply_base_offset);
for (i, &reply_offset) in reply_offsets.iter().enumerate() {
let reply_start = 4 + reply_offset;
if reply_start >= cip_data.len() {
results.push(Err(BatchError::Other(
"Reply offset beyond CIP data".to_string(),
)));
continue;
}
let reply_end = if i + 1 < reply_offsets.len() {
4 + reply_offsets[i + 1]
} else {
cip_data.len()
};
if reply_end > cip_data.len() || reply_start >= reply_end {
results.push(Err(BatchError::Other(
"Invalid reply boundaries".to_string(),
)));
continue;
}
let reply_data = &cip_data[reply_start..reply_end];
tracing::trace!(
"Reply {} at offset {}: start={}, end={}, len={}",
i,
reply_offset,
reply_start,
reply_end,
reply_data.len()
);
tracing::trace!("Reply {} data: {:02X?}", i, reply_data);
let result = self.parse_individual_reply(reply_data, &operations[i]);
results.push(result);
}
Ok(results)
}
fn parse_individual_reply(
&self,
reply_data: &[u8],
operation: &BatchOperation,
) -> std::result::Result<Option<PlcValue>, BatchError> {
if reply_data.len() < 4 {
return Err(BatchError::SerializationError(
"Reply too short".to_string(),
));
}
tracing::trace!(
"Parsing individual reply ({} bytes): {:02X?}",
reply_data.len(),
reply_data
);
let service_code = reply_data[0];
let general_status = reply_data[2];
tracing::trace!(
"Service code: 0x{:02X}, Status: 0x{:02X}",
service_code,
general_status
);
if general_status != 0x00 {
let error_msg = self.get_cip_error_message(general_status);
return Err(BatchError::CipError {
status: general_status,
message: error_msg,
});
}
match operation {
BatchOperation::Write { .. } => {
Ok(None)
}
BatchOperation::Read { .. } => {
if reply_data.len() < 6 {
return Err(BatchError::SerializationError(
"Read reply too short for data".to_string(),
));
}
let data = &reply_data[4..];
tracing::trace!("Parsing data ({} bytes): {:02X?}", data.len(), data);
if data.len() < 2 {
return Err(BatchError::SerializationError(
"Data too short for type".to_string(),
));
}
let data_type = u16::from_le_bytes([data[0], data[1]]);
let value_data = &data[2..];
tracing::trace!(
"Data type: 0x{:04X}, Value data ({} bytes): {:02X?}",
data_type,
value_data.len(),
value_data
);
match data_type {
0x00C1 => {
if value_data.is_empty() {
return Err(BatchError::SerializationError(
"Missing BOOL value".to_string(),
));
}
Ok(Some(PlcValue::Bool(value_data[0] != 0)))
}
0x00D3 => {
if value_data.len() < 4 {
return Err(BatchError::SerializationError(
"Missing packed BOOL array DWORD value".to_string(),
));
}
let packed_value = u32::from_le_bytes([
value_data[0],
value_data[1],
value_data[2],
value_data[3],
]);
if let BatchOperation::Read { tag_name } = operation {
if let Some((_base_name, index)) =
self.parse_array_element_access(tag_name)
{
let bit_index = (index % 32) as u32;
let value = (packed_value >> bit_index) & 1 != 0;
tracing::trace!(
"Parsed packed BOOL array element '{}' from DWORD 0x{:08X} using bit {} -> {}",
tag_name,
packed_value,
bit_index,
value
);
return Ok(Some(PlcValue::Bool(value)));
}
}
tracing::trace!(
"Parsed 0x00D3 batch read as UDINT fallback: {} (0x{:08X})",
packed_value,
packed_value
);
Ok(Some(PlcValue::Udint(packed_value)))
}
0x00C2 => {
if value_data.is_empty() {
return Err(BatchError::SerializationError(
"Missing SINT value".to_string(),
));
}
Ok(Some(PlcValue::Sint(value_data[0] as i8)))
}
0x00C3 => {
if value_data.len() < 2 {
return Err(BatchError::SerializationError(
"Missing INT value".to_string(),
));
}
let value = i16::from_le_bytes([value_data[0], value_data[1]]);
Ok(Some(PlcValue::Int(value)))
}
0x00C4 => {
if value_data.len() < 4 {
return Err(BatchError::SerializationError(
"Missing DINT value".to_string(),
));
}
let value = i32::from_le_bytes([
value_data[0],
value_data[1],
value_data[2],
value_data[3],
]);
tracing::trace!("Parsed DINT: {}", value);
Ok(Some(PlcValue::Dint(value)))
}
0x00C5 => {
if value_data.len() < 8 {
return Err(BatchError::SerializationError(
"Missing LINT value".to_string(),
));
}
let value = i64::from_le_bytes([
value_data[0],
value_data[1],
value_data[2],
value_data[3],
value_data[4],
value_data[5],
value_data[6],
value_data[7],
]);
Ok(Some(PlcValue::Lint(value)))
}
0x00C6 => {
if value_data.is_empty() {
return Err(BatchError::SerializationError(
"Missing USINT value".to_string(),
));
}
Ok(Some(PlcValue::Usint(value_data[0])))
}
0x00C7 => {
if value_data.len() < 2 {
return Err(BatchError::SerializationError(
"Missing UINT value".to_string(),
));
}
let value = u16::from_le_bytes([value_data[0], value_data[1]]);
Ok(Some(PlcValue::Uint(value)))
}
0x00C8 => {
if value_data.len() < 4 {
return Err(BatchError::SerializationError(
"Missing UDINT value".to_string(),
));
}
let value = u32::from_le_bytes([
value_data[0],
value_data[1],
value_data[2],
value_data[3],
]);
Ok(Some(PlcValue::Udint(value)))
}
0x00C9 => {
if value_data.len() < 8 {
return Err(BatchError::SerializationError(
"Missing ULINT value".to_string(),
));
}
let value = u64::from_le_bytes([
value_data[0],
value_data[1],
value_data[2],
value_data[3],
value_data[4],
value_data[5],
value_data[6],
value_data[7],
]);
Ok(Some(PlcValue::Ulint(value)))
}
0x00CA => {
if value_data.len() < 4 {
return Err(BatchError::SerializationError(
"Missing REAL value".to_string(),
));
}
let bytes = [value_data[0], value_data[1], value_data[2], value_data[3]];
let value = f32::from_le_bytes(bytes);
tracing::trace!("Parsed REAL: {}", value);
Ok(Some(PlcValue::Real(value)))
}
0x00CB => {
if value_data.len() < 8 {
return Err(BatchError::SerializationError(
"Missing LREAL value".to_string(),
));
}
let bytes = [
value_data[0],
value_data[1],
value_data[2],
value_data[3],
value_data[4],
value_data[5],
value_data[6],
value_data[7],
];
let value = f64::from_le_bytes(bytes);
Ok(Some(PlcValue::Lreal(value)))
}
0x00DA => {
if value_data.is_empty() {
return Ok(Some(PlcValue::String(String::new())));
}
let length = value_data[0] as usize;
if value_data.len() < 1 + length {
return Err(BatchError::SerializationError(
"Insufficient data for STRING value".to_string(),
));
}
let string_data = &value_data[1..1 + length];
let value = String::from_utf8_lossy(string_data).to_string();
tracing::trace!("Parsed STRING: '{}'", value);
Ok(Some(PlcValue::String(value)))
}
0x02A0 => {
tracing::trace!(
"Detected UDT structure (0x02A0) with {} bytes",
value_data.len()
);
Ok(Some(PlcValue::Udt(UdtData {
symbol_id: 0, data: value_data.to_vec(),
})))
}
_ => Err(BatchError::SerializationError(format!(
"Unsupported data type: 0x{data_type:04X}"
))),
}
}
}
}
pub async fn write_ab_string_components(
&mut self,
tag_name: &str,
value: &str,
) -> crate::error::Result<()> {
tracing::debug!(
"[AB STRING] Writing string '{}' to tag '{}' using component access",
value,
tag_name
);
let string_bytes = value.as_bytes();
let string_len = string_bytes.len() as i32;
let len_tag = format!("{tag_name}.LEN");
tracing::debug!("Step 1: Writing length {} to {}", string_len, len_tag);
match self.write_tag(&len_tag, PlcValue::Dint(string_len)).await {
Ok(_) => tracing::debug!("Length written successfully"),
Err(e) => {
tracing::error!("Length write failed: {}", e);
return Err(e);
}
}
tracing::debug!("Step 2: Writing string data to {}.DATA", tag_name);
for (i, &byte) in string_bytes.iter().enumerate() {
let data_element = format!("{tag_name}.DATA[{i}]");
match self
.write_tag(&data_element, PlcValue::Sint(byte as i8))
.await
{
Ok(_) => print!("."),
Err(e) => {
tracing::error!("Failed to write byte {} to position {}: {}", byte, i, e);
return Err(e);
}
}
}
if string_bytes.len() < 82 {
let null_element = format!("{}.DATA[{}]", tag_name, string_bytes.len());
match self.write_tag(&null_element, PlcValue::Sint(0)).await {
Ok(_) => tracing::debug!("String null-terminated successfully"),
Err(e) => tracing::warn!("Could not null-terminate: {}", e),
}
}
tracing::info!("AB STRING component write completed!");
Ok(())
}
pub async fn write_ab_string_udt(
&mut self,
tag_name: &str,
value: &str,
) -> crate::error::Result<()> {
tracing::debug!(
"[AB STRING UDT] Writing string '{}' to tag '{}' as UDT",
value,
tag_name
);
let string_bytes = value.as_bytes();
if string_bytes.len() > 82 {
return Err(EtherNetIpError::Protocol(
"String too long for Allen-Bradley STRING (max 82 chars)".to_string(),
));
}
let mut cip_request = Vec::new();
cip_request.push(0x4D);
let tag_path = self.build_tag_path(tag_name);
cip_request.push((tag_path.len() / 2) as u8); cip_request.extend_from_slice(&tag_path);
cip_request.extend_from_slice(&[0xA0, 0x00]); cip_request.extend_from_slice(&[0x01, 0x00]);
let len = string_bytes.len() as u32;
cip_request.extend_from_slice(&len.to_le_bytes());
cip_request.extend_from_slice(string_bytes);
let padding_needed = 82 - string_bytes.len();
cip_request.extend_from_slice(&vec![0u8; padding_needed]);
tracing::trace!("Built UDT write request: {} bytes total", cip_request.len());
let response = self.send_cip_request(&cip_request).await?;
if response.len() >= 3 {
let general_status = response[2];
if general_status == 0x00 {
tracing::info!("AB STRING UDT write successful!");
Ok(())
} else {
let error_msg = self.get_cip_error_message(general_status);
Err(EtherNetIpError::Protocol(format!(
"AB STRING UDT write failed - CIP Error 0x{general_status:02X}: {error_msg}"
)))
}
} else {
Err(EtherNetIpError::Protocol(
"Invalid AB STRING UDT write response".to_string(),
))
}
}
async fn establish_connected_session(
&mut self,
session_name: &str,
) -> crate::error::Result<ConnectedSession> {
tracing::debug!(
"[CONNECTED] Establishing connected session: '{}'",
session_name
);
tracing::debug!("[CONNECTED] Will try multiple parameter configurations...");
*self.connection_sequence.lock().await += 1;
let connection_serial = (*self.connection_sequence.lock().await & 0xFFFF) as u16;
for config_id in 0..=5 {
tracing::debug!(
"[ATTEMPT {}] Trying configuration {}:",
config_id + 1,
config_id
);
let mut session = if config_id == 0 {
ConnectedSession::new(connection_serial)
} else {
ConnectedSession::with_config(connection_serial, config_id)
};
session.o_to_t_connection_id =
0x2000_0000 + *self.connection_sequence.lock().await + (config_id as u32 * 0x1000);
session.t_to_o_connection_id =
0x3000_0000 + *self.connection_sequence.lock().await + (config_id as u32 * 0x1000);
let forward_open_request = self.build_forward_open_request(&session)?;
tracing::debug!(
"[ATTEMPT {}] Sending Forward Open request ({} bytes)",
config_id + 1,
forward_open_request.len()
);
match self.send_cip_request(&forward_open_request).await {
Ok(response) => {
match self.parse_forward_open_response(&mut session, &response) {
Ok(()) => {
tracing::info!("[SUCCESS] Configuration {} worked!", config_id);
tracing::debug!("Connection ID: 0x{:08X}", session.connection_id);
tracing::debug!("O->T ID: 0x{:08X}", session.o_to_t_connection_id);
tracing::debug!("T->O ID: 0x{:08X}", session.t_to_o_connection_id);
tracing::debug!(
"Using Connection ID: 0x{:08X} for messaging",
session.connection_id
);
session.is_active = true;
let mut sessions = self.connected_sessions.lock().await;
sessions.insert(session_name.to_string(), session.clone());
return Ok(session);
}
Err(e) => {
tracing::warn!(
"[ATTEMPT {}] Configuration {} failed: {}",
config_id + 1,
config_id,
e
);
if e.to_string().contains("status: 0x") {
tracing::debug!("Status indicates: parameter incompatibility or resource conflict");
}
}
}
}
Err(e) => {
tracing::warn!(
"[ATTEMPT {}] Network error with config {}: {}",
config_id + 1,
config_id,
e
);
}
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
Err(EtherNetIpError::Protocol(
"All connection parameter configurations failed. PLC may not support connected messaging or has reached connection limits.".to_string()
))
}
fn build_forward_open_request(
&self,
session: &ConnectedSession,
) -> crate::error::Result<Vec<u8>> {
let mut request = Vec::with_capacity(50);
request.push(0x54);
request.push(0x02);
request.push(0x20); request.push(0x06);
request.push(0x24); request.push(0x01);
request.push(0x0A); request.push(session.timeout_multiplier);
request.extend_from_slice(&session.o_to_t_connection_id.to_le_bytes());
request.extend_from_slice(&session.t_to_o_connection_id.to_le_bytes());
request.extend_from_slice(&session.connection_serial.to_le_bytes());
request.extend_from_slice(&session.originator_vendor_id.to_le_bytes());
request.extend_from_slice(&session.originator_serial.to_le_bytes());
request.push(session.timeout_multiplier);
request.extend_from_slice(&[0x00, 0x00, 0x00]);
request.extend_from_slice(&session.rpi.to_le_bytes());
let o_to_t_params = self.encode_connection_parameters(&session.o_to_t_params);
request.extend_from_slice(&o_to_t_params.to_le_bytes());
request.extend_from_slice(&session.rpi.to_le_bytes());
let t_to_o_params = self.encode_connection_parameters(&session.t_to_o_params);
request.extend_from_slice(&t_to_o_params.to_le_bytes());
request.push(0xA3);
request.push(0x02);
request.push(0x20); request.push(0x02); request.push(0x24); request.push(0x01);
Ok(request)
}
fn encode_connection_parameters(&self, params: &ConnectionParameters) -> u32 {
let mut encoded = 0u32;
encoded |= params.size as u32;
if params.variable_size {
encoded |= 1 << 25;
}
encoded |= (params.connection_type as u32) << 29;
encoded |= (params.priority as u32) << 26;
encoded
}
fn parse_forward_open_response(
&self,
session: &mut ConnectedSession,
response: &[u8],
) -> crate::error::Result<()> {
if response.len() < 2 {
return Err(EtherNetIpError::Protocol(
"Forward Open response too short".to_string(),
));
}
let service = response[0];
let status = response[1];
if service != 0xD4 {
return Err(EtherNetIpError::Protocol(format!(
"Unexpected service in Forward Open response: 0x{service:02X}"
)));
}
if status != 0x00 {
let error_msg = match status {
0x01 => "Connection failure - Resource unavailable or already exists",
0x02 => "Invalid parameter - Connection parameters rejected",
0x03 => "Connection timeout - PLC did not respond in time",
0x04 => "Connection limit exceeded - Too many connections",
0x08 => "Invalid service - Forward Open not supported",
0x0C => "Invalid attribute - Connection parameters invalid",
0x13 => "Path destination unknown - Target object not found",
0x26 => "Invalid parameter value - RPI or size out of range",
_ => &format!("Unknown status: 0x{status:02X}"),
};
return Err(EtherNetIpError::Protocol(format!(
"Forward Open failed with status 0x{status:02X}: {error_msg}"
)));
}
if response.len() < 16 {
return Err(EtherNetIpError::Protocol(
"Forward Open response data too short".to_string(),
));
}
let actual_o_to_t_id =
u32::from_le_bytes([response[2], response[3], response[4], response[5]]);
let actual_t_to_o_id =
u32::from_le_bytes([response[6], response[7], response[8], response[9]]);
session.o_to_t_connection_id = actual_o_to_t_id;
session.t_to_o_connection_id = actual_t_to_o_id;
session.connection_id = actual_o_to_t_id;
tracing::info!("[FORWARD OPEN] Success!");
tracing::debug!(
"O->T Connection ID: 0x{:08X} (PLC assigned)",
session.o_to_t_connection_id
);
tracing::debug!(
"T->O Connection ID: 0x{:08X} (PLC assigned)",
session.t_to_o_connection_id
);
tracing::debug!(
"Using Connection ID: 0x{:08X} for messaging",
session.connection_id
);
Ok(())
}
pub async fn write_string_connected(
&mut self,
tag_name: &str,
value: &str,
) -> crate::error::Result<()> {
let session_name = format!("string_write_{tag_name}");
let mut sessions = self.connected_sessions.lock().await;
if !sessions.contains_key(&session_name) {
drop(sessions); self.establish_connected_session(&session_name).await?;
sessions = self.connected_sessions.lock().await;
}
let session = sessions.get(&session_name).unwrap().clone();
let request = self.build_connected_string_write_request(tag_name, value, &session)?;
drop(sessions); let response = self
.send_connected_cip_request(&request, &session, &session_name)
.await?;
if response.len() >= 2 {
let status = response[1];
if status == 0x00 {
Ok(())
} else {
let error_msg = self.get_cip_error_message(status);
Err(EtherNetIpError::Protocol(format!(
"CIP Error 0x{status:02X}: {error_msg}"
)))
}
} else {
Err(EtherNetIpError::Protocol(
"Invalid connected string write response".to_string(),
))
}
}
fn build_connected_string_write_request(
&self,
tag_name: &str,
value: &str,
_session: &ConnectedSession,
) -> crate::error::Result<Vec<u8>> {
let mut request = Vec::new();
request.push(0x4D);
let tag_bytes = tag_name.as_bytes();
let path_size_words = (2 + tag_bytes.len() + 1) / 2; request.push(path_size_words as u8);
request.push(0x91); request.push(tag_bytes.len() as u8); request.extend_from_slice(tag_bytes);
if (2 + tag_bytes.len()) % 2 != 0 {
request.push(0x00);
}
request.extend_from_slice(&[0xCE, 0x0F]);
request.extend_from_slice(&[0x01, 0x00]);
let string_bytes = value.as_bytes();
let max_len: u16 = 82; let current_len = string_bytes.len().min(max_len as usize) as u16;
request.extend_from_slice(¤t_len.to_le_bytes());
request.extend_from_slice(&max_len.to_le_bytes());
let mut data_array = vec![0u8; max_len as usize];
data_array[..current_len as usize].copy_from_slice(&string_bytes[..current_len as usize]);
request.extend_from_slice(&data_array);
tracing::trace!(
"Built connected string write request ({} bytes) for '{}' = '{}' (len={}, maxlen={})",
request.len(),
tag_name,
value,
current_len,
max_len
);
tracing::trace!("Request: {:02X?}", request);
Ok(request)
}
async fn send_connected_cip_request(
&mut self,
cip_request: &[u8],
session: &ConnectedSession,
session_name: &str,
) -> crate::error::Result<Vec<u8>> {
tracing::debug!(
"[CONNECTED] Sending connected CIP request ({} bytes) using T->O connection ID 0x{:08X}",
cip_request.len(), session.t_to_o_connection_id
);
let mut packet = Vec::new();
packet.extend_from_slice(&[0x6F, 0x00]); packet.extend_from_slice(&[0x00, 0x00]); packet.extend_from_slice(&self.session_handle.to_le_bytes()); packet.extend_from_slice(&[0x00, 0x00, 0x00, 0x00]); packet.extend_from_slice(&[0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]); packet.extend_from_slice(&[0x00, 0x00, 0x00, 0x00]);
let cpf_start = packet.len();
packet.extend_from_slice(&[0x00, 0x00, 0x00, 0x00]);
packet.extend_from_slice(&[0x05, 0x00]);
packet.extend_from_slice(&[0x02, 0x00]);
packet.extend_from_slice(&[0xA1, 0x00]); packet.extend_from_slice(&[0x04, 0x00]); packet.extend_from_slice(&session.t_to_o_connection_id.to_le_bytes());
packet.extend_from_slice(&[0xB1, 0x00]); let data_length = cip_request.len() + 2; packet.extend_from_slice(&(data_length as u16).to_le_bytes());
let session_name_clone = session_name.to_string();
let _session_clone = session.clone();
let mut sessions = self.connected_sessions.lock().await;
let current_sequence = if let Some(session_mut) = sessions.get_mut(&session_name_clone) {
session_mut.sequence_count += 1;
session_mut.sequence_count
} else {
1 };
drop(sessions);
packet.extend_from_slice(¤t_sequence.to_le_bytes());
packet.extend_from_slice(cip_request);
let cpf_length = packet.len() - cpf_start;
packet[2..4].copy_from_slice(&(cpf_length as u16).to_le_bytes());
tracing::trace!(
"[CONNECTED] Sending packet ({} bytes) with sequence {}",
packet.len(),
current_sequence
);
let mut stream = self.stream.lock().await;
stream
.write_all(&packet)
.await
.map_err(EtherNetIpError::Io)?;
let mut header = [0u8; 24];
stream
.read_exact(&mut header)
.await
.map_err(EtherNetIpError::Io)?;
let cmd_status = u32::from_le_bytes([header[8], header[9], header[10], header[11]]);
if cmd_status != 0 {
return Err(EtherNetIpError::Protocol(format!(
"Connected message failed with status: 0x{cmd_status:08X}"
)));
}
let response_length = u16::from_le_bytes([header[2], header[3]]) as usize;
let mut response_data = vec![0u8; response_length];
stream
.read_exact(&mut response_data)
.await
.map_err(EtherNetIpError::Io)?;
let mut last_activity = self.last_activity.lock().await;
*last_activity = Instant::now();
tracing::trace!(
"[CONNECTED] Received response ({} bytes)",
response_data.len()
);
self.extract_connected_cip_from_response(&response_data)
}
fn extract_connected_cip_from_response(
&self,
response: &[u8],
) -> crate::error::Result<Vec<u8>> {
tracing::trace!(
"[CONNECTED] Extracting CIP from connected response ({} bytes): {:02X?}",
response.len(),
response
);
if response.len() < 12 {
return Err(EtherNetIpError::Protocol(
"Connected response too short for CPF header".to_string(),
));
}
let item_count = u16::from_le_bytes([response[6], response[7]]) as usize;
tracing::trace!("[CONNECTED] CPF item count: {}", item_count);
let mut pos = 8;
for _i in 0..item_count {
if pos + 4 > response.len() {
return Err(EtherNetIpError::Protocol(
"Response truncated while parsing items".to_string(),
));
}
let item_type = u16::from_le_bytes([response[pos], response[pos + 1]]);
let item_length = u16::from_le_bytes([response[pos + 2], response[pos + 3]]) as usize;
pos += 4;
tracing::trace!(
"[CONNECTED] Found item: type=0x{:04X}, length={}",
item_type,
item_length
);
if pos
.checked_add(item_length)
.map_or(true, |end| end > response.len())
{
return Err(EtherNetIpError::Protocol(
"Connected data item truncated".to_string(),
));
}
if item_type == 0x00B1 {
if item_length < 2 {
return Err(EtherNetIpError::Protocol(
"Connected data item too short for sequence".to_string(),
));
}
let sequence_count = u16::from_le_bytes([response[pos], response[pos + 1]]);
tracing::trace!("[CONNECTED] Sequence count: {}", sequence_count);
let cip_data = response[pos + 2..pos + item_length].to_vec();
tracing::trace!(
"[CONNECTED] Extracted CIP data ({} bytes): {:02X?}",
cip_data.len(),
cip_data
);
return Ok(cip_data);
} else {
pos += item_length;
}
}
Err(EtherNetIpError::Protocol(
"Connected Data Item (0x00B1) not found in response".to_string(),
))
}
async fn close_connected_session(&mut self, session_name: &str) -> crate::error::Result<()> {
if let Some(session) = self.connected_sessions.lock().await.get(session_name) {
let session = session.clone();
let forward_close_request = self.build_forward_close_request(&session)?;
let _response = self.send_cip_request(&forward_close_request).await?;
tracing::info!("[CONNECTED] Session '{}' closed successfully", session_name);
}
let mut sessions = self.connected_sessions.lock().await;
sessions.remove(session_name);
Ok(())
}
fn build_forward_close_request(
&self,
session: &ConnectedSession,
) -> crate::error::Result<Vec<u8>> {
let mut request = Vec::with_capacity(21);
request.push(0x4E);
request.push(0x02);
request.push(0x20); request.push(0x06);
request.push(0x24); request.push(0x01);
request.push(0x0A); request.push(session.timeout_multiplier);
request.extend_from_slice(&session.connection_serial.to_le_bytes());
request.extend_from_slice(&session.originator_vendor_id.to_le_bytes());
request.extend_from_slice(&session.originator_serial.to_le_bytes());
request.push(0x02);
request.push(0x20); request.push(0x02); request.push(0x24); request.push(0x01);
Ok(request)
}
async fn close_all_connected_sessions(&mut self) -> crate::error::Result<()> {
let session_names: Vec<String> = self
.connected_sessions
.lock()
.await
.keys()
.cloned()
.collect();
for session_name in session_names {
let _ = self.close_connected_session(&session_name).await; }
Ok(())
}
pub async fn write_string_unconnected(
&mut self,
tag_name: &str,
value: &str,
) -> crate::error::Result<()> {
tracing::debug!(
"[UNCONNECTED] Writing string '{}' to tag '{}' using unconnected messaging",
value,
tag_name
);
self.validate_session().await?;
let string_bytes = value.as_bytes();
if string_bytes.len() > 82 {
return Err(EtherNetIpError::Protocol(
"String too long for Allen-Bradley STRING (max 82 chars)".to_string(),
));
}
let mut cip_request = Vec::new();
cip_request.push(0x4D);
let tag_bytes = tag_name.as_bytes();
let path_len = if tag_bytes.len() % 2 == 0 {
tag_bytes.len() + 2
} else {
tag_bytes.len() + 3
} / 2;
cip_request.push(path_len as u8);
cip_request.push(0x91); cip_request.push(tag_bytes.len() as u8); cip_request.extend_from_slice(tag_bytes);
if tag_bytes.len() % 2 != 0 {
cip_request.push(0x00);
}
let _current_len = string_bytes.len().min(82) as u16;
let current_len = string_bytes.len().min(82) as u32;
cip_request.extend_from_slice(&[0xCE, 0x0F]);
cip_request.extend_from_slice(¤t_len.to_le_bytes());
cip_request.extend_from_slice(&string_bytes[..current_len as usize]);
tracing::trace!(
"Built Allen-Bradley STRING write request ({} bytes) for '{}' = '{}' (len={})",
cip_request.len(),
tag_name,
value,
current_len
);
tracing::trace!(
"Request structure: Service=0x4D, Path={} bytes, Header=0xCE0F, Len={} (4 bytes), Data",
path_len * 2,
current_len
);
let response = self.send_cip_request(&cip_request).await?;
let cip_response = self.extract_cip_from_response(&response)?;
if cip_response.len() >= 3 {
let service_reply = cip_response[0]; let _additional_status_size = cip_response[1]; let status = cip_response[2];
tracing::trace!(
"Write response - Service: 0x{:02X}, Status: 0x{:02X}",
service_reply,
status
);
if status == 0x00 {
tracing::info!("[UNCONNECTED] String write completed successfully");
Ok(())
} else {
let error_msg = self.get_cip_error_message(status);
tracing::error!(
"[UNCONNECTED] String write failed: {} (0x{:02X})",
error_msg,
status
);
Err(EtherNetIpError::Protocol(format!(
"CIP Error 0x{status:02X}: {error_msg}"
)))
}
} else {
Err(EtherNetIpError::Protocol(
"Invalid unconnected string write response - too short".to_string(),
))
}
}
pub async fn write_string(&mut self, tag_name: &str, value: &str) -> crate::error::Result<()> {
if value.len() > 82 {
return Err(crate::error::EtherNetIpError::StringTooLong {
max_length: 82,
actual_length: value.len(),
});
}
if !value.is_ascii() {
return Err(crate::error::EtherNetIpError::InvalidString {
reason: "String contains non-ASCII characters".to_string(),
});
}
let request = self.build_string_write_request(tag_name, value)?;
let response = self.send_cip_request(&request).await?;
let cip_response = self.extract_cip_from_response(&response)?;
if cip_response.len() < 2 {
return Err(crate::error::EtherNetIpError::InvalidResponse {
reason: "Response too short".to_string(),
});
}
let status = cip_response[0];
if status != 0 {
return Err(crate::error::EtherNetIpError::WriteError {
status,
message: self.get_cip_error_message(status),
});
}
Ok(())
}
fn build_string_write_request(
&self,
tag_name: &str,
value: &str,
) -> crate::error::Result<Vec<u8>> {
let mut request = Vec::new();
request.push(0x4D);
let tag_path = self.build_tag_path(tag_name);
request.extend_from_slice(&tag_path);
request.extend_from_slice(&(value.len() as u16).to_le_bytes()); request.extend_from_slice(&82u16.to_le_bytes());
let mut data = [0u8; 82];
let bytes = value.as_bytes();
data[..bytes.len()].copy_from_slice(bytes);
request.extend_from_slice(&data);
Ok(request)
}
pub async fn subscribe_to_tag(
&self,
tag_path: &str,
options: SubscriptionOptions,
) -> Result<TagSubscription> {
let subscription = TagSubscription::new(tag_path.to_string(), options.clone());
let mut validation_client = self.clone();
let initial_value = validation_client.read_tag(tag_path).await?;
subscription.update_value(&initial_value).await?;
let mut subscriptions = self.subscriptions.lock().await;
let update_rate_ms = options.update_rate;
subscriptions.push(subscription.clone());
drop(subscriptions);
let tag_path = tag_path.to_string();
let mut client = self.clone();
tokio::spawn(async move {
let interval = std::time::Duration::from_millis(update_rate_ms as u64);
loop {
match client.read_tag(&tag_path).await {
Ok(value) => {
if let Err(e) = client.update_subscription(&tag_path, &value).await {
tracing::error!("Error updating subscription: {}", e);
break;
}
}
Err(e) => {
tracing::error!("Error reading tag {}: {}", tag_path, e);
break;
}
}
tokio::time::sleep(interval).await;
}
});
Ok(subscription)
}
pub async fn subscribe_to_tags(
&self,
tags: &[(&str, SubscriptionOptions)],
) -> Result<Vec<TagSubscription>> {
let mut subs = Vec::with_capacity(tags.len());
for (tag_name, options) in tags {
subs.push(self.subscribe_to_tag(tag_name, options.clone()).await?);
}
Ok(subs)
}
pub async fn upsert_tag_group(
&self,
group_name: &str,
tags: &[&str],
update_rate_ms: u32,
) -> Result<()> {
if group_name.trim().is_empty() {
return Err(EtherNetIpError::Protocol(
"Tag group name cannot be empty".to_string(),
));
}
if tags.is_empty() {
return Err(EtherNetIpError::Protocol(
"Tag group must contain at least one tag".to_string(),
));
}
if update_rate_ms == 0 {
return Err(EtherNetIpError::Protocol(
"Tag group update rate must be greater than 0 ms".to_string(),
));
}
let config = TagGroupConfig {
name: group_name.to_string(),
tags: tags.iter().map(|s| (*s).to_string()).collect(),
update_rate_ms,
};
let mut groups = self.tag_groups.lock().await;
groups.insert(group_name.to_string(), config);
Ok(())
}
pub async fn remove_tag_group(&self, group_name: &str) -> bool {
let mut groups = self.tag_groups.lock().await;
groups.remove(group_name).is_some()
}
pub async fn list_tag_groups(&self) -> Vec<TagGroupConfig> {
let groups = self.tag_groups.lock().await;
groups.values().cloned().collect()
}
pub async fn read_tag_group_once(&self, group_name: &str) -> Result<TagGroupSnapshot> {
let config = {
let groups = self.tag_groups.lock().await;
groups.get(group_name).cloned().ok_or_else(|| {
EtherNetIpError::Protocol(format!("Tag group '{}' is not registered", group_name))
})?
};
let mut client = self.clone();
let tag_refs: Vec<&str> = config.tags.iter().map(String::as_str).collect();
let values = client.read_tags_batch(&tag_refs).await?;
let mapped = values
.into_iter()
.map(|(tag_name, result)| match result {
Ok(value) => TagGroupValueResult {
tag_name,
value: Some(value),
error: None,
},
Err(e) => TagGroupValueResult {
tag_name,
value: None,
error: Some(e.to_string()),
},
})
.collect();
Ok(TagGroupSnapshot {
group_name: config.name,
sampled_at: std::time::SystemTime::now(),
values: mapped,
})
}
pub async fn subscribe_tag_group(&self, group_name: &str) -> Result<TagGroupSubscription> {
let config = {
let groups = self.tag_groups.lock().await;
groups.get(group_name).cloned().ok_or_else(|| {
EtherNetIpError::Protocol(format!("Tag group '{}' is not registered", group_name))
})?
};
let subscription = TagGroupSubscription::new(config.name.clone(), config.update_rate_ms);
let subscription_task = subscription.clone();
let mut client = self.clone();
let tags = config.tags.clone();
let interval = std::time::Duration::from_millis(config.update_rate_ms as u64);
let group_name_owned = config.name.clone();
tokio::spawn(async move {
while subscription_task.is_active() {
let tag_refs: Vec<&str> = tags.iter().map(String::as_str).collect();
match client.read_tags_batch(&tag_refs).await {
Ok(values) => {
let has_errors = values.iter().any(|(_, result)| result.is_err());
let snapshot = TagGroupSnapshot {
group_name: group_name_owned.clone(),
sampled_at: std::time::SystemTime::now(),
values: values
.into_iter()
.map(|(tag_name, result)| match result {
Ok(value) => TagGroupValueResult {
tag_name,
value: Some(value),
error: None,
},
Err(e) => TagGroupValueResult {
tag_name,
value: None,
error: Some(e.to_string()),
},
})
.collect(),
};
let event = TagGroupEvent {
kind: if has_errors {
TagGroupEventKind::PartialError
} else {
TagGroupEventKind::Data
},
snapshot,
error: None,
failure: None,
};
if let Err(e) = subscription_task.publish_event(event).await {
tracing::error!(
"Tag group '{}' publish failed: {}",
group_name_owned,
e
);
break;
}
}
Err(e) => {
tracing::error!(
"Tag group '{}' polling read failed: {}",
group_name_owned,
e
);
let failure_event = TagGroupEvent {
kind: TagGroupEventKind::ReadFailure,
snapshot: TagGroupSnapshot {
group_name: group_name_owned.clone(),
sampled_at: std::time::SystemTime::now(),
values: Vec::new(),
},
error: Some(e.to_string()),
failure: Some(TagGroupFailureDiagnostic::from_error(&e)),
};
if let Err(publish_error) =
subscription_task.publish_event(failure_event).await
{
tracing::error!(
"Tag group '{}' failure-event publish failed: {}",
group_name_owned,
publish_error
);
break;
}
}
}
tokio::time::sleep(interval).await;
}
});
Ok(subscription)
}
async fn update_subscription(&self, tag_name: &str, value: &PlcValue) -> Result<()> {
let subscriptions = self.subscriptions.lock().await;
for subscription in subscriptions.iter() {
if subscription.tag_path == tag_name && subscription.is_active() {
subscription.update_value(value).await?;
}
}
Ok(())
}
async fn _get_connected_session(
&mut self,
session_name: &str,
) -> crate::error::Result<ConnectedSession> {
{
let sessions = self.connected_sessions.lock().await;
if let Some(session) = sessions.get(session_name) {
return Ok(session.clone());
}
}
let session = self.establish_connected_session(session_name).await?;
let mut sessions = self.connected_sessions.lock().await;
sessions.insert(session_name.to_string(), session.clone());
Ok(session)
}
#[allow(dead_code)]
fn parse_udt_structure(&self, data: &[u8]) -> crate::error::Result<PlcValue> {
tracing::debug!("Parsing UDT structure with {} bytes", data.len());
if data.len() >= 12 {
let _offset = 0;
for alignment in 0..4 {
if alignment + 12 <= data.len() {
let aligned_data = &data[alignment..];
if aligned_data.len() >= 4 {
let dint1_bytes = [
aligned_data[0],
aligned_data[1],
aligned_data[2],
aligned_data[3],
];
let dint1_value = i32::from_le_bytes(dint1_bytes);
if aligned_data.len() >= 8 {
let dint2_bytes = [
aligned_data[4],
aligned_data[5],
aligned_data[6],
aligned_data[7],
];
let dint2_value = i32::from_le_bytes(dint2_bytes);
if aligned_data.len() >= 12 {
let real_bytes = [
aligned_data[8],
aligned_data[9],
aligned_data[10],
aligned_data[11],
];
let real_value = f32::from_le_bytes(real_bytes);
tracing::trace!(
"Alignment {}: DINT1={}, DINT2={}, REAL={}",
alignment,
dint1_value,
dint2_value,
real_value
);
if self.is_reasonable_udt_values(
dint1_value,
dint2_value,
real_value,
) {
tracing::debug!(
"Found reasonable UDT values at alignment {}",
alignment
);
return Ok(PlcValue::Udt(UdtData {
symbol_id: 0, data: data.to_vec(),
}));
}
}
}
}
}
}
}
if data.len() >= 4 {
let interpretations = vec![
("DINT_at_start", 0, 4),
("DINT_at_end", data.len().saturating_sub(4), data.len()),
("DINT_middle", data.len() / 2, data.len() / 2 + 4),
];
for (name, start, end) in interpretations {
if end <= data.len() && end > start {
let bytes = &data[start..end];
if bytes.len() == 4 {
let dint_value =
i32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
tracing::trace!("{}: DINT = {}", name, dint_value);
if self.is_reasonable_value(dint_value) {
return Ok(PlcValue::Udt(UdtData {
symbol_id: 0, data: data.to_vec(),
}));
}
}
}
}
}
Err(crate::error::EtherNetIpError::Protocol(
"Could not parse UDT structure".to_string(),
))
}
#[allow(dead_code)]
fn parse_udt_simple(&self, data: &[u8]) -> crate::error::Result<PlcValue> {
Ok(PlcValue::Udt(UdtData {
symbol_id: 0, data: data.to_vec(),
}))
}
#[allow(dead_code)]
fn is_reasonable_udt_values(&self, dint1: i32, dint2: i32, real: f32) -> bool {
let dint1_reasonable = (-1000..=1000).contains(&dint1);
let dint2_reasonable = (-1000..=1000).contains(&dint2);
let real_reasonable = (-1000.0..=1000.0).contains(&real) && real.is_finite();
tracing::trace!(
"Reasonableness check: DINT1={} ({}), DINT2={} ({}), REAL={} ({})",
dint1,
dint1_reasonable,
dint2,
dint2_reasonable,
real,
real_reasonable
);
dint1_reasonable && dint2_reasonable && real_reasonable
}
#[allow(dead_code)]
fn is_reasonable_value(&self, value: i32) -> bool {
(-1000..=1000).contains(&value)
}
}