1use crate::EtherNetIpStream;
2use crate::batch::{BatchConfig, BatchOperation};
3use crate::error::{EtherNetIpError, Result};
4use crate::protocol::cip::{CipRequest, CipResponse, READ_TAG, SendDataRequest, WRITE_TAG};
5use crate::protocol::encap::{EncapsulationHeader, REGISTER_SESSION, UNREGISTER_SESSION};
6use crate::protocol::values;
7use crate::protocol::{Decode, Encode};
8use crate::route::RoutePath;
9use crate::subscription::TagSubscription;
10use crate::tag_group::TagGroupConfig;
11use crate::tag_manager::{TagManager, TagMetadata, TagPermissions, TagScope};
12use crate::types::{ConnectedSession, PlcValue, UdtData};
13use crate::udt::{TagAttributes, UdtDefinition, UdtManager};
14use crate::{TagPath, udt};
15use bytes::BytesMut;
16use std::collections::HashMap;
17use std::net::SocketAddr;
18#[cfg(feature = "ffi")]
19use std::sync::LazyLock;
20use std::sync::atomic::{AtomicU32, Ordering};
21use std::sync::{Arc, Mutex as StdMutex};
22use tokio::io::{AsyncReadExt, AsyncWriteExt};
23use tokio::net::TcpStream;
24#[cfg(feature = "ffi")]
25use tokio::runtime::Runtime;
26use tokio::sync::Mutex;
27use tokio::time::{Duration, Instant, timeout};
28
29mod actor;
30mod batch_exec;
31mod diagnostics;
32mod schema_export;
33mod service_layer;
34mod string;
35mod subscriptions;
36
37pub use actor::{Backoff, Client, ConnectionEvent, RetryClient, RetryPolicy};
38
39#[derive(Debug)]
40struct TagListPage {
41 tags: Vec<TagAttributes>,
42 last_instance_id: Option<u32>,
43 partial_transfer: bool,
44}
45
46#[derive(Debug, Clone, Copy, PartialEq, Eq)]
47struct TemplateAttributes {
48 structure_handle: u16,
49 member_count: u16,
50 definition_size_words: u32,
51 structure_size_bytes: u32,
52}
53
54#[cfg(feature = "ffi")]
56pub(crate) static RUNTIME: LazyLock<std::io::Result<Runtime>> = LazyLock::new(Runtime::new);
57
58#[derive(Clone)]
231pub struct EipClient {
232 stream: Arc<Mutex<Box<dyn EtherNetIpStream>>>,
234 session_handle: u32,
236 tag_manager: Arc<Mutex<TagManager>>,
238 udt_manager: Arc<Mutex<UdtManager>>,
240 route_path: Arc<StdMutex<Option<RoutePath>>>,
242 max_packet_size: Arc<AtomicU32>,
244 last_activity: Arc<Mutex<Instant>>,
246 batch_config: BatchConfig,
248 connected_sessions: Arc<Mutex<HashMap<String, ConnectedSession>>>,
250 connection_sequence: Arc<Mutex<u32>>,
252 subscriptions: Arc<Mutex<Vec<TagSubscription>>>,
254 tag_groups: Arc<Mutex<HashMap<String, TagGroupConfig>>>,
256}
257
258#[cfg(test)]
259const _: fn() = || {
260 fn assert_send_sync_static<T: Send + Sync + 'static>() {}
261 assert_send_sync_static::<EipClient>();
262};
263
264impl std::fmt::Debug for EipClient {
265 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
266 f.debug_struct("EipClient")
267 .field("session_handle", &self.session_handle)
268 .field("route_path", &self.route_path_snapshot())
269 .field("max_packet_size", &self.max_packet_size())
270 .field("batch_config", &self.batch_config)
271 .field("stream", &"<stream>")
272 .field("tag_manager", &"<tag_manager>")
273 .field("udt_manager", &"<udt_manager>")
274 .field("connected_sessions", &"<connected_sessions>")
275 .field("subscriptions", &"<subscriptions>")
276 .field("tag_groups", &"<tag_groups>")
277 .finish()
278 }
279}
280
281impl EipClient {
282 async fn from_stream<S>(stream: S) -> Result<Self>
285 where
286 S: EtherNetIpStream + 'static,
287 {
288 let mut client = Self {
289 stream: Arc::new(Mutex::new(Box::new(stream))),
290 session_handle: 0,
291 tag_manager: Arc::new(Mutex::new(TagManager::new())),
292 udt_manager: Arc::new(Mutex::new(UdtManager::new())),
293 route_path: Arc::new(StdMutex::new(None)),
294 max_packet_size: Arc::new(AtomicU32::new(4000)),
295 last_activity: Arc::new(Mutex::new(Instant::now())),
296 batch_config: BatchConfig::default(),
297 connected_sessions: Arc::new(Mutex::new(HashMap::new())),
298 connection_sequence: Arc::new(Mutex::new(1)),
299 subscriptions: Arc::new(Mutex::new(Vec::new())),
300 tag_groups: Arc::new(Mutex::new(HashMap::new())),
301 };
302 client.register_session().await?;
303 client.negotiate_packet_size().await?;
304 Ok(client)
305 }
306
307 pub async fn new(addr: &str) -> Result<Self> {
308 let addr = addr
309 .parse::<SocketAddr>()
310 .map_err(|e| EtherNetIpError::Protocol(format!("Invalid address format: {e}")))?;
311 let stream = TcpStream::connect(addr).await?;
312 Self::from_stream(stream).await
313 }
314
315 pub async fn connect(addr: &str) -> Result<Self> {
317 Self::new(addr).await
318 }
319
320 #[cfg(test)]
321 fn new_unconnected_for_testing() -> Self {
322 let (stream, _peer) = tokio::io::duplex(64);
323 Self {
324 stream: Arc::new(Mutex::new(Box::new(stream))),
325 session_handle: 0,
326 tag_manager: Arc::new(Mutex::new(TagManager::new())),
327 udt_manager: Arc::new(Mutex::new(UdtManager::new())),
328 route_path: Arc::new(StdMutex::new(None)),
329 max_packet_size: Arc::new(AtomicU32::new(4000)),
330 last_activity: Arc::new(Mutex::new(Instant::now())),
331 batch_config: BatchConfig::default(),
332 connected_sessions: Arc::new(Mutex::new(HashMap::new())),
333 connection_sequence: Arc::new(Mutex::new(1)),
334 subscriptions: Arc::new(Mutex::new(Vec::new())),
335 tag_groups: Arc::new(Mutex::new(HashMap::new())),
336 }
337 }
338
339 async fn register_session(&mut self) -> crate::error::Result<()> {
361 tracing::debug!("Starting session registration...");
362 let mut packet = BytesMut::with_capacity(28);
363 EncapsulationHeader::new(REGISTER_SESSION, 4, 0).encode(&mut packet);
364 packet.extend_from_slice(&[0x01, 0x00]); packet.extend_from_slice(&[0x00, 0x00]); tracing::trace!("Sending Register Session packet: {:02X?}", packet);
368 self.stream
369 .lock()
370 .await
371 .write_all(&packet)
372 .await
373 .map_err(|e| {
374 tracing::error!("Failed to send Register Session packet: {}", e);
375 EtherNetIpError::Io(e)
376 })?;
377
378 let mut buf = [0u8; 1024];
379 tracing::debug!("Waiting for Register Session response...");
380 let n = match timeout(
381 Duration::from_secs(5),
382 self.stream.lock().await.read(&mut buf),
383 )
384 .await
385 {
386 Ok(Ok(n)) => {
387 tracing::trace!("Received {} bytes in response", n);
388 n
389 }
390 Ok(Err(e)) => {
391 tracing::error!("Error reading response: {}", e);
392 return Err(EtherNetIpError::Io(e));
393 }
394 Err(_) => {
395 tracing::warn!("Timeout waiting for response");
396 return Err(EtherNetIpError::Timeout(Duration::from_secs(5)));
397 }
398 };
399
400 if n < 28 {
401 tracing::error!("Response too short: {} bytes (expected 28)", n);
402 return Err(EtherNetIpError::Protocol("Response too short".to_string()));
403 }
404
405 let mut response = &buf[..n];
406 let header = EncapsulationHeader::decode(&mut response)?;
407
408 self.session_handle = header.session_handle;
410 tracing::debug!("Session handle: 0x{:08X}", self.session_handle);
411
412 let status = header.status;
414 tracing::trace!("Status code: 0x{:08X}", status);
415
416 if status != 0 {
417 tracing::error!("Session registration failed with status: 0x{:08X}", status);
418 return Err(EtherNetIpError::Protocol(format!(
419 "Session registration failed with status: 0x{status:08X}"
420 )));
421 }
422
423 tracing::info!("Session registration successful");
424 Ok(())
425 }
426
427 pub fn set_max_packet_size(&mut self, size: u32) {
429 self.max_packet_size
430 .store(size.min(4000), Ordering::Relaxed);
431 }
432
433 pub(crate) fn max_packet_size(&self) -> u32 {
434 self.max_packet_size.load(Ordering::Relaxed)
435 }
436
437 fn route_path_snapshot(&self) -> Option<RoutePath> {
438 self.route_path
439 .lock()
440 .unwrap_or_else(|poisoned| poisoned.into_inner())
441 .clone()
442 }
443
444 pub async fn discover_tags(&mut self) -> crate::error::Result<()> {
446 let response = self
447 .send_cip_request(&self.build_list_tags_request())
448 .await?;
449
450 let cip_data = self.extract_cip_from_response(&response)?;
452
453 if let Err(e) = self.check_cip_error(&cip_data) {
455 return Err(crate::error::EtherNetIpError::Protocol(format!(
456 "Tag discovery failed: {}. Some PLCs may not support tag discovery. Try reading tags directly by name.",
457 e
458 )));
459 }
460
461 let tags = {
462 let tag_manager = self.tag_manager.lock().await;
463 tag_manager.parse_tag_list(&cip_data)?
464 };
465
466 tracing::debug!("Initial tag discovery found {} tags", tags.len());
467
468 let hierarchical_tags = {
470 let tag_manager = self.tag_manager.lock().await;
471 let hierarchical_tags = tag_manager.drill_down_tags(&tags).await?;
472 drop(tag_manager);
473 hierarchical_tags
474 };
475
476 tracing::debug!(
477 "After drill-down: {} total tags discovered",
478 hierarchical_tags.len()
479 );
480
481 {
482 let tag_manager = self.tag_manager.lock().await;
483 let mut cache = tag_manager.cache.write()?;
484 for (name, metadata) in hierarchical_tags {
485 cache.insert(name, metadata);
486 }
487 }
488 Ok(())
489 }
490
491 pub async fn discover_udt_members(
493 &mut self,
494 udt_name: &str,
495 ) -> crate::error::Result<Vec<(String, TagMetadata)>> {
496 let cip_request = {
498 let tag_manager = self.tag_manager.lock().await;
499 tag_manager.build_udt_definition_request(udt_name)?
500 };
501
502 let response = self.send_cip_request(&cip_request).await?;
504
505 let definition = {
507 let tag_manager = self.tag_manager.lock().await;
508 tag_manager.parse_udt_definition_response(&response, udt_name)?
509 };
510
511 {
513 let tag_manager = self.tag_manager.lock().await;
514 let mut definitions = tag_manager.udt_definitions.write()?;
515 definitions.insert(udt_name.to_string(), definition.clone());
516 }
517
518 let mut members = Vec::new();
520 for member in &definition.members {
521 let member_name = member.name.clone();
522 let full_name = format!("{}.{}", udt_name, member_name);
523
524 let metadata = TagMetadata {
525 data_type: member.data_type,
526 scope: TagScope::Controller,
527 permissions: TagPermissions {
528 readable: true,
529 writable: true,
530 },
531 is_array: false,
532 dimensions: Vec::new(),
533 last_access: std::time::Instant::now(),
534 size: member.size,
535 array_info: None,
536 last_updated: std::time::Instant::now(),
537 };
538
539 members.push((full_name, metadata));
540 }
541
542 Ok(members)
543 }
544
545 pub async fn get_udt_definition_cached(&self, udt_name: &str) -> Option<UdtDefinition> {
547 let tag_manager = self.tag_manager.lock().await;
548 tag_manager.get_udt_definition_cached(udt_name)
549 }
550
551 pub async fn list_udt_definitions(&self) -> Vec<String> {
553 let tag_manager = self.tag_manager.lock().await;
554 tag_manager.list_udt_definitions()
555 }
556
557 pub async fn discover_tags_detailed(&mut self) -> crate::error::Result<Vec<TagAttributes>> {
560 let (tags, _) = self.discover_tags_detailed_internal(false).await?;
561 Ok(tags)
562 }
563
564 async fn discover_tags_detailed_internal(
565 &mut self,
566 best_effort: bool,
567 ) -> crate::error::Result<(Vec<TagAttributes>, Vec<String>)> {
568 let mut start_instance = 0u32;
569 let mut tags = Vec::new();
570 let mut warnings = Vec::new();
571
572 loop {
573 let request = self.build_tag_list_request_from_instance(start_instance)?;
574 let response = match self.send_cip_request(&request).await {
575 Ok(response) => response,
576 Err(err) if best_effort && !tags.is_empty() => {
577 warnings.push(format!(
578 "Tag discovery stopped early at instance {} after transport/protocol failure: {}",
579 start_instance, err
580 ));
581 break;
582 }
583 Err(err) => return Err(err),
584 };
585 let cip_data = match self.extract_cip_from_response(&response) {
586 Ok(cip_data) => cip_data,
587 Err(err) if best_effort && !tags.is_empty() => {
588 warnings.push(format!(
589 "Tag discovery stopped early at instance {} after response extraction failure: {}",
590 start_instance, err
591 ));
592 break;
593 }
594 Err(err) => return Err(err),
595 };
596 let page = match self.parse_tag_list_response_page(&cip_data) {
597 Ok(page) => page,
598 Err(err) if best_effort && !tags.is_empty() => {
599 warnings.push(format!(
600 "Tag discovery stopped early at instance {} after page-parse failure: {}",
601 start_instance, err
602 ));
603 break;
604 }
605 Err(err) => return Err(err),
606 };
607
608 tags.extend(page.tags);
609
610 if !page.partial_transfer {
611 break;
612 }
613
614 let Some(last_instance_id) = page.last_instance_id else {
615 return Err(crate::error::EtherNetIpError::Protocol(
616 "Tag discovery returned Partial transfer without a last instance ID"
617 .to_string(),
618 ));
619 };
620
621 if last_instance_id == u32::MAX || last_instance_id < start_instance {
622 return Err(crate::error::EtherNetIpError::Protocol(format!(
623 "Tag discovery pagination stalled at instance {}",
624 last_instance_id
625 )));
626 }
627
628 start_instance = last_instance_id.saturating_add(1);
629 }
630
631 Ok((tags, warnings))
632 }
633
634 pub async fn discover_program_tags(
637 &mut self,
638 program_name: &str,
639 ) -> crate::error::Result<Vec<TagAttributes>> {
640 let request = self.build_program_tag_list_request(program_name)?;
642 let response = self.send_cip_request(&request).await?;
643
644 let cip_data = self.extract_cip_from_response(&response)?;
646
647 if let Err(e) = self.check_cip_error(&cip_data) {
649 return Err(crate::error::EtherNetIpError::Protocol(format!(
650 "Program tag discovery failed for '{}': {}. Some PLCs may not support tag discovery. Try reading tags directly by name.",
651 program_name, e
652 )));
653 }
654
655 self.parse_tag_list_response(&cip_data)
657 }
658
659 pub async fn list_cached_tag_attributes(&self) -> Vec<String> {
661 self.udt_manager.lock().await.list_tag_attributes()
662 }
663
664 pub async fn clear_caches(&mut self) {
666 if let Err(error) = self.tag_manager.lock().await.clear_cache().await {
667 tracing::warn!("failed to clear tag metadata cache: {error}");
668 }
669 self.udt_manager.lock().await.clear_cache();
670 }
671
672 pub async fn with_route_path(addr: &str, route: RoutePath) -> crate::error::Result<Self> {
674 let mut client = Self::new(addr).await?;
675 client.set_route_path(route);
676 Ok(client)
677 }
678
679 pub async fn connect_with_stream<S>(stream: S, route: Option<RoutePath>) -> Result<Self>
707 where
708 S: EtherNetIpStream + 'static,
709 {
710 let mut client = Self::from_stream(stream).await?;
711 if let Some(route) = route {
712 client.set_route_path(route);
713 }
714 Ok(client)
715 }
716
717 pub fn set_route_path(&mut self, route: RoutePath) {
719 *self
720 .route_path
721 .lock()
722 .unwrap_or_else(|poisoned| poisoned.into_inner()) = Some(route);
723 }
724
725 pub fn get_route_path(&self) -> Option<RoutePath> {
727 self.route_path_snapshot()
728 }
729
730 pub fn clear_route_path(&mut self) {
732 *self
733 .route_path
734 .lock()
735 .unwrap_or_else(|poisoned| poisoned.into_inner()) = None;
736 }
737
738 pub async fn get_tag_metadata(&self, tag_name: &str) -> Option<TagMetadata> {
740 let tag_manager = self.tag_manager.lock().await;
741 match tag_manager.cache.read() {
742 Ok(cache) => cache.get(tag_name).cloned(),
743 Err(_) => {
744 tracing::warn!("failed to read tag metadata cache: lock poisoned");
745 None
746 }
747 }
748 }
749
750 pub async fn read_tag(&mut self, tag_name: &str) -> crate::error::Result<PlcValue> {
816 self.validate_session().await?;
817
818 if let Some((base_name, index)) = self.parse_array_element_access(tag_name) {
822 if let Some(bracket_start) = tag_name.find('[')
825 && let Some(bracket_end_rel) = tag_name[bracket_start..].find(']')
826 {
827 let bracket_end_abs = bracket_start + bracket_end_rel;
828 let after_bracket = &tag_name[bracket_end_abs + 1..];
829 tracing::debug!(
830 "Array element detected for '{}': base='{}', index={}, after_bracket='{}'",
831 tag_name,
832 base_name,
833 index,
834 after_bracket
835 );
836 if !after_bracket.starts_with('.') {
838 tracing::debug!(
839 "Detected simple array element access: {}[{}], using workaround",
840 base_name,
841 index
842 );
843 return self.read_array_element_workaround(&base_name, index).await;
844 } else {
845 tracing::debug!(
846 "Array element '{}[{}]' has member access after bracket ('{}'), using TagPath::parse()",
847 base_name,
848 index,
849 after_bracket
850 );
851 }
852 }
853 }
854
855 if let Some((parent_path, index)) = self.parse_final_array_element_access(tag_name)
859 && self.detect_bool_array_path(&parent_path).await?
860 {
861 return self
862 .read_bool_array_element_workaround(&parent_path, index)
863 .await;
864 }
865
866 let response = self
867 .send_cip_request(&self.build_read_request(tag_name)?)
868 .await?;
869 let cip_data = self.extract_cip_from_response(&response)?;
870 self.parse_cip_response(&cip_data)
871 }
872
873 pub async fn read_bit(&mut self, tag_base: &str, bit_index: u8) -> crate::error::Result<bool> {
884 if bit_index >= 32 {
885 return Err(crate::error::EtherNetIpError::Protocol(
886 "bit_index must be 0..32 for DINT bit access".to_string(),
887 ));
888 }
889 let path = format!("{}.{}", tag_base, bit_index);
890 match self.read_tag(&path).await? {
891 PlcValue::Bool(b) => Ok(b),
892 PlcValue::Dint(n) => {
893 Ok((n >> bit_index) & 1 != 0)
895 }
896 other => Err(crate::error::EtherNetIpError::DataTypeMismatch {
897 expected: "BOOL or DINT".to_string(),
898 actual: format!("{:?}", other),
899 }),
900 }
901 }
902
903 pub async fn write_bit(
914 &mut self,
915 tag_base: &str,
916 bit_index: u8,
917 value: bool,
918 ) -> crate::error::Result<()> {
919 if bit_index >= 32 {
920 return Err(crate::error::EtherNetIpError::Protocol(
921 "bit_index must be 0..32 for DINT bit access".to_string(),
922 ));
923 }
924 let path = format!("{}.{}", tag_base, bit_index);
925 self.write_tag(&path, PlcValue::Bool(value)).await
926 }
927
928 fn parse_array_element_access(&self, tag_name: &str) -> Option<(String, u32)> {
930 if let Some(bracket_pos) = tag_name.rfind('[')
932 && let Some(close_bracket_pos) = tag_name.rfind(']')
933 && close_bracket_pos > bracket_pos
934 {
935 let base_name = tag_name[..bracket_pos].to_string();
936 let index_str = &tag_name[bracket_pos + 1..close_bracket_pos];
937 if let Ok(index) = index_str.parse::<u32>()
938 && !tag_name[..bracket_pos].contains('[')
939 {
940 return Some((base_name, index));
942 }
943 }
944 None
945 }
946
947 fn parse_final_array_element_access(&self, tag_name: &str) -> Option<(String, u32)> {
948 match TagPath::parse(tag_name).ok()? {
949 TagPath::Array { base_path, indices } if indices.len() == 1 => {
950 Some((base_path.as_string(), indices[0]))
951 }
952 _ => None,
953 }
954 }
955
956 async fn detect_bool_array_path(&mut self, array_path: &str) -> crate::error::Result<bool> {
957 let test_response = self
958 .send_cip_request(&self.build_read_request_with_count(array_path, 1)?)
959 .await?;
960 let test_cip_data = self.extract_cip_from_response(&test_response)?;
961
962 if self.check_cip_error(&test_cip_data).is_err() || test_cip_data.len() < 6 {
963 return Ok(false);
964 }
965
966 let test_data_type = u16::from_le_bytes([test_cip_data[4], test_cip_data[5]]);
967 Ok(test_data_type == values::BOOL_ARRAY_DWORD)
968 }
969
970 fn parse_bool_array_dword_response(&self, cip_data: &[u8]) -> crate::error::Result<u32> {
971 if cip_data.len() < 6 {
972 return Err(EtherNetIpError::Protocol(
973 "BOOL array response too short".to_string(),
974 ));
975 }
976
977 self.check_cip_error(cip_data)?;
978
979 let service_reply = cip_data[0];
980 if service_reply != 0xCC {
981 return Err(EtherNetIpError::Protocol(format!(
982 "Unexpected service reply: 0x{service_reply:02X}"
983 )));
984 }
985
986 let data_type = u16::from_le_bytes([cip_data[4], cip_data[5]]);
987 if data_type != values::BOOL_ARRAY_DWORD {
988 return Err(EtherNetIpError::Protocol(format!(
989 "Expected BOOL array DWORD data type 0x00D3, got 0x{data_type:04X}"
990 )));
991 }
992
993 let value_data = if cip_data.len() >= 12 {
994 &cip_data[8..]
995 } else if cip_data.len() >= 10 {
996 &cip_data[6..]
997 } else {
998 return Err(EtherNetIpError::Protocol(
999 "BOOL array response too short for data".to_string(),
1000 ));
1001 };
1002
1003 if value_data.len() < 4 {
1004 return Err(EtherNetIpError::Protocol(format!(
1005 "BOOL array data too short: need 4 bytes (DWORD), got {} bytes",
1006 value_data.len()
1007 )));
1008 }
1009
1010 Ok(u32::from_le_bytes([
1011 value_data[0],
1012 value_data[1],
1013 value_data[2],
1014 value_data[3],
1015 ]))
1016 }
1017
1018 async fn read_array_element_workaround(
1031 &mut self,
1032 base_array_name: &str,
1033 index: u32,
1034 ) -> crate::error::Result<PlcValue> {
1035 tracing::debug!(
1036 "Reading array element '{}[{}]' using element addressing",
1037 base_array_name,
1038 index
1039 );
1040
1041 let test_response = self
1043 .send_cip_request(&self.build_read_request_with_count(base_array_name, 1)?)
1044 .await?;
1045 let test_cip_data = self.extract_cip_from_response(&test_response)?;
1046
1047 self.check_cip_error(&test_cip_data)?;
1049
1050 if test_cip_data.len() >= 6 {
1052 let test_data_type = u16::from_le_bytes([test_cip_data[4], test_cip_data[5]]);
1053 if test_data_type == 0x00D3 {
1054 return self
1056 .read_bool_array_element_workaround(base_array_name, index)
1057 .await;
1058 }
1059 }
1060
1061 let request = self.build_read_array_request(base_array_name, index, 1);
1064
1065 let response = self.send_cip_request(&request).await?;
1066 let cip_data = self.extract_cip_from_response(&response)?;
1067
1068 self.check_cip_error(&cip_data)?;
1070
1071 self.parse_cip_response(&cip_data)
1074 }
1075
1076 async fn read_bool_array_element_workaround(
1080 &mut self,
1081 base_array_name: &str,
1082 index: u32,
1083 ) -> crate::error::Result<PlcValue> {
1084 tracing::debug!(
1085 "BOOL array detected - reading DWORD and extracting bit [{}]",
1086 index
1087 );
1088
1089 let dword_index = index / 32;
1090
1091 let response = self
1094 .send_cip_request(&self.build_read_array_request(base_array_name, dword_index, 1))
1095 .await?;
1096 let cip_data = self.extract_cip_from_response(&response)?;
1097 let dword_value = self.parse_bool_array_dword_response(&cip_data)?;
1098
1099 let bit_index = (index % 32) as u8;
1102 let bool_value = (dword_value >> bit_index) & 1 != 0;
1103
1104 Ok(PlcValue::Bool(bool_value))
1105 }
1106
1107 async fn read_array_in_chunks(
1114 &mut self,
1115 base_array_name: &str,
1116 data_type: u16,
1117 start_index: u32,
1118 target_element_count: u32,
1119 ) -> crate::error::Result<Vec<u8>> {
1120 let element_size = match data_type {
1122 0x00C1 => 1, 0x00C2 => 1, 0x00C3 => 2, 0x00C4 => 4, 0x00C5 => 8, 0x00C6 => 1, 0x00C7 => 2, 0x00C8 => 4, 0x00C9 => 8, 0x00CA => 4, 0x00CB => 8, _ => {
1134 return Err(EtherNetIpError::Protocol(format!(
1135 "Unsupported array data type for chunked reading: 0x{:04X}",
1136 data_type
1137 )));
1138 }
1139 };
1140
1141 let elements_per_chunk = match element_size {
1144 1 => 30, 2 => 15, 4 => 8, 8 => 4, _ => 8,
1149 };
1150
1151 let end_index = start_index
1152 .checked_add(target_element_count)
1153 .ok_or_else(|| EtherNetIpError::Protocol("Array range overflow".to_string()))?;
1154
1155 let mut all_data = Vec::new();
1156 let mut next_chunk_start = start_index;
1157
1158 tracing::debug!(
1159 "Reading array '{}' in chunks: {} elements per chunk, target: {} elements",
1160 base_array_name,
1161 elements_per_chunk,
1162 target_element_count
1163 );
1164
1165 while next_chunk_start < end_index {
1166 let chunk_end = (next_chunk_start + elements_per_chunk as u32).min(end_index);
1169 let chunk_size = (chunk_end - next_chunk_start) as u16;
1170
1171 tracing::trace!(
1172 "Reading chunk: elements {} to {} ({} elements) using element addressing",
1173 next_chunk_start,
1174 chunk_end - 1,
1175 chunk_size
1176 );
1177
1178 let response = self
1181 .send_cip_request(&self.build_read_array_request(
1182 base_array_name,
1183 next_chunk_start,
1184 chunk_size,
1185 ))
1186 .await?;
1187 let cip_data = self.extract_cip_from_response(&response)?;
1188
1189 if cip_data.len() < 8 {
1190 if cip_data.len() >= 3 {
1193 let general_status = cip_data[2];
1194 if general_status != 0x00 {
1195 let error_msg = self.get_cip_error_message(general_status);
1196 return Err(EtherNetIpError::Protocol(format!(
1197 "CIP Error {} when reading chunk (elements {} to {}): {}",
1198 general_status,
1199 next_chunk_start,
1200 chunk_end - 1,
1201 error_msg
1202 )));
1203 }
1204 }
1205 return Err(EtherNetIpError::Protocol(format!(
1206 "Chunk response too short: got {} bytes, expected at least 8 (requested {} elements starting at {})",
1207 cip_data.len(),
1208 chunk_size,
1209 next_chunk_start
1210 )));
1211 }
1212
1213 if cip_data.len() >= 3 {
1215 let general_status = cip_data[2];
1216 if general_status != 0x00 {
1217 let error_msg = self.get_cip_error_message(general_status);
1218 return Err(EtherNetIpError::Protocol(format!(
1219 "CIP Error {} when reading chunk (elements {} to {}): {}",
1220 general_status,
1221 next_chunk_start,
1222 chunk_end - 1,
1223 error_msg
1224 )));
1225 }
1226 }
1227
1228 if !cip_data.is_empty() && cip_data[0] != 0xCC {
1230 return Err(EtherNetIpError::Protocol(format!(
1231 "Unexpected service reply in chunk: 0x{:02X} (expected 0xCC)",
1232 cip_data[0]
1233 )));
1234 }
1235
1236 if cip_data.len() < 6 {
1237 return Err(EtherNetIpError::Protocol(format!(
1238 "Chunk response too short for data type: got {} bytes, expected at least 6",
1239 cip_data.len()
1240 )));
1241 }
1242
1243 let chunk_data_type = u16::from_le_bytes([cip_data[4], cip_data[5]]);
1244 if chunk_data_type != data_type {
1245 return Err(EtherNetIpError::Protocol(format!(
1246 "Data type mismatch in chunk: expected 0x{:04X}, got 0x{:04X}",
1247 data_type, chunk_data_type
1248 )));
1249 }
1250
1251 let value_data_start = if cip_data.len() >= 8 {
1254 8
1256 } else {
1257 6
1258 };
1259
1260 let chunk_value_data = &cip_data[value_data_start..];
1261 let chunk_complete_bytes = (chunk_value_data.len() / element_size) * element_size;
1262 let chunk_data = &chunk_value_data[..chunk_complete_bytes];
1263
1264 if !chunk_data.is_empty() {
1267 all_data.extend_from_slice(chunk_data);
1268 let elements_received = chunk_data.len() / element_size;
1269 next_chunk_start += elements_received as u32;
1270
1271 tracing::trace!(
1272 "Chunk read: {} elements ({} bytes) starting at index {}, total so far: {} elements",
1273 elements_received,
1274 chunk_data.len(),
1275 next_chunk_start - elements_received as u32,
1276 all_data.len() / element_size
1277 );
1278
1279 if next_chunk_start >= end_index {
1281 tracing::trace!(
1282 "Reached target element count ({}), stopping chunked read",
1283 target_element_count
1284 );
1285 break;
1286 }
1287 } else {
1288 break;
1290 }
1291 }
1292
1293 let final_element_count = all_data.len() / element_size;
1294 tracing::debug!(
1295 "Chunked read complete: {} total elements ({} bytes), target was {} elements",
1296 final_element_count,
1297 all_data.len(),
1298 target_element_count
1299 );
1300
1301 if final_element_count < target_element_count as usize {
1302 return Err(EtherNetIpError::Protocol(format!(
1303 "Incomplete array read: requested {} elements, received {}",
1304 target_element_count, final_element_count
1305 )));
1306 }
1307
1308 Ok(all_data)
1309 }
1310
1311 fn array_element_size(data_type: u16) -> Option<usize> {
1312 match data_type {
1313 0x00C1 => Some(1), 0x00C2 => Some(1), 0x00C3 => Some(2), 0x00C4 => Some(4), 0x00C5 => Some(8), 0x00C6 => Some(1), 0x00C7 => Some(2), 0x00C8 => Some(4), 0x00C9 => Some(8), 0x00CA => Some(4), 0x00CB => Some(8), _ => None,
1325 }
1326 }
1327
1328 fn decode_array_bytes(
1329 &self,
1330 data_type: u16,
1331 bytes: &[u8],
1332 ) -> crate::error::Result<Vec<PlcValue>> {
1333 let Some(element_size) = Self::array_element_size(data_type) else {
1334 return Err(EtherNetIpError::Protocol(format!(
1335 "Unsupported data type for array decoding: 0x{:04X}",
1336 data_type
1337 )));
1338 };
1339
1340 if !bytes.len().is_multiple_of(element_size) {
1341 return Err(EtherNetIpError::Protocol(format!(
1342 "Array payload length {} is not aligned to element size {}",
1343 bytes.len(),
1344 element_size
1345 )));
1346 }
1347
1348 let mut values = Vec::with_capacity(bytes.len() / element_size);
1349 for chunk in bytes.chunks_exact(element_size) {
1350 values.push(values::decode_array_element(data_type, chunk)?);
1351 }
1352
1353 Ok(values)
1354 }
1355
1356 pub async fn read_array_range(
1372 &mut self,
1373 base_array_name: &str,
1374 start_index: u32,
1375 element_count: u32,
1376 ) -> crate::error::Result<Vec<PlcValue>> {
1377 if element_count == 0 {
1378 return Ok(Vec::new());
1379 }
1380
1381 let probe_response = self
1382 .send_cip_request(&self.build_read_array_request(base_array_name, start_index, 1))
1383 .await?;
1384 let probe_cip = self.extract_cip_from_response(&probe_response)?;
1385 self.check_cip_error(&probe_cip)?;
1386
1387 if probe_cip.len() < 6 {
1388 return Err(EtherNetIpError::Protocol(
1389 "Array probe response too short".to_string(),
1390 ));
1391 }
1392
1393 let data_type = u16::from_le_bytes([probe_cip[4], probe_cip[5]]);
1394 let raw = self
1395 .read_array_in_chunks(base_array_name, data_type, start_index, element_count)
1396 .await?;
1397 let values = self.decode_array_bytes(data_type, &raw)?;
1398
1399 if values.len() != element_count as usize {
1400 return Err(EtherNetIpError::Protocol(format!(
1401 "Array read count mismatch: requested {}, got {}",
1402 element_count,
1403 values.len()
1404 )));
1405 }
1406
1407 Ok(values)
1408 }
1409
1410 async fn write_array_element_workaround(
1424 &mut self,
1425 base_array_name: &str,
1426 index: u32,
1427 value: PlcValue,
1428 ) -> crate::error::Result<()> {
1429 tracing::debug!(
1430 "Writing to array element '{}[{}]' using element addressing",
1431 base_array_name,
1432 index
1433 );
1434
1435 let test_response = self
1437 .send_cip_request(&self.build_read_request_with_count(base_array_name, 1)?)
1438 .await?;
1439 let test_cip_data = self.extract_cip_from_response(&test_response)?;
1440
1441 if test_cip_data.len() < 3 {
1443 return Err(EtherNetIpError::Protocol(
1444 "Test read response too short".to_string(),
1445 ));
1446 }
1447
1448 if let Err(e) = self.check_cip_error(&test_cip_data) {
1450 return Err(EtherNetIpError::Protocol(format!(
1451 "Cannot write to array element: Test read failed: {}",
1452 e
1453 )));
1454 }
1455
1456 if test_cip_data.len() < 6 {
1458 return Err(EtherNetIpError::Protocol(
1459 "Test read response too short to determine data type".to_string(),
1460 ));
1461 }
1462
1463 let test_data_type = u16::from_le_bytes([test_cip_data[4], test_cip_data[5]]);
1464
1465 if test_data_type == 0x00D3 {
1467 return self
1468 .write_bool_array_element_workaround(base_array_name, index, value)
1469 .await;
1470 }
1471
1472 let data_type = test_data_type;
1474 let value_bytes = value.to_bytes();
1475
1476 let request = self.build_write_array_request_with_index(
1479 base_array_name,
1480 index,
1481 1, data_type,
1483 &value_bytes,
1484 )?;
1485
1486 let response = self.send_cip_request(&request).await?;
1487 let cip_data = self.extract_cip_from_response(&response)?;
1488
1489 self.check_cip_error(&cip_data)?;
1491
1492 tracing::info!("Array element write completed successfully");
1493 Ok(())
1494 }
1495
1496 async fn write_bool_array_element_workaround(
1504 &mut self,
1505 base_array_name: &str,
1506 index: u32,
1507 value: PlcValue,
1508 ) -> crate::error::Result<()> {
1509 tracing::debug!(
1510 "BOOL array element write - reading DWORD, modifying bit [{}], writing back",
1511 index
1512 );
1513
1514 let dword_index = index / 32;
1515
1516 let response = self
1518 .send_cip_request(&self.build_read_array_request(base_array_name, dword_index, 1))
1519 .await?;
1520 let cip_data = self.extract_cip_from_response(&response)?;
1521
1522 let bool_value = match value {
1524 PlcValue::Bool(b) => b,
1525 _ => {
1526 return Err(EtherNetIpError::Protocol(
1527 "Expected BOOL value for BOOL array element".to_string(),
1528 ));
1529 }
1530 };
1531
1532 let original_dword_value = self.parse_bool_array_dword_response(&cip_data)?;
1534 let mut dword_value = original_dword_value;
1535
1536 let bit_index = (index % 32) as u8;
1537 if bool_value {
1538 dword_value |= 1u32 << bit_index;
1539 } else {
1540 dword_value &= !(1u32 << bit_index);
1541 }
1542
1543 tracing::trace!(
1544 "Modified BOOL[{}] in DWORD: 0x{:08X} -> 0x{:08X} (bit {} = {})",
1545 index,
1546 original_dword_value,
1547 dword_value,
1548 bit_index,
1549 bool_value
1550 );
1551
1552 let write_request = self.build_write_array_request_with_index(
1554 base_array_name,
1555 dword_index,
1556 1,
1557 values::BOOL_ARRAY_DWORD,
1558 &dword_value.to_le_bytes(),
1559 )?;
1560 let write_response = self.send_cip_request(&write_request).await?;
1561 let write_cip_data = self.extract_cip_from_response(&write_response)?;
1562
1563 self.check_cip_error(&write_cip_data)?;
1565
1566 tracing::info!("BOOL array element write completed successfully");
1567 Ok(())
1568 }
1569
1570 #[allow(dead_code)]
1574 fn build_write_array_request(
1575 &self,
1576 tag_name: &str,
1577 data_type: u16,
1578 element_count: u16,
1579 data: &[u8],
1580 ) -> crate::error::Result<Vec<u8>> {
1581 let mut cip_request = Vec::new();
1582
1583 cip_request.push(0x4D);
1586
1587 let path = self.build_tag_path(tag_name);
1589 cip_request.push((path.len() / 2) as u8);
1590 cip_request.extend_from_slice(&path);
1591
1592 cip_request.extend_from_slice(&data_type.to_le_bytes());
1595 cip_request.extend_from_slice(&element_count.to_le_bytes());
1596
1597 cip_request.extend_from_slice(data);
1599
1600 Ok(cip_request)
1601 }
1602
1603 #[cfg_attr(not(test), allow(dead_code))]
1632 pub fn build_write_array_request_with_index(
1633 &self,
1634 base_array_name: &str,
1635 start_index: u32,
1636 element_count: u16,
1637 data_type: u16,
1638 data: &[u8],
1639 ) -> crate::error::Result<Vec<u8>> {
1640 let mut cip_request = Vec::new();
1641
1642 cip_request.push(0x4D);
1645
1646 let mut full_path = self.build_base_tag_path(base_array_name);
1649
1650 full_path.extend_from_slice(&self.build_element_id_segment(start_index));
1653
1654 if !full_path.len().is_multiple_of(2) {
1656 full_path.push(0x00);
1657 }
1658
1659 let path_size = (full_path.len() / 2) as u8;
1661 cip_request.push(path_size);
1662 cip_request.extend_from_slice(&full_path);
1663
1664 cip_request.extend_from_slice(&data_type.to_le_bytes());
1667 cip_request.extend_from_slice(&element_count.to_le_bytes());
1668 cip_request.extend_from_slice(data);
1669
1670 Ok(cip_request)
1671 }
1672
1673 pub async fn read_udt_chunked(&mut self, tag_name: &str) -> crate::error::Result<PlcValue> {
1710 self.validate_session().await?;
1711
1712 tracing::debug!("[CHUNKED] Starting advanced UDT reading for: {}", tag_name);
1713
1714 match self.read_tag(tag_name).await {
1716 Ok(value) => {
1717 tracing::debug!("[CHUNKED] Normal read successful");
1718 return Ok(value);
1719 }
1720 Err(crate::error::EtherNetIpError::Protocol(msg))
1721 if msg.contains("Partial transfer") =>
1722 {
1723 tracing::debug!("[CHUNKED] Partial transfer detected, using advanced chunking");
1724 }
1725 Err(e) => {
1726 tracing::warn!("[CHUNKED] Normal read failed: {}", e);
1727 return Err(e);
1728 }
1729 }
1730
1731 self.read_udt_advanced_chunked(tag_name).await
1733 }
1734
1735 async fn read_udt_advanced_chunked(
1737 &mut self,
1738 tag_name: &str,
1739 ) -> crate::error::Result<PlcValue> {
1740 tracing::debug!("[ADVANCED] Using multiple strategies for large UDT");
1741
1742 let chunk_sizes = vec![512, 256, 128, 64, 32, 16, 8, 4];
1744
1745 for chunk_size in chunk_sizes {
1746 tracing::trace!("[ADVANCED] Trying chunk size: {}", chunk_size);
1747
1748 match self.read_udt_with_chunk_size(tag_name, chunk_size).await {
1749 Ok(udt_value) => {
1750 tracing::debug!("[ADVANCED] Success with chunk size {}", chunk_size);
1751 return Ok(udt_value);
1752 }
1753 Err(e) => {
1754 tracing::trace!("[ADVANCED] Chunk size {} failed: {}", chunk_size, e);
1755 continue;
1756 }
1757 }
1758 }
1759
1760 tracing::debug!("[ADVANCED] Trying member-by-member discovery");
1762 match self.read_udt_member_discovery(tag_name).await {
1763 Ok(udt_value) => {
1764 tracing::debug!("[ADVANCED] Member discovery successful");
1765 return Ok(udt_value);
1766 }
1767 Err(e) => {
1768 tracing::warn!("[ADVANCED] Member discovery failed: {}", e);
1769 }
1770 }
1771
1772 tracing::debug!("[ADVANCED] Trying progressive reading");
1774 match self.read_udt_progressive(tag_name).await {
1775 Ok(udt_value) => {
1776 tracing::debug!("[ADVANCED] Progressive reading successful");
1777 return Ok(udt_value);
1778 }
1779 Err(e) => {
1780 tracing::warn!("[ADVANCED] Progressive reading failed: {}", e);
1781 }
1782 }
1783
1784 tracing::warn!("[ADVANCED] All strategies failed, using fallback");
1786 let symbol_id = self
1788 .get_tag_attributes(tag_name)
1789 .await
1790 .ok()
1791 .and_then(|attr| attr.template_instance_id)
1792 .unwrap_or(0) as i32;
1793
1794 Ok(PlcValue::Udt(UdtData {
1796 symbol_id,
1797 data: vec![], }))
1799 }
1800
1801 async fn read_udt_with_chunk_size(
1803 &mut self,
1804 tag_name: &str,
1805 mut chunk_size: usize,
1806 ) -> crate::error::Result<PlcValue> {
1807 let mut all_data = Vec::new();
1808 let mut offset = 0;
1809 let mut consecutive_failures = 0;
1810 const MAX_FAILURES: usize = 3;
1811
1812 loop {
1813 match self
1814 .read_udt_chunk_advanced(tag_name, offset, chunk_size)
1815 .await
1816 {
1817 Ok(chunk_data) => {
1818 if chunk_data.is_empty() {
1819 break; }
1821
1822 all_data.extend_from_slice(&chunk_data);
1823 offset += chunk_data.len();
1824 consecutive_failures = 0;
1825
1826 tracing::trace!(
1827 "[CHUNK] Read {} bytes at offset {}, total: {}",
1828 chunk_data.len(),
1829 offset - chunk_data.len(),
1830 all_data.len()
1831 );
1832
1833 if chunk_data.len() < chunk_size {
1835 break;
1836 }
1837 }
1838 Err(e) => {
1839 consecutive_failures += 1;
1840 tracing::warn!(
1841 "[CHUNK] Chunk read failed (attempt {}): {}",
1842 consecutive_failures,
1843 e
1844 );
1845
1846 if consecutive_failures >= MAX_FAILURES {
1847 break;
1848 }
1849
1850 if chunk_size > 4 {
1852 chunk_size /= 2;
1853 continue;
1854 }
1855 }
1856 }
1857 }
1858
1859 if all_data.is_empty() {
1860 return Err(crate::error::EtherNetIpError::Protocol(
1861 "No data read from UDT".to_string(),
1862 ));
1863 }
1864
1865 tracing::debug!("[CHUNK] Total data collected: {} bytes", all_data.len());
1866
1867 let symbol_id = self
1869 .get_tag_attributes(tag_name)
1870 .await
1871 .ok()
1872 .and_then(|attr| attr.template_instance_id)
1873 .unwrap_or(0) as i32;
1874
1875 Ok(PlcValue::Udt(UdtData {
1877 symbol_id,
1878 data: all_data,
1879 }))
1880 }
1881
1882 async fn read_udt_chunk_advanced(
1884 &mut self,
1885 tag_name: &str,
1886 offset: usize,
1887 size: usize,
1888 ) -> crate::error::Result<Vec<u8>> {
1889 let mut request = Vec::new();
1891
1892 request.push(0x4C);
1894
1895 let tag_path = self.build_tag_path(tag_name);
1897
1898 let path_size = (tag_path.len() / 2) as u8;
1900 request.push(path_size);
1901
1902 request.extend_from_slice(&tag_path);
1904
1905 if offset > 0 {
1908 request.push(0x28); request.push(0x02); request.extend_from_slice(&(offset as u16).to_le_bytes());
1912 }
1913
1914 request.push(0x28); request.push(0x02); request.extend_from_slice(&(size as u16).to_le_bytes());
1918
1919 request.push(0x00);
1921 request.push(0x01);
1922
1923 let response = self.send_cip_request(&request).await?;
1925 let cip_data = self.extract_cip_from_response(&response)?;
1926
1927 if cip_data.len() < 2 {
1929 return Ok(Vec::new()); }
1931
1932 let _data_type = u16::from_le_bytes([cip_data[0], cip_data[1]]);
1933 let data = &cip_data[2..];
1934
1935 Ok(data.to_vec())
1936 }
1937
1938 async fn read_udt_member_discovery(
1944 &mut self,
1945 tag_name: &str,
1946 ) -> crate::error::Result<PlcValue> {
1947 tracing::debug!("[DISCOVERY] Reading UDT as raw data for: {}", tag_name);
1948
1949 let attributes = self.get_tag_attributes(tag_name).await?;
1951
1952 let symbol_id = attributes.template_instance_id.ok_or_else(|| {
1953 crate::error::EtherNetIpError::Protocol(
1954 "UDT template instance ID not found in tag attributes".to_string(),
1955 )
1956 })?;
1957
1958 let raw_data = self.read_tag_raw(tag_name).await?;
1960
1961 tracing::debug!(
1962 "[DISCOVERY] Read {} bytes of UDT data with symbol_id: {}",
1963 raw_data.len(),
1964 symbol_id
1965 );
1966
1967 Ok(PlcValue::Udt(UdtData {
1968 symbol_id: symbol_id as i32,
1969 data: raw_data,
1970 }))
1971 }
1972
1973 async fn read_udt_progressive(&mut self, tag_name: &str) -> crate::error::Result<PlcValue> {
1975 tracing::debug!("[PROGRESSIVE] Starting progressive reading");
1976
1977 let mut chunk_size = 4;
1979 let mut all_data = Vec::new();
1980 let mut offset = 0;
1981
1982 while chunk_size <= 512 {
1983 match self
1984 .read_udt_chunk_advanced(tag_name, offset, chunk_size)
1985 .await
1986 {
1987 Ok(chunk_data) => {
1988 if chunk_data.is_empty() {
1989 break;
1990 }
1991
1992 all_data.extend_from_slice(&chunk_data);
1993 offset += chunk_data.len();
1994
1995 tracing::trace!(
1996 "[PROGRESSIVE] Read {} bytes with chunk size {}",
1997 chunk_data.len(),
1998 chunk_size
1999 );
2000
2001 if chunk_data.len() == chunk_size {
2003 chunk_size = (chunk_size * 2).min(512);
2004 }
2005 }
2006 Err(_) => {
2007 chunk_size /= 2;
2009 if chunk_size < 4 {
2010 break;
2011 }
2012 }
2013 }
2014 }
2015
2016 if all_data.is_empty() {
2017 return Err(crate::error::EtherNetIpError::Protocol(
2018 "Progressive reading failed".to_string(),
2019 ));
2020 }
2021
2022 tracing::debug!("[PROGRESSIVE] Collected {} bytes total", all_data.len());
2023
2024 let symbol_id = self
2026 .get_tag_attributes(tag_name)
2027 .await
2028 .ok()
2029 .and_then(|attr| attr.template_instance_id)
2030 .unwrap_or(0) as i32;
2031
2032 Ok(PlcValue::Udt(UdtData {
2034 symbol_id,
2035 data: all_data,
2036 }))
2037 }
2038
2039 #[allow(dead_code)]
2041 async fn read_udt_in_chunks(&mut self, tag_name: &str) -> crate::error::Result<PlcValue> {
2042 const MAX_CHUNK_SIZE: usize = 1000; let mut all_data = Vec::new();
2044 let mut offset = 0;
2045 let mut chunk_size = MAX_CHUNK_SIZE;
2046
2047 loop {
2048 match self.read_udt_chunk(tag_name, offset, chunk_size).await {
2050 Ok(chunk_data) => {
2051 all_data.extend_from_slice(&chunk_data);
2052 offset += chunk_data.len();
2053
2054 if chunk_data.len() < chunk_size {
2056 break;
2057 }
2058 }
2059 Err(crate::error::EtherNetIpError::Protocol(msg))
2060 if msg.contains("Partial transfer") =>
2061 {
2062 chunk_size /= 2;
2064 if chunk_size < 100 {
2065 return Err(crate::error::EtherNetIpError::Protocol(
2066 "UDT too large even for smallest chunk size".to_string(),
2067 ));
2068 }
2069 continue;
2070 }
2071 Err(e) => return Err(e),
2072 }
2073 }
2074
2075 let symbol_id = self
2077 .get_tag_attributes(tag_name)
2078 .await
2079 .ok()
2080 .and_then(|attr| attr.template_instance_id)
2081 .unwrap_or(0) as i32;
2082
2083 Ok(PlcValue::Udt(UdtData {
2085 symbol_id,
2086 data: all_data,
2087 }))
2088 }
2089
2090 #[allow(dead_code)]
2092 async fn read_udt_chunk(
2093 &mut self,
2094 tag_name: &str,
2095 offset: usize,
2096 size: usize,
2097 ) -> crate::error::Result<Vec<u8>> {
2098 let mut request = Vec::new();
2100
2101 request.push(0x4C);
2103
2104 let path_size = 2 + tag_name.len().div_ceil(2); request.push(path_size as u8);
2107
2108 request.extend_from_slice(tag_name.as_bytes());
2110 if !tag_name.len().is_multiple_of(2) {
2111 request.push(0); }
2113
2114 request.push(0x28); request.push(0x02); request.extend_from_slice(&(offset as u16).to_le_bytes());
2118
2119 request.push(0x28); request.push(0x02); request.extend_from_slice(&(size as u16).to_le_bytes());
2123
2124 request.push(0x00);
2126 request.push(0x01);
2127
2128 let response = self.send_cip_request(&request).await?;
2130 let cip_data = self.extract_cip_from_response(&response)?;
2131
2132 if cip_data.len() < 2 {
2134 return Err(crate::error::EtherNetIpError::Protocol(
2135 "Response too short".to_string(),
2136 ));
2137 }
2138
2139 let _data_type = u16::from_le_bytes([cip_data[0], cip_data[1]]);
2140 let data = &cip_data[2..];
2141
2142 Ok(data.to_vec())
2143 }
2144
2145 pub async fn read_udt_member_by_offset(
2168 &mut self,
2169 udt_name: &str,
2170 member_offset: usize,
2171 member_size: usize,
2172 data_type: u16,
2173 ) -> crate::error::Result<PlcValue> {
2174 self.validate_session().await?;
2175
2176 let udt_data = self.read_tag_raw(udt_name).await?;
2178
2179 if member_offset + member_size > udt_data.len() {
2181 return Err(crate::error::EtherNetIpError::Protocol(format!(
2182 "Member data incomplete: offset {} + size {} > UDT size {}",
2183 member_offset,
2184 member_size,
2185 udt_data.len()
2186 )));
2187 }
2188
2189 let member_data = &udt_data[member_offset..member_offset + member_size];
2190
2191 let member = crate::udt::UdtMember {
2193 name: "temp".to_string(),
2194 data_type,
2195 offset: member_offset as u32,
2196 size: member_size as u32,
2197 };
2198
2199 let udt = crate::udt::UserDefinedType::new(udt_name.to_string());
2200 Ok(udt.parse_member_value(&member, member_data)?)
2201 }
2202
2203 pub async fn write_udt_member_by_offset(
2227 &mut self,
2228 udt_name: &str,
2229 member_offset: usize,
2230 member_size: usize,
2231 data_type: u16,
2232 value: PlcValue,
2233 ) -> crate::error::Result<()> {
2234 self.validate_session().await?;
2235
2236 let mut udt_data = self.read_tag_raw(udt_name).await?;
2238
2239 if member_offset + member_size > udt_data.len() {
2241 return Err(crate::error::EtherNetIpError::Protocol(format!(
2242 "Member data incomplete: offset {} + size {} > UDT size {}",
2243 member_offset,
2244 member_size,
2245 udt_data.len()
2246 )));
2247 }
2248
2249 let member = crate::udt::UdtMember {
2251 name: "temp".to_string(),
2252 data_type,
2253 offset: member_offset as u32,
2254 size: member_size as u32,
2255 };
2256
2257 let udt = crate::udt::UserDefinedType::new(udt_name.to_string());
2258 let member_data = udt.serialize_member_value(&member, &value)?;
2259
2260 let end_offset = member_offset + member_data.len();
2262 if end_offset <= udt_data.len() {
2263 udt_data[member_offset..end_offset].copy_from_slice(&member_data);
2264 } else {
2265 return Err(crate::error::EtherNetIpError::Protocol(format!(
2266 "Member data exceeds UDT size: {} > {}",
2267 end_offset,
2268 udt_data.len()
2269 )));
2270 }
2271
2272 self.write_tag_raw(udt_name, &udt_data).await
2274 }
2275
2276 pub async fn get_udt_definition(
2279 &mut self,
2280 udt_name: &str,
2281 ) -> crate::error::Result<UdtDefinition> {
2282 if let Some(cached) = self.udt_manager.lock().await.get_definition(udt_name) {
2284 return Ok(cached.clone());
2285 }
2286
2287 let attributes = self.get_tag_attributes(udt_name).await?;
2289
2290 if attributes.data_type != 0x00A0 {
2292 return Err(crate::error::EtherNetIpError::Protocol(format!(
2293 "Tag '{}' is not a UDT (type: {})",
2294 udt_name, attributes.data_type_name
2295 )));
2296 }
2297
2298 let template_id = attributes.template_instance_id.ok_or_else(|| {
2300 crate::error::EtherNetIpError::Protocol(
2301 "UDT template instance ID not found".to_string(),
2302 )
2303 })?;
2304
2305 let (definition, _structure_size_bytes) = self
2306 .load_udt_definition_from_template(template_id, udt_name)
2307 .await?;
2308
2309 Ok(definition)
2310 }
2311
2312 async fn get_udt_definition_by_template_id(
2313 &mut self,
2314 template_id: u32,
2315 udt_name: &str,
2316 ) -> crate::error::Result<(UdtDefinition, u32)> {
2317 if let Some(cached) = self.udt_manager.lock().await.get_definition(udt_name) {
2318 return Ok((cached.clone(), 0));
2319 }
2320
2321 self.load_udt_definition_from_template(template_id, udt_name)
2322 .await
2323 }
2324
2325 async fn load_udt_definition_from_template(
2326 &mut self,
2327 template_id: u32,
2328 udt_name: &str,
2329 ) -> crate::error::Result<(UdtDefinition, u32)> {
2330 let (template_attributes, template_data) = self.read_udt_template(template_id).await?;
2331 let template = self.udt_manager.lock().await.parse_udt_template(
2332 template_id,
2333 template_attributes.member_count,
2334 template_attributes.structure_size_bytes,
2335 &template_data,
2336 )?;
2337
2338 let definition = UdtDefinition {
2339 name: udt_name.to_string(),
2340 members: template.members,
2341 };
2342
2343 self.udt_manager
2344 .lock()
2345 .await
2346 .add_definition(definition.clone());
2347
2348 Ok((definition, template_attributes.structure_size_bytes))
2349 }
2350
2351 pub async fn get_tag_attributes(
2368 &mut self,
2369 tag_name: &str,
2370 ) -> crate::error::Result<TagAttributes> {
2371 if let Some(cached) = self.udt_manager.lock().await.get_tag_attributes(tag_name) {
2373 return Ok(cached.clone());
2374 }
2375
2376 let request = self.build_get_attributes_request(tag_name)?;
2378
2379 let response = self.send_cip_request(&request).await?;
2381 let cip_data = self.extract_cip_from_response(&response)?;
2382
2383 let attributes = self.parse_attributes_response(tag_name, &cip_data)?;
2385
2386 self.udt_manager
2388 .lock()
2389 .await
2390 .add_tag_attributes(attributes.clone());
2391
2392 Ok(attributes)
2393 }
2394
2395 async fn read_udt_template(
2397 &mut self,
2398 template_id: u32,
2399 ) -> crate::error::Result<(TemplateAttributes, Vec<u8>)> {
2400 let template_attributes = self.get_template_attributes(template_id).await?;
2401 let read_size = template_attributes
2402 .definition_size_words
2403 .checked_mul(4)
2404 .and_then(|bytes| bytes.checked_sub(23))
2405 .ok_or_else(|| {
2406 crate::error::EtherNetIpError::Protocol(format!(
2407 "Template {} reported invalid definition size {} words",
2408 template_id, template_attributes.definition_size_words
2409 ))
2410 })?;
2411
2412 let mut template_data = Vec::with_capacity(read_size as usize);
2413 let mut offset = 0u32;
2414
2415 while offset < read_size {
2416 let chunk_size = (read_size - offset).min(200);
2417 let request = self.build_read_template_request(template_id, offset, chunk_size)?;
2418 let response = self.send_cip_request(&request).await?;
2419 let cip_data = self.extract_cip_from_response(&response)?;
2420 let (chunk, partial_transfer) = self.parse_template_response_chunk(&cip_data)?;
2421
2422 if chunk.is_empty() {
2423 return Err(crate::error::EtherNetIpError::Protocol(format!(
2424 "Template {} returned an empty chunk at offset {}",
2425 template_id, offset
2426 )));
2427 }
2428
2429 offset = offset.saturating_add(chunk.len() as u32);
2430 template_data.extend_from_slice(&chunk);
2431
2432 if !partial_transfer && chunk.len() < chunk_size as usize {
2433 break;
2434 }
2435 }
2436
2437 Ok((template_attributes, template_data))
2438 }
2439
2440 async fn get_template_attributes(
2441 &mut self,
2442 template_id: u32,
2443 ) -> crate::error::Result<TemplateAttributes> {
2444 let request = self.build_get_template_attributes_request(template_id)?;
2445 let response = self.send_cip_request(&request).await?;
2446 let cip_data = self.extract_cip_from_response(&response)?;
2447 self.parse_template_attributes_response(template_id, &cip_data)
2448 }
2449
2450 fn build_get_attributes_request(&self, tag_name: &str) -> crate::error::Result<Vec<u8>> {
2452 let mut request = Vec::new();
2453
2454 request.push(0x03);
2456
2457 let tag_bytes = tag_name.as_bytes();
2459 request.push(0x91); request.push(tag_bytes.len() as u8);
2461 request.extend_from_slice(tag_bytes);
2462
2463 request.extend_from_slice(&[0x02, 0x00]); request.extend_from_slice(&[0x01, 0x00]);
2468
2469 request.extend_from_slice(&[0x02, 0x00]);
2471
2472 Ok(request)
2473 }
2474
2475 fn build_get_template_attributes_request(
2476 &self,
2477 template_id: u32,
2478 ) -> crate::error::Result<Vec<u8>> {
2479 let mut request = Vec::new();
2480 let template_id = u16::try_from(template_id).map_err(|_| {
2481 crate::error::EtherNetIpError::Protocol(format!(
2482 "Template instance {} exceeds 16-bit path encoding",
2483 template_id
2484 ))
2485 })?;
2486
2487 request.push(0x03);
2488 request.push(0x03);
2489 request.extend_from_slice(&[0x20, 0x6C, 0x25, 0x00]);
2490 request.extend_from_slice(&template_id.to_le_bytes());
2491 request.extend_from_slice(&[0x04, 0x00]);
2492 request.extend_from_slice(&[0x01, 0x00]);
2493 request.extend_from_slice(&[0x02, 0x00]);
2494 request.extend_from_slice(&[0x04, 0x00]);
2495 request.extend_from_slice(&[0x05, 0x00]);
2496
2497 Ok(request)
2498 }
2499
2500 fn build_read_template_request(
2502 &self,
2503 template_id: u32,
2504 read_offset: u32,
2505 read_size: u32,
2506 ) -> crate::error::Result<Vec<u8>> {
2507 let mut request = Vec::new();
2508 let template_id = u16::try_from(template_id).map_err(|_| {
2509 crate::error::EtherNetIpError::Protocol(format!(
2510 "Template instance {} exceeds 16-bit path encoding",
2511 template_id
2512 ))
2513 })?;
2514 let read_size = u16::try_from(read_size).map_err(|_| {
2515 crate::error::EtherNetIpError::Protocol(format!(
2516 "Template read size {} exceeds 16-bit service limit",
2517 read_size
2518 ))
2519 })?;
2520
2521 request.push(0x4C);
2522 request.push(0x03);
2523 request.extend_from_slice(&[0x20, 0x6C, 0x25, 0x00]);
2524 request.extend_from_slice(&template_id.to_le_bytes());
2525 request.extend_from_slice(&read_offset.to_le_bytes());
2526 request.extend_from_slice(&read_size.to_le_bytes());
2527
2528 Ok(request)
2529 }
2530
2531 fn parse_attributes_response(
2533 &self,
2534 tag_name: &str,
2535 response: &[u8],
2536 ) -> crate::error::Result<TagAttributes> {
2537 if response.len() < 8 {
2538 return Err(crate::error::EtherNetIpError::Protocol(
2539 "Attributes response too short".to_string(),
2540 ));
2541 }
2542
2543 let mut offset = 0;
2544
2545 let data_type = u16::from_le_bytes([response[offset], response[offset + 1]]);
2547 offset += 2;
2548
2549 let size = u32::from_le_bytes([
2551 response[offset],
2552 response[offset + 1],
2553 response[offset + 2],
2554 response[offset + 3],
2555 ]);
2556 offset += 4;
2557
2558 let template_instance_id = if response.len() > offset + 4 {
2560 Some(u32::from_le_bytes([
2561 response[offset],
2562 response[offset + 1],
2563 response[offset + 2],
2564 response[offset + 3],
2565 ]))
2566 } else {
2567 None
2568 };
2569
2570 let attributes = TagAttributes {
2572 name: tag_name.to_string(),
2573 data_type,
2574 data_type_name: self.get_data_type_name(data_type),
2575 dimensions: Vec::new(), permissions: udt::TagPermissions::ReadWrite, scope: if tag_name.contains(':') {
2578 let parts: Vec<&str> = tag_name.split(':').collect();
2579 if parts.len() >= 2 {
2580 udt::TagScope::Program(parts[0].to_string())
2581 } else {
2582 udt::TagScope::Controller
2583 }
2584 } else {
2585 udt::TagScope::Controller
2586 },
2587 template_instance_id,
2588 size,
2589 };
2590
2591 Ok(attributes)
2592 }
2593
2594 fn parse_template_attributes_response(
2595 &self,
2596 template_id: u32,
2597 response: &[u8],
2598 ) -> crate::error::Result<TemplateAttributes> {
2599 if response.len() < 4 {
2600 return Err(crate::error::EtherNetIpError::Protocol(
2601 "Template attribute response too short".to_string(),
2602 ));
2603 }
2604
2605 let general_status = response[2];
2606 if general_status != 0x00 {
2607 return Err(crate::error::EtherNetIpError::Protocol(format!(
2608 "Template {} attribute read failed: {}",
2609 template_id,
2610 self.get_cip_error_message(general_status)
2611 )));
2612 }
2613
2614 let additional_status_words = response[3] as usize;
2615 let mut offset = 4 + additional_status_words * 2;
2616 if response.len() < offset + 2 {
2617 return Err(crate::error::EtherNetIpError::Protocol(
2618 "Template attribute response missing attribute count".to_string(),
2619 ));
2620 }
2621
2622 let attr_count = u16::from_le_bytes([response[offset], response[offset + 1]]) as usize;
2623 offset += 2;
2624
2625 let mut attributes = TemplateAttributes {
2626 structure_handle: 0,
2627 member_count: 0,
2628 definition_size_words: 0,
2629 structure_size_bytes: 0,
2630 };
2631
2632 for _ in 0..attr_count {
2633 if response.len() < offset + 4 {
2634 return Err(crate::error::EtherNetIpError::Protocol(
2635 "Template attribute response truncated".to_string(),
2636 ));
2637 }
2638
2639 let attr_id = u16::from_le_bytes([response[offset], response[offset + 1]]);
2640 let attr_status = u16::from_le_bytes([response[offset + 2], response[offset + 3]]);
2641 offset += 4;
2642
2643 if attr_status != 0 {
2644 return Err(crate::error::EtherNetIpError::Protocol(format!(
2645 "Template {} attribute {} read returned status 0x{:04X}",
2646 template_id, attr_id, attr_status
2647 )));
2648 }
2649
2650 match attr_id {
2651 1 => {
2652 if response.len() < offset + 2 {
2653 return Err(crate::error::EtherNetIpError::Protocol(
2654 "Template attribute 1 missing value".to_string(),
2655 ));
2656 }
2657 attributes.structure_handle =
2658 u16::from_le_bytes([response[offset], response[offset + 1]]);
2659 offset += 2;
2660 }
2661 2 => {
2662 if response.len() < offset + 2 {
2663 return Err(crate::error::EtherNetIpError::Protocol(
2664 "Template attribute 2 missing value".to_string(),
2665 ));
2666 }
2667 attributes.member_count =
2668 u16::from_le_bytes([response[offset], response[offset + 1]]);
2669 offset += 2;
2670 }
2671 4 => {
2672 if response.len() < offset + 4 {
2673 return Err(crate::error::EtherNetIpError::Protocol(
2674 "Template attribute 4 missing value".to_string(),
2675 ));
2676 }
2677 attributes.definition_size_words = u32::from_le_bytes([
2678 response[offset],
2679 response[offset + 1],
2680 response[offset + 2],
2681 response[offset + 3],
2682 ]);
2683 offset += 4;
2684 }
2685 5 => {
2686 if response.len() < offset + 4 {
2687 return Err(crate::error::EtherNetIpError::Protocol(
2688 "Template attribute 5 missing value".to_string(),
2689 ));
2690 }
2691 attributes.structure_size_bytes = u32::from_le_bytes([
2692 response[offset],
2693 response[offset + 1],
2694 response[offset + 2],
2695 response[offset + 3],
2696 ]);
2697 offset += 4;
2698 }
2699 _ => {
2700 return Err(crate::error::EtherNetIpError::Protocol(format!(
2701 "Unexpected template attribute {} in response",
2702 attr_id
2703 )));
2704 }
2705 }
2706 }
2707
2708 if attributes.definition_size_words == 0 {
2709 return Err(crate::error::EtherNetIpError::Protocol(format!(
2710 "Template {} reported zero definition size",
2711 template_id
2712 )));
2713 }
2714
2715 Ok(attributes)
2716 }
2717
2718 fn parse_template_response_chunk(
2719 &self,
2720 response: &[u8],
2721 ) -> crate::error::Result<(Vec<u8>, bool)> {
2722 if response.len() < 4 {
2723 return Err(crate::error::EtherNetIpError::Protocol(
2724 "Template response too short".to_string(),
2725 ));
2726 }
2727
2728 let general_status = response[2];
2729 let partial_transfer = general_status == 0x06;
2730 if general_status != 0x00 && !partial_transfer {
2731 return Err(crate::error::EtherNetIpError::Protocol(format!(
2732 "Template read failed: {}",
2733 self.get_cip_error_message(general_status)
2734 )));
2735 }
2736
2737 let additional_status_words = response[3] as usize;
2738 let data_start = 4 + additional_status_words * 2;
2739 if data_start > response.len() {
2740 return Err(crate::error::EtherNetIpError::Protocol(
2741 "Template response missing payload".to_string(),
2742 ));
2743 }
2744
2745 Ok((response[data_start..].to_vec(), partial_transfer))
2746 }
2747
2748 fn get_data_type_name(&self, data_type: u16) -> String {
2750 match data_type {
2751 0x00C1 => "BOOL".to_string(),
2752 0x00C2 => "SINT".to_string(),
2753 0x00C3 => "INT".to_string(),
2754 0x00C4 => "DINT".to_string(),
2755 0x00C5 => "LINT".to_string(),
2756 0x00C6 => "USINT".to_string(),
2757 0x00C7 => "UINT".to_string(),
2758 0x00C8 => "UDINT".to_string(),
2759 0x00C9 => "ULINT".to_string(),
2760 0x00CA => "REAL".to_string(),
2761 0x00CB => "LREAL".to_string(),
2762 0x00CE => "STRING".to_string(),
2763 0x00A0 => "UDT".to_string(),
2764 _ => format!("UNKNOWN(0x{:04X})", data_type),
2765 }
2766 }
2767
2768 fn build_tag_list_request_from_instance(
2770 &self,
2771 start_instance: u32,
2772 ) -> crate::error::Result<Vec<u8>> {
2773 let start_instance = u16::try_from(start_instance).map_err(|_| {
2774 crate::error::EtherNetIpError::Protocol(format!(
2775 "Tag discovery start instance {} exceeds 16-bit Symbol Object range",
2776 start_instance
2777 ))
2778 })?;
2779 let mut request = vec![
2780 0x55, 0x03, 0x20, 0x6B, 0x25, 0x00,
2784 ];
2785 request.extend_from_slice(&start_instance.to_le_bytes());
2786
2787 request.extend_from_slice(&[0x02, 0x00]);
2789
2790 request.extend_from_slice(&[0x01, 0x00]);
2792
2793 request.extend_from_slice(&[0x02, 0x00]);
2795
2796 Ok(request)
2797 }
2798
2799 fn build_program_tag_list_request(&self, _program_name: &str) -> crate::error::Result<Vec<u8>> {
2801 let mut request = vec![
2802 0x55, 0x03, 0x20, 0x6C, 0x25,
2806 ];
2807 request.extend_from_slice(&[0x00, 0x00, 0x00]);
2808
2809 request.extend_from_slice(&[0x02, 0x00]); request.extend_from_slice(&[0x01, 0x00]);
2814
2815 request.extend_from_slice(&[0x02, 0x00]);
2817
2818 Ok(request)
2819 }
2820
2821 fn parse_tag_list_response_page(&self, response: &[u8]) -> crate::error::Result<TagListPage> {
2823 if response.len() < 4 {
2824 return Err(crate::error::EtherNetIpError::Protocol(
2825 "Tag list response too short".to_string(),
2826 ));
2827 }
2828
2829 let general_status = response[2];
2830 let partial_transfer = general_status == 0x06;
2831 if general_status != 0x00 && !partial_transfer {
2832 return Err(crate::error::EtherNetIpError::Protocol(format!(
2833 "Tag discovery failed: {}. Some PLCs may not support tag discovery. Try reading tags directly by name.",
2834 self.get_cip_error_message(general_status)
2835 )));
2836 }
2837
2838 let additional_status_words = response[3] as usize;
2839 let mut offset = 4 + additional_status_words * 2;
2840 if response.len() == offset {
2841 return Ok(TagListPage {
2842 tags: Vec::new(),
2843 last_instance_id: None,
2844 partial_transfer: false,
2845 });
2846 }
2847 if response.len() < offset + 4 {
2848 return Err(crate::error::EtherNetIpError::Protocol(
2849 "Tag list response missing first entry".to_string(),
2850 ));
2851 }
2852 let mut tags = Vec::new();
2853 let mut last_instance_id = None;
2854
2855 while offset + 8 <= response.len() {
2856 let instance_id = u32::from_le_bytes([
2857 response[offset],
2858 response[offset + 1],
2859 response[offset + 2],
2860 response[offset + 3],
2861 ]);
2862 last_instance_id = Some(instance_id);
2863 offset += 4;
2864
2865 let name_length = u16::from_le_bytes([response[offset], response[offset + 1]]) as usize;
2866 offset += 2;
2867
2868 if offset
2869 .checked_add(name_length)
2870 .is_none_or(|end| end > response.len())
2871 {
2872 break;
2873 }
2874
2875 let name_bytes = &response[offset..offset + name_length];
2876 let tag_name = String::from_utf8_lossy(name_bytes).to_string();
2877 offset += name_length;
2878
2879 if offset + 2 > response.len() {
2880 break;
2881 }
2882
2883 let raw_tag_type = u16::from_le_bytes([response[offset], response[offset + 1]]);
2884 offset += 2;
2885
2886 if tag_name.starts_with("__") || tag_name.contains(':') {
2888 continue;
2889 }
2890
2891 let array_dims = ((raw_tag_type & 0x6000) >> 13) as usize;
2892 let is_structure = (raw_tag_type & 0x8000) != 0;
2893 let reserved = (raw_tag_type & 0x1000) != 0;
2894 let type_param = raw_tag_type & 0x0FFF;
2895 let is_user_atomic =
2896 !is_structure && !reserved && (0x0001..=0x00FF).contains(&type_param);
2897 let is_user_structure =
2898 is_structure && !reserved && (0x0100..=0x0EFF).contains(&type_param);
2899
2900 if !is_user_atomic && !is_user_structure {
2901 continue;
2902 }
2903
2904 let data_type = if is_structure {
2905 0x00A0
2906 } else if (raw_tag_type & 0x00FF) == 0x00C1 {
2907 0x00C1
2908 } else {
2909 type_param
2910 };
2911
2912 let template_instance_id = if is_structure && !reserved {
2913 Some(type_param as u32)
2914 } else {
2915 None
2916 };
2917
2918 tags.push(TagAttributes {
2919 name: tag_name,
2920 data_type,
2921 data_type_name: if is_structure {
2922 "UDT".to_string()
2923 } else {
2924 self.get_data_type_name(data_type)
2925 },
2926 dimensions: vec![0; array_dims],
2927 permissions: udt::TagPermissions::ReadWrite,
2928 scope: udt::TagScope::Controller,
2929 template_instance_id,
2930 size: 0,
2931 });
2932 }
2933
2934 Ok(TagListPage {
2935 tags,
2936 last_instance_id,
2937 partial_transfer,
2938 })
2939 }
2940
2941 fn parse_tag_list_response(&self, response: &[u8]) -> crate::error::Result<Vec<TagAttributes>> {
2943 Ok(self.parse_tag_list_response_page(response)?.tags)
2944 }
2945
2946 async fn negotiate_packet_size(&mut self) -> crate::error::Result<()> {
2950 let mut request = vec![
2953 0x03, 0x02, 0x20, 0x02, 0x24, 0x01, ];
2958 request.extend_from_slice(&[0x01, 0x00]); request.extend_from_slice(&[0x04, 0x00]);
2962
2963 let response = self.send_cip_request(&request).await?;
2965 let cip_data = self.extract_cip_from_response(&response)?;
2966
2967 if cip_data.len() >= 12 && cip_data[2] == 0x00 {
2972 let max_packet_size = u16::from_le_bytes([cip_data[10], cip_data[11]]) as u32;
2974
2975 self.max_packet_size
2977 .store(max_packet_size.clamp(504, 4000), Ordering::Relaxed);
2978 tracing::debug!("Negotiated packet size: {} bytes", self.max_packet_size());
2979 } else {
2980 self.max_packet_size.store(4000, Ordering::Relaxed);
2982 tracing::debug!(
2983 "Using default packet size: {} bytes",
2984 self.max_packet_size()
2985 );
2986 }
2987
2988 Ok(())
2989 }
2990
2991 pub async fn write_tag(&mut self, tag_name: &str, value: PlcValue) -> crate::error::Result<()> {
3035 tracing::debug!(
3036 "Writing '{}' to tag '{}'",
3037 match &value {
3038 PlcValue::String(s) => format!("\"{s}\""),
3039 _ => format!("{value:?}"),
3040 },
3041 tag_name
3042 );
3043
3044 let value = if let PlcValue::Udt(udt_data) = &value {
3047 if udt_data.symbol_id == 0 {
3048 tracing::debug!("[UDT WRITE] symbol_id is 0, reading tag to get symbol_id");
3049 let attributes = self.get_tag_attributes(tag_name).await?;
3051 let symbol_id = attributes.template_instance_id.ok_or_else(|| {
3052 crate::error::EtherNetIpError::Protocol(
3053 "UDT template instance ID not found. Cannot write UDT without symbol_id."
3054 .to_string(),
3055 )
3056 })? as i32;
3057
3058 PlcValue::Udt(UdtData {
3060 symbol_id,
3061 data: udt_data.data.clone(),
3062 })
3063 } else {
3064 value
3065 }
3066 } else {
3067 value
3068 };
3069
3070 if let Some((base_name, index)) = self.parse_array_element_access(tag_name) {
3072 tracing::debug!(
3073 "Detected array element write: {}[{}], using workaround",
3074 base_name,
3075 index
3076 );
3077 return self
3078 .write_array_element_workaround(&base_name, index, value)
3079 .await;
3080 }
3081
3082 if let PlcValue::Bool(_) = value
3083 && let Some((parent_path, index)) = self.parse_final_array_element_access(tag_name)
3084 && self.detect_bool_array_path(&parent_path).await?
3085 {
3086 return self
3087 .write_bool_array_element_workaround(&parent_path, index, value)
3088 .await;
3089 }
3090
3091 let cip_request = self.build_write_request(tag_name, &value)?;
3097
3098 let response = self.send_cip_request(&cip_request).await?;
3099
3100 let cip_response = self.extract_cip_from_response(&response)?;
3102
3103 if cip_response.len() < 3 {
3104 return Err(EtherNetIpError::Protocol(
3105 "Write response too short".to_string(),
3106 ));
3107 }
3108
3109 let service_reply = cip_response[0]; let general_status = cip_response[2]; tracing::trace!(
3113 "Write response - Service: 0x{:02X}, Status: 0x{:02X}",
3114 service_reply,
3115 general_status
3116 );
3117
3118 if let Err(e) = self.check_cip_error(&cip_response) {
3120 tracing::error!("[WRITE] CIP Error: {}", e);
3121 return Err(e);
3122 }
3123
3124 tracing::info!("Write operation completed successfully");
3125 Ok(())
3126 }
3127
3128 fn _build_ab_string_write_request(
3130 &self,
3131 tag_name: &str,
3132 value: &PlcValue,
3133 ) -> crate::error::Result<Vec<u8>> {
3134 if let PlcValue::String(string_value) = value {
3135 tracing::debug!(
3136 "Building correct Allen-Bradley string write request for tag: '{}'",
3137 tag_name
3138 );
3139
3140 let mut cip_request = Vec::new();
3141
3142 cip_request.push(0x4D);
3144
3145 let tag_bytes = tag_name.as_bytes();
3147 let path_len = if tag_bytes.len().is_multiple_of(2) {
3148 tag_bytes.len() + 2
3149 } else {
3150 tag_bytes.len() + 3
3151 } / 2;
3152 cip_request.push(path_len as u8);
3153
3154 cip_request.push(0x91); cip_request.push(tag_bytes.len() as u8);
3157 cip_request.extend_from_slice(tag_bytes);
3158
3159 if !tag_bytes.len().is_multiple_of(2) {
3161 cip_request.push(0x00);
3162 }
3163
3164 cip_request.extend_from_slice(&[0xA0, 0x02]);
3166
3167 cip_request.extend_from_slice(&[0x01, 0x00]);
3169
3170 let string_bytes = string_value.as_bytes();
3172 let max_len: u16 = 82; let current_len = string_bytes.len().min(max_len as usize) as u16;
3174
3175 cip_request.extend_from_slice(¤t_len.to_le_bytes());
3178
3179 cip_request.extend_from_slice(&max_len.to_le_bytes());
3181
3182 let mut data_array = vec![0u8; max_len as usize];
3184 data_array[..current_len as usize]
3185 .copy_from_slice(&string_bytes[..current_len as usize]);
3186 cip_request.extend_from_slice(&data_array);
3187
3188 tracing::trace!(
3189 "Built correct AB string write request ({} bytes): len={}, maxlen={}, data_len={}",
3190 cip_request.len(),
3191 current_len,
3192 max_len,
3193 string_bytes.len()
3194 );
3195 tracing::trace!(
3196 "First 32 bytes: {:02X?}",
3197 &cip_request[..std::cmp::min(32, cip_request.len())]
3198 );
3199
3200 Ok(cip_request)
3201 } else {
3202 Err(EtherNetIpError::Protocol(
3203 "Expected string value for Allen-Bradley string write".to_string(),
3204 ))
3205 }
3206 }
3207
3208 fn build_write_request(
3218 &self,
3219 tag_name: &str,
3220 value: &PlcValue,
3221 ) -> crate::error::Result<Vec<u8>> {
3222 tracing::debug!("Building write request for tag: '{}'", tag_name);
3223
3224 let path = self.build_tag_path(tag_name);
3226
3227 let mut data = BytesMut::new();
3228 data.extend_from_slice(&values::write_data_type(value).to_le_bytes());
3229 data.extend_from_slice(&[0x01, 0x00]); values::encode_payload(value, &mut data);
3231
3232 let request = CipRequest::new(WRITE_TAG, path, data.to_vec());
3233 let mut cip_request = BytesMut::new();
3234 request.encode(&mut cip_request)?;
3235
3236 tracing::trace!(
3237 "Built CIP write request ({} bytes): {:02X?}",
3238 cip_request.len(),
3239 cip_request
3240 );
3241 Ok(cip_request.to_vec())
3242 }
3243
3244 fn build_write_request_raw(
3246 &self,
3247 tag_name: &str,
3248 data: &[u8],
3249 ) -> crate::error::Result<Vec<u8>> {
3250 let path = self.build_tag_path(tag_name);
3251 let request = CipRequest::new(WRITE_TAG, path, data.to_vec());
3252 let mut cip_request = BytesMut::new();
3253 request.encode(&mut cip_request)?;
3254 Ok(cip_request.to_vec())
3255 }
3256
3257 #[allow(dead_code)]
3259 fn serialize_value(&self, value: &PlcValue) -> crate::error::Result<Vec<u8>> {
3260 let mut data = BytesMut::new();
3261 value.encode(&mut data);
3262 Ok(data.to_vec())
3263 }
3264
3265 pub fn build_list_tags_request(&self) -> Vec<u8> {
3266 tracing::debug!("Building list tags request");
3267
3268 let path_array = vec![
3270 0x20, 0x6B, 0x25, 0x00, 0x00, 0x00,
3276 ];
3277
3278 let request_data = vec![0x02, 0x00, 0x01, 0x00, 0x02, 0x00];
3280
3281 let request = CipRequest::new(0x55, path_array, request_data);
3283 let mut cip_request = BytesMut::new();
3284 request
3285 .encode(&mut cip_request)
3286 .expect("list-tags request path is static and valid");
3287
3288 tracing::trace!(
3289 "Built CIP list tags request ({} bytes): {:02X?}",
3290 cip_request.len(),
3291 cip_request
3292 );
3293
3294 cip_request.to_vec()
3295 }
3296
3297 fn parse_extended_error(&self, cip_data: &[u8]) -> crate::error::Result<String> {
3311 if cip_data.len() < 6 {
3312 return Err(EtherNetIpError::Protocol(
3313 "Extended error response too short".to_string(),
3314 ));
3315 }
3316
3317 let additional_status_size = cip_data[3] as usize; if additional_status_size == 0 || cip_data.len() < 4 + (additional_status_size * 2) {
3319 return Ok("Extended error (no additional status)".to_string());
3320 }
3321
3322 let extended_error_code_le = u16::from_le_bytes([cip_data[4], cip_data[5]]);
3325 let extended_error_code_be = u16::from_be_bytes([cip_data[4], cip_data[5]]);
3326
3327 let error_msg = match extended_error_code_le {
3330 0x0001 => "Connection failure (extended)".to_string(),
3331 0x0002 => "Resource unavailable (extended)".to_string(),
3332 0x0003 => "Invalid parameter value (extended)".to_string(),
3333 0x0004 => "Path segment error (extended)".to_string(),
3334 0x0005 => "Path destination unknown (extended)".to_string(),
3335 0x0006 => "Partial transfer (extended)".to_string(),
3336 0x0007 => "Connection lost (extended)".to_string(),
3337 0x0008 => "Service not supported (extended)".to_string(),
3338 0x0009 => "Invalid attribute value (extended)".to_string(),
3339 0x000A => "Attribute list error (extended)".to_string(),
3340 0x000B => "Already in requested mode/state (extended)".to_string(),
3341 0x000C => "Object state conflict (extended)".to_string(),
3342 0x000D => "Object already exists (extended)".to_string(),
3343 0x000E => "Attribute not settable (extended)".to_string(),
3344 0x000F => "Privilege violation (extended)".to_string(),
3345 0x0010 => "Device state conflict (extended)".to_string(),
3346 0x0011 => "Reply data too large (extended)".to_string(),
3347 0x0012 => "Fragmentation of a primitive value (extended)".to_string(),
3348 0x0013 => "Not enough data (extended)".to_string(),
3349 0x0014 => "Attribute not supported (extended)".to_string(),
3350 0x0015 => "Too much data (extended)".to_string(),
3351 0x0016 => "Object does not exist (extended)".to_string(),
3352 0x0017 => "Service fragmentation sequence not in progress (extended)".to_string(),
3353 0x0018 => "No stored attribute data (extended)".to_string(),
3354 0x0019 => "Store operation failure (extended)".to_string(),
3355 0x001A => "Routing failure, request packet too large (extended)".to_string(),
3356 0x001B => "Routing failure, response packet too large (extended)".to_string(),
3357 0x001C => "Missing attribute list entry data (extended)".to_string(),
3358 0x001D => "Invalid attribute value list (extended)".to_string(),
3359 0x001E => "Embedded service error (extended)".to_string(),
3360 0x001F => "Vendor specific error (extended)".to_string(),
3361 0x0020 => "Invalid parameter (extended)".to_string(),
3362 0x0021 => "Write-once value or medium already written (extended)".to_string(),
3363 0x0022 => "Invalid reply received (extended)".to_string(),
3364 0x0023 => "Buffer overflow (extended)".to_string(),
3365 0x0024 => "Invalid message format (extended)".to_string(),
3366 0x0025 => "Key failure in path (extended)".to_string(),
3367 0x0026 => "Path size invalid (extended)".to_string(),
3368 0x0027 => "Unexpected attribute in list (extended)".to_string(),
3369 0x0028 => "Invalid member ID (extended)".to_string(),
3370 0x0029 => "Member not settable (extended)".to_string(),
3371 0x002A => "Group 2 only server general failure (extended)".to_string(),
3372 0x002B => "Unknown Modbus error (extended)".to_string(),
3373 0x002C => "Attribute not gettable (extended)".to_string(),
3374 _ => {
3376 match extended_error_code_be {
3378 0x0001 => "Connection failure (extended, BE)".to_string(),
3379 0x0002 => "Resource unavailable (extended, BE)".to_string(),
3380 0x0003 => "Invalid parameter value (extended, BE)".to_string(),
3381 0x0004 => "Path segment error (extended, BE)".to_string(),
3382 0x0005 => "Path destination unknown (extended, BE)".to_string(),
3383 0x0006 => "Partial transfer (extended, BE)".to_string(),
3384 0x0007 => "Connection lost (extended, BE)".to_string(),
3385 0x0008 => "Service not supported (extended, BE)".to_string(),
3386 0x0009 => "Invalid attribute value (extended, BE)".to_string(),
3387 0x000A => "Attribute list error (extended, BE)".to_string(),
3388 0x000B => "Already in requested mode/state (extended, BE)".to_string(),
3389 0x000C => "Object state conflict (extended, BE)".to_string(),
3390 0x000D => "Object already exists (extended, BE)".to_string(),
3391 0x000E => "Attribute not settable (extended, BE)".to_string(),
3392 0x000F => "Privilege violation (extended, BE)".to_string(),
3393 0x0010 => "Device state conflict (extended, BE)".to_string(),
3394 0x0011 => "Reply data too large (extended, BE)".to_string(),
3395 0x0012 => "Fragmentation of a primitive value (extended, BE)".to_string(),
3396 0x0013 => "Not enough data (extended, BE)".to_string(),
3397 0x0014 => "Attribute not supported (extended, BE)".to_string(),
3398 0x0015 => "Too much data (extended, BE)".to_string(),
3399 0x0016 => "Object does not exist (extended, BE)".to_string(),
3400 0x0017 => {
3401 "Service fragmentation sequence not in progress (extended, BE)".to_string()
3402 }
3403 0x0018 => "No stored attribute data (extended, BE)".to_string(),
3404 0x0019 => "Store operation failure (extended, BE)".to_string(),
3405 0x001A => {
3406 "Routing failure, request packet too large (extended, BE)".to_string()
3407 }
3408 0x001B => {
3409 "Routing failure, response packet too large (extended, BE)".to_string()
3410 }
3411 0x001C => "Missing attribute list entry data (extended, BE)".to_string(),
3412 0x001D => "Invalid attribute value list (extended, BE)".to_string(),
3413 0x001E => "Embedded service error (extended, BE)".to_string(),
3414 0x001F => "Vendor specific error (extended, BE)".to_string(),
3415 0x0020 => "Invalid parameter (extended, BE)".to_string(),
3416 0x0021 => {
3417 "Write-once value or medium already written (extended, BE)".to_string()
3418 }
3419 0x0022 => "Invalid reply received (extended, BE)".to_string(),
3420 0x0023 => "Buffer overflow (extended, BE)".to_string(),
3421 0x0024 => "Invalid message format (extended, BE)".to_string(),
3422 0x0025 => "Key failure in path (extended, BE)".to_string(),
3423 0x0026 => "Path size invalid (extended, BE)".to_string(),
3424 0x0027 => "Unexpected attribute in list (extended, BE)".to_string(),
3425 0x0028 => "Invalid member ID (extended, BE)".to_string(),
3426 0x0029 => "Member not settable (extended, BE)".to_string(),
3427 0x002A => "Group 2 only server general failure (extended, BE)".to_string(),
3428 0x002B => "Unknown Modbus error (extended, BE)".to_string(),
3429 0x002C => "Attribute not gettable (extended, BE)".to_string(),
3430 _ if extended_error_code_le == 0x2107 || extended_error_code_be == 0x2107 => {
3432 format!(
3436 "Vendor-specific or composite extended error: 0x{extended_error_code_le:04X} (LE) / 0x{extended_error_code_be:04X} (BE). Raw bytes: [0x{:02X}, 0x{:02X}]. This may indicate the PLC does not support writing to UDT array element members directly.",
3437 cip_data[4], cip_data[5]
3438 )
3439 }
3440 _ => format!(
3441 "Unknown extended CIP error code: 0x{extended_error_code_le:04X} (LE) / 0x{extended_error_code_be:04X} (BE). Raw bytes: [0x{:02X}, 0x{:02X}]",
3442 cip_data[4], cip_data[5]
3443 ),
3444 }
3445 }
3446 };
3447
3448 Ok(error_msg)
3449 }
3450
3451 fn check_cip_error(&self, cip_data: &[u8]) -> crate::error::Result<()> {
3454 if cip_data.len() < 3 {
3455 return Err(EtherNetIpError::Protocol(
3456 "CIP response too short for status check".to_string(),
3457 ));
3458 }
3459
3460 let general_status = cip_data[2];
3461
3462 if general_status == 0x00 {
3463 return Ok(());
3465 }
3466
3467 if general_status == 0xFF {
3469 let error_msg = self.parse_extended_error(cip_data)?;
3470 return Err(EtherNetIpError::Protocol(format!(
3471 "CIP Extended Error: {error_msg}"
3472 )));
3473 }
3474
3475 let error_msg = self.get_cip_error_message(general_status);
3477 Err(EtherNetIpError::Protocol(format!(
3478 "CIP Error 0x{general_status:02X}: {error_msg}"
3479 )))
3480 }
3481
3482 fn get_cip_error_message(&self, status: u8) -> String {
3483 match status {
3484 0x00 => "Success".to_string(),
3485 0x01 => "Connection failure".to_string(),
3486 0x02 => "Resource unavailable".to_string(),
3487 0x03 => "Invalid parameter value".to_string(),
3488 0x04 => "Path segment error".to_string(),
3489 0x05 => "Path destination unknown".to_string(),
3490 0x06 => "Partial transfer".to_string(),
3491 0x07 => "Connection lost".to_string(),
3492 0x08 => "Service not supported".to_string(),
3493 0x09 => "Invalid attribute value".to_string(),
3494 0x0A => "Attribute list error".to_string(),
3495 0x0B => "Already in requested mode/state".to_string(),
3496 0x0C => "Object state conflict".to_string(),
3497 0x0D => "Object already exists".to_string(),
3498 0x0E => "Attribute not settable".to_string(),
3499 0x0F => "Privilege violation".to_string(),
3500 0x10 => "Device state conflict".to_string(),
3501 0x11 => "Reply data too large".to_string(),
3502 0x12 => "Fragmentation of a primitive value".to_string(),
3503 0x13 => "Not enough data".to_string(),
3504 0x14 => "Attribute not supported".to_string(),
3505 0x15 => "Too much data".to_string(),
3506 0x16 => "Object does not exist".to_string(),
3507 0x17 => "Service fragmentation sequence not in progress".to_string(),
3508 0x18 => "No stored attribute data".to_string(),
3509 0x19 => "Store operation failure".to_string(),
3510 0x1A => "Routing failure, request packet too large".to_string(),
3511 0x1B => "Routing failure, response packet too large".to_string(),
3512 0x1C => "Missing attribute list entry data".to_string(),
3513 0x1D => "Invalid attribute value list".to_string(),
3514 0x1E => "Embedded service error".to_string(),
3515 0x1F => "Vendor specific error".to_string(),
3516 0x20 => "Invalid parameter".to_string(),
3517 0x21 => "Write-once value or medium already written".to_string(),
3518 0x22 => "Invalid reply received".to_string(),
3519 0x23 => "Buffer overflow".to_string(),
3520 0x24 => "Invalid message format".to_string(),
3521 0x25 => "Key failure in path".to_string(),
3522 0x26 => "Path size invalid".to_string(),
3523 0x27 => "Unexpected attribute in list".to_string(),
3524 0x28 => "Invalid member ID".to_string(),
3525 0x29 => "Member not settable".to_string(),
3526 0x2A => "Group 2 only server general failure".to_string(),
3527 0x2B => "Unknown Modbus error".to_string(),
3528 0x2C => "Attribute not gettable".to_string(),
3529 _ => format!("Unknown CIP error code: 0x{status:02X}"),
3530 }
3531 }
3532
3533 fn describe_multiple_service_error(
3534 &self,
3535 general_status: u8,
3536 operations: &[BatchOperation],
3537 ) -> String {
3538 if general_status == 0x1E
3539 && operations.iter().any(|op| {
3540 matches!(
3541 op,
3542 BatchOperation::Write {
3543 value: PlcValue::String(_),
3544 ..
3545 }
3546 )
3547 })
3548 {
3549 return "Multiple Service Response error: 0x1E (Embedded service error). On CompactLogix/ControlLogix this commonly indicates the controller rejected a direct STRING write in the batch request; treat it as a PLC firmware limitation, not a protocol bug.".to_string();
3550 }
3551
3552 format!("Multiple Service Response error: 0x{general_status:02X}")
3553 }
3554
3555 async fn validate_session(&mut self) -> crate::error::Result<()> {
3556 let time_since_activity = self.last_activity.lock().await.elapsed();
3557
3558 if time_since_activity > Duration::from_secs(30) {
3560 self.send_keep_alive().await?;
3561 }
3562
3563 Ok(())
3564 }
3565
3566 async fn send_keep_alive(&mut self) -> crate::error::Result<()> {
3567 let packet = vec![0u8; 24];
3571 let mut stream = self.stream.lock().await;
3576 stream.write_all(&packet).await?;
3577 *self.last_activity.lock().await = Instant::now();
3578 Ok(())
3579 }
3580
3581 async fn read_tag_raw(&mut self, tag_name: &str) -> crate::error::Result<Vec<u8>> {
3583 let response = self
3584 .send_cip_request(&self.build_read_request(tag_name)?)
3585 .await?;
3586 self.extract_cip_from_response(&response)
3587 }
3588
3589 #[allow(dead_code)]
3591 async fn write_tag_raw(&mut self, tag_name: &str, data: &[u8]) -> crate::error::Result<()> {
3592 let request = self.build_write_request_raw(tag_name, data)?;
3593 let response = self.send_cip_request(&request).await?;
3594
3595 let cip_response = self.extract_cip_from_response(&response)?;
3597
3598 if cip_response.len() < 3 {
3599 return Err(EtherNetIpError::Protocol(
3600 "Write response too short".to_string(),
3601 ));
3602 }
3603
3604 let service_reply = cip_response[0]; let general_status = cip_response[2]; tracing::trace!(
3608 "Write response - Service: 0x{:02X}, Status: 0x{:02X}",
3609 service_reply,
3610 general_status
3611 );
3612
3613 if let Err(e) = self.check_cip_error(&cip_response) {
3615 tracing::error!("[WRITE] CIP Error: {}", e);
3616 return Err(e);
3617 }
3618
3619 tracing::info!("Write completed successfully");
3620 Ok(())
3621 }
3622
3623 fn build_unconnected_send(&self, embedded_message: &[u8]) -> Vec<u8> {
3640 let mut ucmm = vec![
3641 0x52, 0x02,
3644 0x20, 0x06, 0x24, 0x01, 0x0A, 0xF0,
3652 ];
3653
3654 let msg_len = embedded_message.len() as u16;
3656 ucmm.extend_from_slice(&msg_len.to_le_bytes());
3657
3658 ucmm.extend_from_slice(embedded_message);
3660
3661 if embedded_message.len() % 2 == 1 {
3663 ucmm.push(0x00);
3664 }
3665
3666 let route_path_bytes = if let Some(route_path) = self.route_path_snapshot() {
3669 route_path.to_cip_bytes()
3670 } else {
3671 Vec::new()
3672 };
3673
3674 let route_path_words = if route_path_bytes.is_empty() {
3675 0
3676 } else {
3677 (route_path_bytes.len() / 2) as u8
3678 };
3679 ucmm.push(route_path_words);
3680
3681 ucmm.push(0x00);
3683
3684 if !route_path_bytes.is_empty() {
3686 tracing::trace!(
3687 "Adding route path to Unconnected Send: {:02X?} ({} bytes, {} words)",
3688 route_path_bytes,
3689 route_path_bytes.len(),
3690 route_path_words
3691 );
3692 ucmm.extend_from_slice(&route_path_bytes);
3693 }
3694
3695 ucmm
3696 }
3697
3698 pub async fn send_cip_request(&self, cip_request: &[u8]) -> Result<Vec<u8>> {
3705 tracing::trace!(
3706 "Sending CIP request ({} bytes): {:02X?}",
3707 cip_request.len(),
3708 cip_request
3709 );
3710
3711 let ucmm_message = self.build_unconnected_send(cip_request);
3714
3715 tracing::trace!(
3716 "Unconnected Send message ({} bytes): {:02X?}",
3717 ucmm_message.len(),
3718 &ucmm_message[..std::cmp::min(64, ucmm_message.len())]
3719 );
3720
3721 let response_data = self.send_rr_data_item(&ucmm_message).await?;
3722
3723 if let Ok(raw_cip_data) = self.extract_unconnected_data_item(&response_data) {
3724 let use_direct_fallback = raw_cip_data.len() >= 3
3725 && raw_cip_data[0] == 0xD2
3726 && raw_cip_data[2] != 0x00
3727 && self.route_path_snapshot().is_none();
3728
3729 if use_direct_fallback {
3730 tracing::warn!(
3731 "Unconnected Send returned 0xD2 status 0x{:02X}; retrying with direct CIP SendRRData fallback",
3732 raw_cip_data[2]
3733 );
3734 return self.send_rr_data_item(cip_request).await;
3735 }
3736 }
3737
3738 Ok(response_data)
3739 }
3740
3741 async fn send_rr_data_item(&self, item_data: &[u8]) -> Result<Vec<u8>> {
3742 let send_data = SendDataRequest::unconnected(item_data);
3743 let mut packet = BytesMut::new();
3744 let mut cpf = BytesMut::new();
3745 send_data.encode(&mut cpf);
3746 EncapsulationHeader::send_rr_data(cpf.len() as u16, self.session_handle)
3747 .encode(&mut packet);
3748 packet.extend_from_slice(&cpf);
3749
3750 tracing::trace!(
3751 "Built packet ({} bytes): {:02X?}",
3752 packet.len(),
3753 &packet[..std::cmp::min(64, packet.len())]
3754 );
3755
3756 let mut stream = self.stream.lock().await;
3758 stream
3759 .write_all(&packet)
3760 .await
3761 .map_err(EtherNetIpError::Io)?;
3762
3763 let mut header = [0u8; 24];
3765 match timeout(Duration::from_secs(10), stream.read_exact(&mut header)).await {
3766 Ok(Ok(_)) => {}
3767 Ok(Err(e)) => return Err(EtherNetIpError::Io(e)),
3768 Err(_) => return Err(EtherNetIpError::Timeout(Duration::from_secs(10))),
3769 }
3770
3771 let mut header_bytes = &header[..];
3773 let response_header = EncapsulationHeader::decode(&mut header_bytes)?;
3774 if response_header.status != 0 {
3775 return Err(EtherNetIpError::Protocol(format!(
3776 "EIP Command failed. Status: 0x{:08X}",
3777 response_header.status
3778 )));
3779 }
3780
3781 let response_length = response_header.length as usize;
3783 if response_length == 0 {
3784 return Ok(Vec::new());
3785 }
3786
3787 let mut response_data = vec![0u8; response_length];
3789 match timeout(
3790 Duration::from_secs(10),
3791 stream.read_exact(&mut response_data),
3792 )
3793 .await
3794 {
3795 Ok(Ok(_)) => {}
3796 Ok(Err(e)) => return Err(EtherNetIpError::Io(e)),
3797 Err(_) => return Err(EtherNetIpError::Timeout(Duration::from_secs(10))),
3798 }
3799
3800 *self.last_activity.lock().await = Instant::now();
3802
3803 tracing::trace!(
3804 "Received response ({} bytes): {:02X?}",
3805 response_data.len(),
3806 &response_data[..std::cmp::min(32, response_data.len())]
3807 );
3808
3809 Ok(response_data)
3810 }
3811
3812 fn extract_unconnected_data_item(&self, response: &[u8]) -> crate::error::Result<Vec<u8>> {
3813 let mut response = response;
3814 let send_data = SendDataRequest::decode(&mut response)?;
3815 if let Some(item) = send_data
3816 .items
3817 .into_iter()
3818 .find(|item| item.type_id == 0x00B2)
3819 {
3820 return Ok(item.data);
3821 }
3822
3823 Err(EtherNetIpError::Protocol(
3824 "No Unconnected Data Item (0x00B2) found in response".to_string(),
3825 ))
3826 }
3827
3828 fn unwrap_unconnected_send_reply(&self, cip_data: &[u8]) -> crate::error::Result<Vec<u8>> {
3829 if cip_data.is_empty() || cip_data[0] != 0xD2 {
3830 return Ok(cip_data.to_vec());
3831 }
3832
3833 if cip_data.len() < 4 {
3834 return Err(EtherNetIpError::Protocol(
3835 "Unconnected Send reply too short".to_string(),
3836 ));
3837 }
3838
3839 let general_status = cip_data[2];
3840 let additional_status_words = cip_data[3] as usize;
3841 let embedded_offset = 4 + (additional_status_words * 2);
3842
3843 if general_status != 0x00 {
3844 let error_msg = self.get_cip_error_message(general_status);
3845 return Err(EtherNetIpError::Protocol(format!(
3846 "Unconnected Send failed (0xD2): CIP Error 0x{general_status:02X}: {error_msg}"
3847 )));
3848 }
3849
3850 if embedded_offset >= cip_data.len() {
3851 return Err(EtherNetIpError::Protocol(
3852 "Unconnected Send succeeded but no embedded response payload was returned"
3853 .to_string(),
3854 ));
3855 }
3856
3857 Ok(cip_data[embedded_offset..].to_vec())
3858 }
3859
3860 fn extract_cip_from_response(&self, response: &[u8]) -> crate::error::Result<Vec<u8>> {
3862 tracing::trace!(
3863 "Extracting CIP from response ({} bytes): {:02X?}",
3864 response.len(),
3865 &response[..std::cmp::min(32, response.len())]
3866 );
3867 let cip_data = self.extract_unconnected_data_item(response)?;
3868 tracing::trace!(
3869 "Found Unconnected Data Item, extracted CIP data ({} bytes)",
3870 cip_data.len()
3871 );
3872 tracing::trace!(
3873 "CIP data bytes: {:02X?}",
3874 &cip_data[..std::cmp::min(16, cip_data.len())]
3875 );
3876 self.unwrap_unconnected_send_reply(&cip_data)
3877 }
3878
3879 fn parse_cip_response(&self, cip_response: &[u8]) -> crate::error::Result<PlcValue> {
3881 tracing::trace!(
3882 "Parsing CIP response ({} bytes): {:02X?}",
3883 cip_response.len(),
3884 cip_response
3885 );
3886
3887 if let Err(e) = self.check_cip_error(cip_response) {
3888 tracing::error!("CIP Error: {}", e);
3889 return Err(e);
3890 }
3891
3892 let mut response_bytes = cip_response;
3893 let response = CipResponse::decode(&mut response_bytes)?;
3894
3895 if response.service == 0xCC {
3896 if response.data.len() < 2 {
3897 return Err(EtherNetIpError::Protocol(
3898 "Read response too short for data".to_string(),
3899 ));
3900 }
3901
3902 let data_type = u16::from_le_bytes([response.data[0], response.data[1]]);
3903 let value_data = &response.data[2..];
3904 tracing::trace!(
3905 "Data type: 0x{:04X}, Value data ({} bytes): {:02X?}",
3906 data_type,
3907 value_data.len(),
3908 value_data
3909 );
3910 Ok(values::decode_payload(data_type, value_data)?)
3911 } else if response.service == 0xCD {
3912 tracing::debug!("Write operation successful");
3913 Ok(PlcValue::Bool(true))
3914 } else {
3915 Err(EtherNetIpError::Protocol(format!(
3916 "Unknown service reply: 0x{:02X}",
3917 response.service
3918 )))
3919 }
3920 }
3921
3922 pub async fn unregister_session(&mut self) -> crate::error::Result<()> {
3924 tracing::info!("Unregistering session and cleaning up connections...");
3925
3926 let _ = self.close_all_connected_sessions().await;
3928
3929 let mut packet = BytesMut::with_capacity(24);
3930 EncapsulationHeader::new(UNREGISTER_SESSION, 0, self.session_handle).encode(&mut packet);
3931
3932 self.stream
3933 .lock()
3934 .await
3935 .write_all(&packet)
3936 .await
3937 .map_err(EtherNetIpError::Io)?;
3938
3939 tracing::info!("Session unregistered and all connections closed");
3940 Ok(())
3941 }
3942
3943 fn build_read_request(&self, tag_name: &str) -> crate::error::Result<Vec<u8>> {
3945 self.build_read_request_with_count(tag_name, 1)
3946 }
3947
3948 fn build_read_request_with_count(
3952 &self,
3953 tag_name: &str,
3954 element_count: u16,
3955 ) -> crate::error::Result<Vec<u8>> {
3956 tracing::debug!(
3957 "Building read request for tag: '{}' with count: {}",
3958 tag_name,
3959 element_count
3960 );
3961
3962 let path = self.build_tag_path(tag_name);
3964
3965 let path_size_words = (path.len() / 2) as u8;
3967 tracing::debug!(
3968 "Path size calculation: {} bytes / 2 = {} words for tag '{}'",
3969 path.len(),
3970 path_size_words,
3971 tag_name
3972 );
3973 tracing::debug!(
3974 "Path bytes ({} bytes, {} words) for tag '{}': {:02X?}",
3975 path.len(),
3976 path_size_words,
3977 tag_name,
3978 path
3979 );
3980 let request = CipRequest::new(READ_TAG, path, element_count.to_le_bytes().to_vec());
3981 let mut cip_request = BytesMut::new();
3982 request.encode(&mut cip_request)?;
3983
3984 tracing::debug!(
3985 "Built CIP read request ({} bytes) for tag '{}': {:02X?}",
3986 cip_request.len(),
3987 tag_name,
3988 cip_request
3989 );
3990 Ok(cip_request.to_vec())
3991 }
3992
3993 #[cfg_attr(not(test), allow(dead_code))]
4002 pub fn build_element_id_segment(&self, index: u32) -> Vec<u8> {
4003 let mut segment = Vec::new();
4004
4005 if index <= 255 {
4006 segment.push(0x28);
4009 segment.push(index as u8);
4010 } else if index <= 65535 {
4011 segment.push(0x29);
4014 segment.push(0x00); segment.extend_from_slice(&(index as u16).to_le_bytes());
4016 } else {
4017 segment.push(0x2A);
4020 segment.push(0x00); segment.extend_from_slice(&index.to_le_bytes());
4022 }
4023
4024 segment
4025 }
4026
4027 #[cfg_attr(not(test), allow(dead_code))]
4032 pub fn build_base_tag_path(&self, tag_name: &str) -> Vec<u8> {
4033 match TagPath::parse(tag_name) {
4035 Ok(path) => {
4036 let base_path = match &path {
4038 TagPath::Array { base_path, .. } => base_path.as_ref(),
4039 _ => &path,
4040 };
4041 base_path.to_cip_path().unwrap_or_else(|_| {
4042 let mut path = Vec::new();
4045 path.push(0x91); let name_bytes = tag_name.as_bytes();
4047 path.push(name_bytes.len() as u8);
4048 path.extend_from_slice(name_bytes);
4049 if path.len() % 2 != 0 {
4051 path.push(0x00);
4052 }
4053 path
4054 })
4055 }
4056 Err(_) => {
4057 let mut path = Vec::new();
4059 path.push(0x91); let name_bytes = tag_name.as_bytes();
4061 path.push(name_bytes.len() as u8);
4062 path.extend_from_slice(name_bytes);
4063 if path.len() % 2 != 0 {
4065 path.push(0x00);
4066 }
4067 path
4068 }
4069 }
4070 }
4071
4072 #[cfg_attr(not(test), allow(dead_code))]
4100 pub fn build_read_array_request(
4101 &self,
4102 base_array_name: &str,
4103 start_index: u32,
4104 element_count: u16,
4105 ) -> Vec<u8> {
4106 let mut cip_request = Vec::new();
4107
4108 cip_request.push(0x4C);
4111
4112 let mut full_path = self.build_base_tag_path(base_array_name);
4117
4118 tracing::trace!(
4119 "build_read_array_request: base_path for '{}' = {:02X?} ({} bytes)",
4120 base_array_name,
4121 full_path,
4122 full_path.len()
4123 );
4124
4125 let element_segment = self.build_element_id_segment(start_index);
4128 tracing::trace!(
4129 "build_read_array_request: element_segment for index {} = {:02X?} ({} bytes)",
4130 start_index,
4131 element_segment,
4132 element_segment.len()
4133 );
4134 full_path.extend_from_slice(&element_segment);
4135
4136 if !full_path.len().is_multiple_of(2) {
4138 full_path.push(0x00);
4139 }
4140
4141 let path_size = (full_path.len() / 2) as u8;
4143 cip_request.push(path_size);
4144 cip_request.extend_from_slice(&full_path);
4145
4146 cip_request.extend_from_slice(&element_count.to_le_bytes());
4149
4150 tracing::trace!(
4151 "build_read_array_request: final request = {:02X?} ({} bytes), path_size = {} words ({} bytes)",
4152 cip_request,
4153 cip_request.len(),
4154 path_size,
4155 full_path.len()
4156 );
4157
4158 cip_request
4159 }
4160
4161 fn build_tag_path(&self, tag_name: &str) -> Vec<u8> {
4167 match TagPath::parse(tag_name) {
4171 Ok(tag_path) => {
4172 tracing::debug!("Parsed tag path for '{}': {:?}", tag_name, tag_path);
4173 match tag_path.to_cip_path() {
4175 Ok(path) => {
4176 tracing::debug!(
4177 "TagPath generated {} bytes ({} words) for '{}': {:02X?}",
4178 path.len(),
4179 path.len() / 2,
4180 tag_name,
4181 path
4182 );
4183 path
4184 }
4185 Err(e) => {
4186 tracing::warn!("TagPath.to_cip_path() failed for '{}': {}", tag_name, e);
4187 self.build_simple_tag_path_legacy(tag_name)
4189 }
4190 }
4191 }
4192 Err(e) => {
4193 tracing::warn!("TagPath::parse() failed for '{}': {}", tag_name, e);
4194 self.build_simple_tag_path_legacy(tag_name)
4196 }
4197 }
4198 }
4199
4200 fn build_simple_tag_path_legacy(&self, tag_name: &str) -> Vec<u8> {
4202 let mut path = Vec::new();
4203 path.push(0x91); path.push(tag_name.len() as u8);
4205 path.extend_from_slice(tag_name.as_bytes());
4206
4207 if !tag_name.len().is_multiple_of(2) {
4209 path.push(0x00);
4210 }
4211
4212 path
4213 }
4214
4215 async fn _get_connected_session(
4216 &mut self,
4217 session_name: &str,
4218 ) -> crate::error::Result<ConnectedSession> {
4219 {
4221 let sessions = self.connected_sessions.lock().await;
4222 if let Some(session) = sessions.get(session_name) {
4223 return Ok(session.clone());
4224 }
4225 }
4226
4227 let session = self.establish_connected_session(session_name).await?;
4229
4230 let mut sessions = self.connected_sessions.lock().await;
4232 sessions.insert(session_name.to_string(), session.clone());
4233
4234 Ok(session)
4235 }
4236
4237 #[allow(dead_code)]
4239 fn parse_udt_structure(&self, data: &[u8]) -> crate::error::Result<PlcValue> {
4240 tracing::debug!("Parsing UDT structure with {} bytes", data.len());
4241
4242 if data.len() >= 12 {
4244 let _offset = 0;
4245
4246 for alignment in 0..4 {
4248 if alignment + 12 <= data.len() {
4249 let aligned_data = &data[alignment..];
4250
4251 if aligned_data.len() >= 4 {
4253 let dint1_bytes = [
4254 aligned_data[0],
4255 aligned_data[1],
4256 aligned_data[2],
4257 aligned_data[3],
4258 ];
4259 let dint1_value = i32::from_le_bytes(dint1_bytes);
4260
4261 if aligned_data.len() >= 8 {
4263 let dint2_bytes = [
4264 aligned_data[4],
4265 aligned_data[5],
4266 aligned_data[6],
4267 aligned_data[7],
4268 ];
4269 let dint2_value = i32::from_le_bytes(dint2_bytes);
4270
4271 if aligned_data.len() >= 12 {
4273 let real_bytes = [
4274 aligned_data[8],
4275 aligned_data[9],
4276 aligned_data[10],
4277 aligned_data[11],
4278 ];
4279 let real_value = f32::from_le_bytes(real_bytes);
4280
4281 tracing::trace!(
4282 "Alignment {}: DINT1={}, DINT2={}, REAL={}",
4283 alignment,
4284 dint1_value,
4285 dint2_value,
4286 real_value
4287 );
4288
4289 if self.is_reasonable_udt_values(
4291 dint1_value,
4292 dint2_value,
4293 real_value,
4294 ) {
4295 tracing::debug!(
4298 "Found reasonable UDT values at alignment {}",
4299 alignment
4300 );
4301 return Ok(PlcValue::Udt(UdtData {
4302 symbol_id: 0, data: data.to_vec(),
4304 }));
4305 }
4306 }
4307 }
4308 }
4309 }
4310 }
4311 }
4312
4313 if data.len() >= 4 {
4315 let interpretations = vec![
4317 ("DINT_at_start", 0, 4),
4318 ("DINT_at_end", data.len().saturating_sub(4), data.len()),
4319 ("DINT_middle", data.len() / 2, data.len() / 2 + 4),
4320 ];
4321
4322 for (name, start, end) in interpretations {
4323 if end <= data.len() && end > start {
4324 let bytes = &data[start..end];
4325 if bytes.len() == 4 {
4326 let dint_value =
4327 i32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
4328 tracing::trace!("{}: DINT = {}", name, dint_value);
4329
4330 if self.is_reasonable_value(dint_value) {
4331 return Ok(PlcValue::Udt(UdtData {
4333 symbol_id: 0, data: data.to_vec(),
4335 }));
4336 }
4337 }
4338 }
4339 }
4340 }
4341
4342 Err(crate::error::EtherNetIpError::Protocol(
4343 "Could not parse UDT structure".to_string(),
4344 ))
4345 }
4346
4347 #[allow(dead_code)]
4350 fn parse_udt_simple(&self, data: &[u8]) -> crate::error::Result<PlcValue> {
4351 Ok(PlcValue::Udt(UdtData {
4353 symbol_id: 0, data: data.to_vec(),
4355 }))
4356 }
4357
4358 #[allow(dead_code)]
4360 fn is_reasonable_udt_values(&self, dint1: i32, dint2: i32, real: f32) -> bool {
4361 let dint1_reasonable = (-1000..=1000).contains(&dint1);
4363 let dint2_reasonable = (-1000..=1000).contains(&dint2);
4364 let real_reasonable = (-1000.0..=1000.0).contains(&real) && real.is_finite();
4365
4366 tracing::trace!(
4367 "Reasonableness check: DINT1={} ({}), DINT2={} ({}), REAL={} ({})",
4368 dint1,
4369 dint1_reasonable,
4370 dint2,
4371 dint2_reasonable,
4372 real,
4373 real_reasonable
4374 );
4375
4376 dint1_reasonable && dint2_reasonable && real_reasonable
4377 }
4378
4379 #[allow(dead_code)]
4381 fn is_reasonable_value(&self, value: i32) -> bool {
4382 (-1000..=1000).contains(&value)
4383 }
4384}
4385
4386#[cfg(test)]
4387mod discovery_tests {
4388 use super::{EipClient, TemplateAttributes};
4389
4390 #[test]
4391 fn build_tag_list_request_rejects_instance_above_u16() {
4392 let client = EipClient::new_unconnected_for_testing();
4393 let request = client
4394 .build_tag_list_request_from_instance(0x12345678)
4395 .expect_err("instance should be rejected");
4396
4397 assert!(format!("{request}").contains("exceeds 16-bit"));
4398 }
4399
4400 #[test]
4401 fn build_tag_list_request_encodes_path_size_and_start_instance() {
4402 let client = EipClient::new_unconnected_for_testing();
4403 let request = client
4404 .build_tag_list_request_from_instance(0x5678)
4405 .expect("request should build");
4406
4407 assert_eq!(request[0], 0x55);
4408 assert_eq!(request[1], 0x03);
4409 assert_eq!(&request[2..8], &[0x20, 0x6B, 0x25, 0x00, 0x78, 0x56]);
4410 }
4411
4412 #[test]
4413 fn parse_tag_list_response_page_handles_partial_transfer() {
4414 let client = EipClient::new_unconnected_for_testing();
4415 let response = [
4416 0xD5, 0x00, 0x06,
4417 0x00, 0x34, 0x12, 0x00, 0x00, 0x04, 0x00, b'R', b'a', b't', b'e', 0xC4, 0x00, ];
4423
4424 let page = client
4425 .parse_tag_list_response_page(&response)
4426 .expect("response should parse");
4427
4428 assert!(page.partial_transfer);
4429 assert_eq!(page.last_instance_id, Some(0x1234));
4430 assert_eq!(page.tags.len(), 1);
4431 assert_eq!(page.tags[0].name, "Rate");
4432 assert_eq!(page.tags[0].data_type, 0x00C4);
4433 assert_eq!(page.tags[0].data_type_name, "DINT");
4434 }
4435
4436 #[test]
4437 fn build_get_template_attributes_request_encodes_template_object_path() {
4438 let client = EipClient::new_unconnected_for_testing();
4439 let request = client
4440 .build_get_template_attributes_request(0x0456)
4441 .expect("request should build");
4442
4443 assert_eq!(request[0], 0x03);
4444 assert_eq!(request[1], 0x03);
4445 assert_eq!(&request[2..8], &[0x20, 0x6C, 0x25, 0x00, 0x56, 0x04]);
4446 assert_eq!(
4447 &request[8..],
4448 &[0x04, 0x00, 0x01, 0x00, 0x02, 0x00, 0x04, 0x00, 0x05, 0x00]
4449 );
4450 }
4451
4452 #[test]
4453 fn build_read_template_request_encodes_template_read_size() {
4454 let client = EipClient::new_unconnected_for_testing();
4455 let request = client
4456 .build_read_template_request(0x0456, 0x0010, 0x0032)
4457 .expect("request should build");
4458
4459 assert_eq!(request[0], 0x4C);
4460 assert_eq!(request[1], 0x03);
4461 assert_eq!(&request[2..8], &[0x20, 0x6C, 0x25, 0x00, 0x56, 0x04]);
4462 assert_eq!(&request[8..12], &[0x10, 0x00, 0x00, 0x00]);
4463 assert_eq!(&request[12..14], &[0x32, 0x00]);
4464 }
4465
4466 #[test]
4467 fn parse_template_attributes_response_reads_mixed_width_values() {
4468 let client = EipClient::new_unconnected_for_testing();
4469 let response = [
4470 0x83, 0x00, 0x00, 0x00, 0x04, 0x00, 0x01, 0x00, 0x00, 0x00, 0x34, 0x12, 0x02, 0x00, 0x00, 0x00, 0x07, 0x00, 0x04, 0x00, 0x00, 0x00, 0x19, 0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x00, 0x58, 0x00, 0x00, 0x00, ];
4477
4478 let attributes = client
4479 .parse_template_attributes_response(0x0456, &response)
4480 .expect("response should parse");
4481
4482 assert_eq!(
4483 attributes,
4484 TemplateAttributes {
4485 structure_handle: 0x1234,
4486 member_count: 7,
4487 definition_size_words: 25,
4488 structure_size_bytes: 88,
4489 }
4490 );
4491 }
4492}
4493
4494