Skip to main content

rust_ethernet_ip/client/
string.rs

1use super::EipClient;
2use crate::error::EtherNetIpError;
3use crate::protocol::encap::{EncapsulationHeader, SEND_RR_DATA};
4use crate::protocol::{Decode, Encode};
5use crate::types::{ConnectedSession, ConnectionParameters, PlcValue};
6use bytes::BytesMut;
7use tokio::io::{AsyncReadExt, AsyncWriteExt};
8use tokio::time::Instant;
9
10impl EipClient {
11    /// Writes a string value using Allen-Bradley UDT component access
12    /// This writes to TestString.LEN and TestString.DATA separately
13    pub async fn write_ab_string_components(
14        &mut self,
15        tag_name: &str,
16        value: &str,
17    ) -> crate::error::Result<()> {
18        tracing::debug!(
19            "[AB STRING] Writing string '{}' to tag '{}' using component access",
20            value,
21            tag_name
22        );
23
24        let string_bytes = value.as_bytes();
25        let string_len = string_bytes.len() as i32;
26
27        // Step 1: Write the length to TestString.LEN
28        let len_tag = format!("{tag_name}.LEN");
29        tracing::debug!("Step 1: Writing length {} to {}", string_len, len_tag);
30
31        match self.write_tag(&len_tag, PlcValue::Dint(string_len)).await {
32            Ok(_) => tracing::debug!("Length written successfully"),
33            Err(e) => {
34                tracing::error!("Length write failed: {}", e);
35                return Err(e);
36            }
37        }
38
39        // Step 2: Write the string data to TestString.DATA using array access
40        tracing::debug!("Step 2: Writing string data to {}.DATA", tag_name);
41
42        // We need to write each character individually to the DATA array
43        for (i, &byte) in string_bytes.iter().enumerate() {
44            let data_element = format!("{tag_name}.DATA[{i}]");
45            match self
46                .write_tag(&data_element, PlcValue::Sint(byte as i8))
47                .await
48            {
49                Ok(_) => tracing::trace!("wrote STRING byte at position {}", i),
50                Err(e) => {
51                    tracing::error!("Failed to write byte {} to position {}: {}", byte, i, e);
52                    return Err(e);
53                }
54            }
55        }
56
57        // Step 3: Clear remaining bytes (null terminate)
58        if string_bytes.len() < 82 {
59            let null_element = format!("{}.DATA[{}]", tag_name, string_bytes.len());
60            match self.write_tag(&null_element, PlcValue::Sint(0)).await {
61                Ok(_) => tracing::debug!("String null-terminated successfully"),
62                Err(e) => tracing::warn!("Could not null-terminate: {}", e),
63            }
64        }
65
66        tracing::info!("AB STRING component write completed!");
67        Ok(())
68    }
69
70    /// Writes a string using a single UDT write with proper AB STRING format
71    pub async fn write_ab_string_udt(
72        &mut self,
73        tag_name: &str,
74        value: &str,
75    ) -> crate::error::Result<()> {
76        tracing::debug!(
77            "[AB STRING UDT] Writing string '{}' to tag '{}' as UDT",
78            value,
79            tag_name
80        );
81
82        let string_bytes = value.as_bytes();
83        if string_bytes.len() > 82 {
84            return Err(EtherNetIpError::Protocol(
85                "String too long for Allen-Bradley STRING (max 82 chars)".to_string(),
86            ));
87        }
88
89        // Build a CIP request that writes the complete AB STRING structure
90        let mut cip_request = Vec::new();
91
92        // Service: Write Tag Service (0x4D)
93        cip_request.push(0x4D);
94
95        // Request Path
96        let tag_path = self.build_tag_path(tag_name);
97        cip_request.push((tag_path.len() / 2) as u8); // Path size in words
98        cip_request.extend_from_slice(&tag_path);
99
100        // Data Type: Allen-Bradley STRING (0x02A0) - but write as UDT components
101        cip_request.extend_from_slice(&[0xA0, 0x00]); // UDT type
102        cip_request.extend_from_slice(&[0x01, 0x00]); // Element count
103
104        // AB STRING UDT structure:
105        // - DINT .LEN (4 bytes)
106        // - SINT .DATA[82] (82 bytes)
107
108        // Write .LEN field (current string length)
109        let len = string_bytes.len() as u32;
110        cip_request.extend_from_slice(&len.to_le_bytes());
111
112        // Write .DATA field (82 bytes total)
113        cip_request.extend_from_slice(string_bytes); // Actual string data
114
115        // Pad with zeros to reach 82 bytes
116        let padding_needed = 82 - string_bytes.len();
117        cip_request.extend_from_slice(&vec![0u8; padding_needed]);
118
119        tracing::trace!("Built UDT write request: {} bytes total", cip_request.len());
120
121        let response = self.send_cip_request(&cip_request).await?;
122
123        if response.len() >= 3 {
124            let general_status = response[2];
125            if general_status == 0x00 {
126                tracing::info!("AB STRING UDT write successful!");
127                Ok(())
128            } else {
129                let error_msg = self.get_cip_error_message(general_status);
130                Err(EtherNetIpError::Protocol(format!(
131                    "AB STRING UDT write failed - CIP Error 0x{general_status:02X}: {error_msg}"
132                )))
133            }
134        } else {
135            Err(EtherNetIpError::Protocol(
136                "Invalid AB STRING UDT write response".to_string(),
137            ))
138        }
139    }
140
141    /// Establishes a Class 3 connected session for STRING operations
142    ///
143    /// Connected sessions are required for certain operations like STRING writes
144    /// in Allen-Bradley PLCs. This implements the Forward Open CIP service.
145    /// Will try multiple connection parameter configurations until one succeeds.
146    pub(super) async fn establish_connected_session(
147        &mut self,
148        session_name: &str,
149    ) -> crate::error::Result<ConnectedSession> {
150        tracing::debug!(
151            "[CONNECTED] Establishing connected session: '{}'",
152            session_name
153        );
154        tracing::debug!("[CONNECTED] Will try multiple parameter configurations...");
155
156        // Generate unique connection parameters
157        *self.connection_sequence.lock().await += 1;
158        let connection_serial = (*self.connection_sequence.lock().await & 0xFFFF) as u16;
159
160        // Try different configurations until one works
161        for config_id in 0..=5 {
162            tracing::debug!(
163                "[ATTEMPT {}] Trying configuration {}:",
164                config_id + 1,
165                config_id
166            );
167
168            let mut session = if config_id == 0 {
169                ConnectedSession::new(connection_serial)
170            } else {
171                ConnectedSession::with_config(connection_serial, config_id)
172            };
173
174            // Generate unique connection IDs for this attempt
175            session.o_to_t_connection_id =
176                0x2000_0000 + *self.connection_sequence.lock().await + (config_id as u32 * 0x1000);
177            session.t_to_o_connection_id =
178                0x3000_0000 + *self.connection_sequence.lock().await + (config_id as u32 * 0x1000);
179
180            // Build Forward Open request with this configuration
181            let forward_open_request = self.build_forward_open_request(&session)?;
182
183            tracing::debug!(
184                "[ATTEMPT {}] Sending Forward Open request ({} bytes)",
185                config_id + 1,
186                forward_open_request.len()
187            );
188
189            // Send Forward Open request
190            match self.send_cip_request(&forward_open_request).await {
191                Ok(response) => {
192                    // Try to parse the response - DON'T clone, modify the session directly!
193                    match self.parse_forward_open_response(&mut session, &response) {
194                        Ok(()) => {
195                            // Success! Store the session and return
196                            tracing::info!("[SUCCESS] Configuration {} worked!", config_id);
197                            tracing::debug!("Connection ID: 0x{:08X}", session.connection_id);
198                            tracing::debug!("O->T ID: 0x{:08X}", session.o_to_t_connection_id);
199                            tracing::debug!("T->O ID: 0x{:08X}", session.t_to_o_connection_id);
200                            tracing::debug!(
201                                "Using Connection ID: 0x{:08X} for messaging",
202                                session.connection_id
203                            );
204
205                            session.is_active = true;
206                            let mut sessions = self.connected_sessions.lock().await;
207                            sessions.insert(session_name.to_string(), session.clone());
208                            return Ok(session);
209                        }
210                        Err(e) => {
211                            tracing::warn!(
212                                "[ATTEMPT {}] Configuration {} failed: {}",
213                                config_id + 1,
214                                config_id,
215                                e
216                            );
217
218                            // If it's a specific status error, log it
219                            if e.to_string().contains("status: 0x") {
220                                tracing::debug!(
221                                    "Status indicates: parameter incompatibility or resource conflict"
222                                );
223                            }
224                        }
225                    }
226                }
227                Err(e) => {
228                    tracing::warn!(
229                        "[ATTEMPT {}] Network error with config {}: {}",
230                        config_id + 1,
231                        config_id,
232                        e
233                    );
234                }
235            }
236
237            // Small delay between attempts
238            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
239        }
240
241        // If we get here, all configurations failed
242        Err(EtherNetIpError::Protocol(
243            "All connection parameter configurations failed. PLC may not support connected messaging or has reached connection limits.".to_string()
244        ))
245    }
246
247    /// Builds a Forward Open CIP request for establishing connected sessions
248    fn build_forward_open_request(
249        &self,
250        session: &ConnectedSession,
251    ) -> crate::error::Result<Vec<u8>> {
252        let mut request = Vec::with_capacity(50);
253
254        // CIP Forward Open Service (0x54)
255        request.push(0x54);
256
257        // Request path length (Connection Manager object)
258        request.push(0x02); // 2 words
259
260        // Class ID: Connection Manager (0x06)
261        request.push(0x20); // Logical Class segment
262        request.push(0x06);
263
264        // Instance ID: Connection Manager instance (0x01)
265        request.push(0x24); // Logical Instance segment
266        request.push(0x01);
267
268        // Forward Open parameters
269
270        // Connection Timeout Ticks (1 byte) + Timeout multiplier (1 byte)
271        request.push(0x0A); // Timeout ticks (10)
272        request.push(session.timeout_multiplier);
273
274        // Originator -> Target Connection ID (4 bytes, little-endian)
275        request.extend_from_slice(&session.o_to_t_connection_id.to_le_bytes());
276
277        // Target -> Originator Connection ID (4 bytes, little-endian)
278        request.extend_from_slice(&session.t_to_o_connection_id.to_le_bytes());
279
280        // Connection Serial Number (2 bytes, little-endian)
281        request.extend_from_slice(&session.connection_serial.to_le_bytes());
282
283        // Originator Vendor ID (2 bytes, little-endian)
284        request.extend_from_slice(&session.originator_vendor_id.to_le_bytes());
285
286        // Originator Serial Number (4 bytes, little-endian)
287        request.extend_from_slice(&session.originator_serial.to_le_bytes());
288
289        // Connection Timeout Multiplier (1 byte) - repeated for target
290        request.push(session.timeout_multiplier);
291
292        // Reserved bytes (3 bytes)
293        request.extend_from_slice(&[0x00, 0x00, 0x00]);
294
295        // Originator -> Target RPI (4 bytes, little-endian, microseconds)
296        request.extend_from_slice(&session.rpi.to_le_bytes());
297
298        // Originator -> Target connection parameters (4 bytes)
299        let o_to_t_params = self.encode_connection_parameters(&session.o_to_t_params);
300        request.extend_from_slice(&o_to_t_params.to_le_bytes());
301
302        // Target -> Originator RPI (4 bytes, little-endian, microseconds)
303        request.extend_from_slice(&session.rpi.to_le_bytes());
304
305        // Target -> Originator connection parameters (4 bytes)
306        let t_to_o_params = self.encode_connection_parameters(&session.t_to_o_params);
307        request.extend_from_slice(&t_to_o_params.to_le_bytes());
308
309        // Transport type/trigger (1 byte) - Class 3, Application triggered
310        request.push(0xA3);
311
312        // Connection Path Size (1 byte)
313        request.push(0x02); // 2 words for Message Router path
314
315        // Connection Path - Target the Message Router
316        request.push(0x20); // Logical Class segment
317        request.push(0x02); // Message Router class (0x02)
318        request.push(0x24); // Logical Instance segment
319        request.push(0x01); // Message Router instance (0x01)
320
321        Ok(request)
322    }
323
324    /// Encodes connection parameters into a 32-bit value
325    fn encode_connection_parameters(&self, params: &ConnectionParameters) -> u32 {
326        let mut encoded = 0u32;
327
328        // Connection size (bits 0-15)
329        encoded |= params.size as u32;
330
331        // Variable flag (bit 25)
332        if params.variable_size {
333            encoded |= 1 << 25;
334        }
335
336        // Connection type (bits 29-30)
337        encoded |= (params.connection_type as u32) << 29;
338
339        // Priority (bits 26-27)
340        encoded |= (params.priority as u32) << 26;
341
342        encoded
343    }
344
345    /// Parses Forward Open response and updates session with connection info
346    fn parse_forward_open_response(
347        &self,
348        session: &mut ConnectedSession,
349        response: &[u8],
350    ) -> crate::error::Result<()> {
351        if response.len() < 2 {
352            return Err(EtherNetIpError::Protocol(
353                "Forward Open response too short".to_string(),
354            ));
355        }
356
357        let service = response[0];
358        let status = response[1];
359
360        // Check if this is a Forward Open Reply (0xD4)
361        if service != 0xD4 {
362            return Err(EtherNetIpError::Protocol(format!(
363                "Unexpected service in Forward Open response: 0x{service:02X}"
364            )));
365        }
366
367        // Check status
368        if status != 0x00 {
369            let error_msg = match status {
370                0x01 => "Connection failure - Resource unavailable or already exists",
371                0x02 => "Invalid parameter - Connection parameters rejected",
372                0x03 => "Connection timeout - PLC did not respond in time",
373                0x04 => "Connection limit exceeded - Too many connections",
374                0x08 => "Invalid service - Forward Open not supported",
375                0x0C => "Invalid attribute - Connection parameters invalid",
376                0x13 => "Path destination unknown - Target object not found",
377                0x26 => "Invalid parameter value - RPI or size out of range",
378                _ => &format!("Unknown status: 0x{status:02X}"),
379            };
380            return Err(EtherNetIpError::Protocol(format!(
381                "Forward Open failed with status 0x{status:02X}: {error_msg}"
382            )));
383        }
384
385        // Parse successful response
386        if response.len() < 16 {
387            return Err(EtherNetIpError::Protocol(
388                "Forward Open response data too short".to_string(),
389            ));
390        }
391
392        // CRITICAL FIX: The Forward Open response contains the actual connection IDs assigned by the PLC
393        // Use the IDs returned by the PLC, not our requested ones
394        let actual_o_to_t_id =
395            u32::from_le_bytes([response[2], response[3], response[4], response[5]]);
396        let actual_t_to_o_id =
397            u32::from_le_bytes([response[6], response[7], response[8], response[9]]);
398
399        // Update session with the actual assigned connection IDs
400        session.o_to_t_connection_id = actual_o_to_t_id;
401        session.t_to_o_connection_id = actual_t_to_o_id;
402        session.connection_id = actual_o_to_t_id; // Use O->T as the primary connection ID
403
404        tracing::info!("[FORWARD OPEN] Success!");
405        tracing::debug!(
406            "O->T Connection ID: 0x{:08X} (PLC assigned)",
407            session.o_to_t_connection_id
408        );
409        tracing::debug!(
410            "T->O Connection ID: 0x{:08X} (PLC assigned)",
411            session.t_to_o_connection_id
412        );
413        tracing::debug!(
414            "Using Connection ID: 0x{:08X} for messaging",
415            session.connection_id
416        );
417
418        Ok(())
419    }
420
421    /// Writes a string using connected explicit messaging
422    pub async fn write_string_connected(
423        &mut self,
424        tag_name: &str,
425        value: &str,
426    ) -> crate::error::Result<()> {
427        let session_name = format!("string_write_{tag_name}");
428        let mut sessions = self.connected_sessions.lock().await;
429
430        if !sessions.contains_key(&session_name) {
431            drop(sessions); // Release the lock before calling establish_connected_session
432            self.establish_connected_session(&session_name).await?;
433            sessions = self.connected_sessions.lock().await;
434        }
435
436        let session = sessions.get(&session_name).cloned().ok_or_else(|| {
437            crate::error::EtherNetIpError::Connection(format!(
438                "connected session '{session_name}' was not available after establishment"
439            ))
440        })?;
441        let request = self.build_connected_string_write_request(tag_name, value, &session)?;
442
443        drop(sessions); // Release the lock before sending the request
444        let response = self
445            .send_connected_cip_request(&request, &session, &session_name)
446            .await?;
447
448        // Check if write was successful
449        if response.len() >= 2 {
450            let status = response[1];
451            if status == 0x00 {
452                Ok(())
453            } else {
454                let error_msg = self.get_cip_error_message(status);
455                Err(EtherNetIpError::Protocol(format!(
456                    "CIP Error 0x{status:02X}: {error_msg}"
457                )))
458            }
459        } else {
460            Err(EtherNetIpError::Protocol(
461                "Invalid connected string write response".to_string(),
462            ))
463        }
464    }
465
466    /// Builds a string write request for connected messaging
467    fn build_connected_string_write_request(
468        &self,
469        tag_name: &str,
470        value: &str,
471        _session: &ConnectedSession,
472    ) -> crate::error::Result<Vec<u8>> {
473        let mut request = Vec::new();
474
475        // For connected messaging, use direct CIP Write service
476        // The connection is already established, so we can send the request directly
477
478        // CIP Write Service Code
479        request.push(0x4D);
480
481        // Tag path - use simple ANSI format for connected messaging
482        let tag_bytes = tag_name.as_bytes();
483        let path_size_words = (2 + tag_bytes.len()).div_ceil(2); // +1 for potential padding, /2 for word count
484        request.push(path_size_words as u8);
485
486        request.push(0x91); // ANSI symbol segment
487        request.push(tag_bytes.len() as u8); // Length of tag name
488        request.extend_from_slice(tag_bytes);
489
490        // Add padding byte if needed to make path even length
491        if !(2 + tag_bytes.len()).is_multiple_of(2) {
492            request.push(0x00);
493        }
494
495        // Data type for AB STRING
496        request.extend_from_slice(&[0xCE, 0x0F]); // AB STRING data type (4046)
497
498        // Number of elements (always 1 for a single string)
499        request.extend_from_slice(&[0x01, 0x00]);
500
501        // Build the AB STRING structure payload
502        let string_bytes = value.as_bytes();
503        let max_len: u16 = 82; // Standard AB STRING max length
504        let current_len = string_bytes.len().min(max_len as usize) as u16;
505
506        // STRING structure:
507        // - Len (2 bytes) - number of characters used
508        request.extend_from_slice(&current_len.to_le_bytes());
509
510        // - MaxLen (2 bytes) - maximum characters allowed (typically 82)
511        request.extend_from_slice(&max_len.to_le_bytes());
512
513        // - Data[MaxLen] (82 bytes) - the character array, zero-padded
514        let mut data_array = vec![0u8; max_len as usize];
515        data_array[..current_len as usize].copy_from_slice(&string_bytes[..current_len as usize]);
516        request.extend_from_slice(&data_array);
517
518        tracing::trace!(
519            "Built connected string write request ({} bytes) for '{}' = '{}' (len={}, maxlen={})",
520            request.len(),
521            tag_name,
522            value,
523            current_len,
524            max_len
525        );
526        tracing::trace!("Request: {:02X?}", request);
527
528        Ok(request)
529    }
530
531    /// Sends a CIP request using connected messaging
532    async fn send_connected_cip_request(
533        &mut self,
534        cip_request: &[u8],
535        session: &ConnectedSession,
536        session_name: &str,
537    ) -> crate::error::Result<Vec<u8>> {
538        tracing::debug!(
539            "[CONNECTED] Sending connected CIP request ({} bytes) using T->O connection ID 0x{:08X}",
540            cip_request.len(),
541            session.t_to_o_connection_id
542        );
543
544        let mut packet = BytesMut::new();
545        EncapsulationHeader::new(SEND_RR_DATA, 0, self.session_handle).encode(&mut packet);
546
547        // CPF (Common Packet Format) data starts here
548        let cpf_start = packet.len();
549
550        // Interface handle (4 bytes)
551        packet.extend_from_slice(&[0x00, 0x00, 0x00, 0x00]);
552
553        // Timeout (2 bytes) - 5 seconds
554        packet.extend_from_slice(&[0x05, 0x00]);
555
556        // Item count (2 bytes) - 2 items: Address + Data
557        packet.extend_from_slice(&[0x02, 0x00]);
558
559        // Item 1: Connected Address Item (specifies which connection to use)
560        packet.extend_from_slice(&[0xA1, 0x00]); // Type: Connected Address Item (0x00A1)
561        packet.extend_from_slice(&[0x04, 0x00]); // Length: 4 bytes
562        // Use T->O connection ID (Target to Originator) for addressing
563        packet.extend_from_slice(&session.t_to_o_connection_id.to_le_bytes());
564
565        // Item 2: Connected Data Item (contains the CIP request + sequence)
566        packet.extend_from_slice(&[0xB1, 0x00]); // Type: Connected Data Item (0x00B1)
567        let data_length = cip_request.len() + 2; // +2 for sequence count
568        packet.extend_from_slice(&(data_length as u16).to_le_bytes()); // Length
569
570        // Clone session_name and session before acquiring the lock
571        let session_name_clone = session_name.to_string();
572        let _session_clone = session.clone();
573
574        // Get the current session mutably to increment sequence counter
575        let mut sessions = self.connected_sessions.lock().await;
576        let current_sequence = if let Some(session_mut) = sessions.get_mut(&session_name_clone) {
577            session_mut.sequence_count += 1;
578            session_mut.sequence_count
579        } else {
580            1 // Fallback if session not found
581        };
582
583        // Drop the lock before sending the request
584        drop(sessions);
585
586        // Sequence count (2 bytes) - incremental counter for this connection
587        packet.extend_from_slice(&current_sequence.to_le_bytes());
588
589        // CIP request data
590        packet.extend_from_slice(cip_request);
591
592        // Update packet length in header (total CPF data size)
593        let cpf_length = packet.len() - cpf_start;
594        packet[2..4].copy_from_slice(&(cpf_length as u16).to_le_bytes());
595
596        tracing::trace!(
597            "[CONNECTED] Sending packet ({} bytes) with sequence {}",
598            packet.len(),
599            current_sequence
600        );
601
602        // Send packet
603        let mut stream = self.stream.lock().await;
604        stream
605            .write_all(&packet)
606            .await
607            .map_err(EtherNetIpError::Io)?;
608
609        // Read response header
610        let mut header = [0u8; 24];
611        stream
612            .read_exact(&mut header)
613            .await
614            .map_err(EtherNetIpError::Io)?;
615
616        // Check EtherNet/IP command status
617        let mut header_bytes = &header[..];
618        let response_header = EncapsulationHeader::decode(&mut header_bytes)?;
619        if response_header.status != 0 {
620            return Err(EtherNetIpError::Protocol(format!(
621                "Connected message failed with status: 0x{:08X}",
622                response_header.status
623            )));
624        }
625
626        // Read response data
627        let response_length = response_header.length as usize;
628        let mut response_data = vec![0u8; response_length];
629        stream
630            .read_exact(&mut response_data)
631            .await
632            .map_err(EtherNetIpError::Io)?;
633
634        let mut last_activity = self.last_activity.lock().await;
635        *last_activity = Instant::now();
636
637        tracing::trace!(
638            "[CONNECTED] Received response ({} bytes)",
639            response_data.len()
640        );
641
642        // Extract connected CIP response
643        self.extract_connected_cip_from_response(&response_data)
644    }
645
646    /// Extracts CIP data from connected response
647    fn extract_connected_cip_from_response(
648        &self,
649        response: &[u8],
650    ) -> crate::error::Result<Vec<u8>> {
651        tracing::trace!(
652            "[CONNECTED] Extracting CIP from connected response ({} bytes): {:02X?}",
653            response.len(),
654            response
655        );
656
657        if response.len() < 12 {
658            return Err(EtherNetIpError::Protocol(
659                "Connected response too short for CPF header".to_string(),
660            ));
661        }
662
663        // Parse CPF (Common Packet Format) structure
664        // [0-3]: Interface handle
665        // [4-5]: Timeout
666        // [6-7]: Item count
667        let item_count = u16::from_le_bytes([response[6], response[7]]) as usize;
668        tracing::trace!("[CONNECTED] CPF item count: {}", item_count);
669
670        let mut pos = 8; // Start after CPF header
671
672        // Look for Connected Data Item (0x00B1)
673        for _i in 0..item_count {
674            if pos + 4 > response.len() {
675                return Err(EtherNetIpError::Protocol(
676                    "Response truncated while parsing items".to_string(),
677                ));
678            }
679
680            let item_type = u16::from_le_bytes([response[pos], response[pos + 1]]);
681            let item_length = u16::from_le_bytes([response[pos + 2], response[pos + 3]]) as usize;
682            pos += 4; // Skip item header
683
684            tracing::trace!(
685                "[CONNECTED] Found item: type=0x{:04X}, length={}",
686                item_type,
687                item_length
688            );
689
690            if pos
691                .checked_add(item_length)
692                .is_none_or(|end| end > response.len())
693            {
694                return Err(EtherNetIpError::Protocol(
695                    "Connected data item truncated".to_string(),
696                ));
697            }
698
699            if item_type == 0x00B1 {
700                // Connected Data Item
701                // Connected Data Item contains [sequence_count(2)][cip_data]
702                if item_length < 2 {
703                    return Err(EtherNetIpError::Protocol(
704                        "Connected data item too short for sequence".to_string(),
705                    ));
706                }
707
708                let sequence_count = u16::from_le_bytes([response[pos], response[pos + 1]]);
709                tracing::trace!("[CONNECTED] Sequence count: {}", sequence_count);
710
711                // Extract CIP data (skip 2-byte sequence count)
712                let cip_data = response[pos + 2..pos + item_length].to_vec();
713                tracing::trace!(
714                    "[CONNECTED] Extracted CIP data ({} bytes): {:02X?}",
715                    cip_data.len(),
716                    cip_data
717                );
718
719                return Ok(cip_data);
720            } else {
721                // Skip this item's data
722                pos += item_length;
723            }
724        }
725
726        Err(EtherNetIpError::Protocol(
727            "Connected Data Item (0x00B1) not found in response".to_string(),
728        ))
729    }
730
731    /// Closes a specific connected session
732    async fn close_connected_session(&mut self, session_name: &str) -> crate::error::Result<()> {
733        if let Some(session) = self.connected_sessions.lock().await.get(session_name) {
734            let session = session.clone(); // Clone to avoid borrowing issues
735
736            // Build Forward Close request
737            let forward_close_request = self.build_forward_close_request(&session)?;
738
739            // Send Forward Close request
740            let _response = self.send_cip_request(&forward_close_request).await?;
741
742            tracing::info!("[CONNECTED] Session '{}' closed successfully", session_name);
743        }
744
745        // Remove session from our tracking
746        let mut sessions = self.connected_sessions.lock().await;
747        sessions.remove(session_name);
748
749        Ok(())
750    }
751
752    /// Builds a Forward Close CIP request for terminating connected sessions
753    fn build_forward_close_request(
754        &self,
755        session: &ConnectedSession,
756    ) -> crate::error::Result<Vec<u8>> {
757        let mut request = Vec::with_capacity(21);
758
759        // CIP Forward Close Service (0x4E)
760        request.push(0x4E);
761
762        // Request path length (Connection Manager object)
763        request.push(0x02); // 2 words
764
765        // Class ID: Connection Manager (0x06)
766        request.push(0x20); // Logical Class segment
767        request.push(0x06);
768
769        // Instance ID: Connection Manager instance (0x01)
770        request.push(0x24); // Logical Instance segment
771        request.push(0x01);
772
773        // Forward Close parameters
774
775        // Connection Timeout Ticks (1 byte) + Timeout multiplier (1 byte)
776        request.push(0x0A); // Timeout ticks (10)
777        request.push(session.timeout_multiplier);
778
779        // Connection Serial Number (2 bytes, little-endian)
780        request.extend_from_slice(&session.connection_serial.to_le_bytes());
781
782        // Originator Vendor ID (2 bytes, little-endian)
783        request.extend_from_slice(&session.originator_vendor_id.to_le_bytes());
784
785        // Originator Serial Number (4 bytes, little-endian)
786        request.extend_from_slice(&session.originator_serial.to_le_bytes());
787
788        // Connection Path Size (1 byte)
789        request.push(0x02); // 2 words for Message Router path
790
791        // Connection Path - Target the Message Router
792        request.push(0x20); // Logical Class segment
793        request.push(0x02); // Message Router class (0x02)
794        request.push(0x24); // Logical Instance segment
795        request.push(0x01); // Message Router instance (0x01)
796
797        Ok(request)
798    }
799
800    /// Closes all connected sessions (called during disconnect)
801    pub(super) async fn close_all_connected_sessions(&mut self) -> crate::error::Result<()> {
802        let session_names: Vec<String> = self
803            .connected_sessions
804            .lock()
805            .await
806            .keys()
807            .cloned()
808            .collect();
809
810        for session_name in session_names {
811            let _ = self.close_connected_session(&session_name).await; // Ignore errors during cleanup
812        }
813
814        Ok(())
815    }
816
817    /// Writes a string using unconnected explicit messaging with proper AB STRING format
818    ///
819    /// This method uses standard unconnected messaging instead of connected messaging
820    /// and implements the proper Allen-Bradley STRING structure as described in the
821    /// provided information about `Len`, `MaxLen`, and `Data[82]` format.
822    pub async fn write_string_unconnected(
823        &mut self,
824        tag_name: &str,
825        value: &str,
826    ) -> crate::error::Result<()> {
827        tracing::debug!(
828            "[UNCONNECTED] Writing string '{}' to tag '{}' using unconnected messaging",
829            value,
830            tag_name
831        );
832
833        self.validate_session().await?;
834
835        let string_bytes = value.as_bytes();
836        if string_bytes.len() > 82 {
837            return Err(EtherNetIpError::Protocol(
838                "String too long for Allen-Bradley STRING (max 82 chars)".to_string(),
839            ));
840        }
841
842        // Build the CIP request with proper AB STRING structure
843        let mut cip_request = Vec::new();
844
845        // Service: Write Tag Service (0x4D)
846        cip_request.push(0x4D);
847
848        // Request Path Size (in words)
849        let tag_bytes = tag_name.as_bytes();
850        let path_len = if tag_bytes.len().is_multiple_of(2) {
851            tag_bytes.len() + 2
852        } else {
853            tag_bytes.len() + 3
854        } / 2;
855        cip_request.push(path_len as u8);
856
857        // Request Path: ANSI Extended Symbol Segment for tag name
858        cip_request.push(0x91); // ANSI Extended Symbol Segment
859        cip_request.push(tag_bytes.len() as u8); // Tag name length
860        cip_request.extend_from_slice(tag_bytes); // Tag name
861
862        // Pad to even length if necessary
863        if !tag_bytes.len().is_multiple_of(2) {
864            cip_request.push(0x00);
865        }
866
867        // For write operations, we don't include data type and element count
868        // The PLC infers the data type from the tag definition
869
870        // Build Allen-Bradley STRING structure based on what we see in read responses:
871        // Looking at read response: [CE, 0F, 01, 00, 00, 00, 31, 00, ...]
872        // Structure appears to be:
873        // - Some header/identifier (2 bytes): 0xCE, 0x0F
874        // - Length (2 bytes): number of characters
875        // - MaxLength or padding (2 bytes): 0x00, 0x00
876        // - Data array (variable length, null terminated)
877
878        let _current_len = string_bytes.len().min(82) as u16;
879
880        // Build the correct Allen-Bradley STRING structure to match what the PLC expects
881        // Analysis of read response: [CE, 0F, 01, 00, 00, 00, 31, 00, 00, 00, ...]
882        // Structure appears to be:
883        // - Header (2 bytes): 0xCE, 0x0F (Allen-Bradley STRING identifier)
884        // - Length (4 bytes, DINT): Number of characters currently used
885        // - Data (variable): Character data followed by padding to complete the structure
886
887        let current_len = string_bytes.len().min(82) as u32;
888
889        // AB STRING header/identifier - this appears to be required
890        cip_request.extend_from_slice(&[0xCE, 0x0F]);
891
892        // Length (4 bytes) - number of characters used as DINT
893        cip_request.extend_from_slice(&current_len.to_le_bytes());
894
895        // Data bytes - the actual string content
896        cip_request.extend_from_slice(&string_bytes[..current_len as usize]);
897
898        // Add padding if the total structure needs to be a specific size
899        // Based on reads, it looks like there might be additional padding after the data
900
901        tracing::trace!(
902            "Built Allen-Bradley STRING write request ({} bytes) for '{}' = '{}' (len={})",
903            cip_request.len(),
904            tag_name,
905            value,
906            current_len
907        );
908        tracing::trace!(
909            "Request structure: Service=0x4D, Path={} bytes, Header=0xCE0F, Len={} (4 bytes), Data",
910            path_len * 2,
911            current_len
912        );
913
914        // Send the request using standard unconnected messaging
915        let response = self.send_cip_request(&cip_request).await?;
916
917        // Extract CIP response from EtherNet/IP wrapper
918        let cip_response = self.extract_cip_from_response(&response)?;
919
920        // Check if write was successful - use correct CIP response format
921        if cip_response.len() >= 3 {
922            let service_reply = cip_response[0]; // Should be 0xCD (0x4D + 0x80) for Write Tag reply
923            let _additional_status_size = cip_response[1]; // Additional status size (usually 0)
924            let status = cip_response[2]; // CIP status code at position 2
925
926            tracing::trace!(
927                "Write response - Service: 0x{:02X}, Status: 0x{:02X}",
928                service_reply,
929                status
930            );
931
932            if status == 0x00 {
933                tracing::info!("[UNCONNECTED] String write completed successfully");
934                Ok(())
935            } else {
936                let error_msg = self.get_cip_error_message(status);
937                tracing::error!(
938                    "[UNCONNECTED] String write failed: {} (0x{:02X})",
939                    error_msg,
940                    status
941                );
942                Err(EtherNetIpError::Protocol(format!(
943                    "CIP Error 0x{status:02X}: {error_msg}"
944                )))
945            }
946        } else {
947            Err(EtherNetIpError::Protocol(
948                "Invalid unconnected string write response - too short".to_string(),
949            ))
950        }
951    }
952
953    /// Write a string value to a PLC tag using unconnected messaging
954    ///
955    /// # Arguments
956    ///
957    /// * `tag_name` - The name of the tag to write to
958    /// * `value` - The string value to write (max 82 characters)
959    ///
960    /// # Returns
961    ///
962    /// * `Ok(())` if the write was successful
963    /// * `Err(EtherNetIpError)` if the write failed
964    ///
965    /// # Errors
966    ///
967    /// * `StringTooLong` - If the string is longer than 82 characters
968    /// * `InvalidString` - If the string contains invalid characters
969    /// * `TagNotFound` - If the tag doesn't exist
970    /// * `WriteError` - If the write operation fails
971    pub async fn write_string(&mut self, tag_name: &str, value: &str) -> crate::error::Result<()> {
972        // Validate string length
973        if value.len() > 82 {
974            return Err(crate::error::EtherNetIpError::StringTooLong {
975                max_length: 82,
976                actual_length: value.len(),
977            });
978        }
979
980        // Validate string content (ASCII only)
981        if !value.is_ascii() {
982            return Err(crate::error::EtherNetIpError::InvalidString {
983                reason: "String contains non-ASCII characters".to_string(),
984            });
985        }
986
987        // Build the string write request
988        let request = self.build_string_write_request(tag_name, value)?;
989
990        // Send the request and get the response
991        let response = self.send_cip_request(&request).await?;
992
993        // Parse the response
994        let cip_response = self.extract_cip_from_response(&response)?;
995
996        // Check for errors in the response
997        if cip_response.len() < 2 {
998            return Err(crate::error::EtherNetIpError::InvalidResponse {
999                reason: "Response too short".to_string(),
1000            });
1001        }
1002
1003        let status = cip_response[0];
1004        if status != 0 {
1005            return Err(crate::error::EtherNetIpError::WriteError {
1006                status,
1007                message: self.get_cip_error_message(status),
1008            });
1009        }
1010
1011        Ok(())
1012    }
1013
1014    /// Build a string write request packet
1015    fn build_string_write_request(
1016        &self,
1017        tag_name: &str,
1018        value: &str,
1019    ) -> crate::error::Result<Vec<u8>> {
1020        let mut request = Vec::new();
1021
1022        // CIP Write Service (0x4D)
1023        request.push(0x4D);
1024
1025        // Tag path
1026        let tag_path = self.build_tag_path(tag_name);
1027        request.extend_from_slice(&tag_path);
1028
1029        // AB STRING data structure
1030        request.extend_from_slice(&(value.len() as u16).to_le_bytes()); // Len
1031        request.extend_from_slice(&82u16.to_le_bytes()); // MaxLen
1032
1033        // Data[82] with padding
1034        let mut data = [0u8; 82];
1035        let bytes = value.as_bytes();
1036        data[..bytes.len()].copy_from_slice(bytes);
1037        request.extend_from_slice(&data);
1038
1039        Ok(request)
1040    }
1041}