Skip to main content

rust_ethernet_ip/client/
batch_exec.rs

1use super::EipClient;
2use crate::batch::{BatchConfig, BatchError, BatchOperation, BatchResult};
3use crate::protocol::values;
4use crate::types::PlcValue;
5use tokio::time::Instant;
6
7impl EipClient {
8    // =========================================================================
9    // BATCH OPERATIONS IMPLEMENTATION
10    // =========================================================================
11
12    /// Executes a batch of read and write operations
13    ///
14    /// This is the main entry point for batch operations. It takes a slice of
15    /// `BatchOperation` items and executes them efficiently by grouping them
16    /// into optimal CIP packets based on the current `BatchConfig`.
17    ///
18    /// # Arguments
19    ///
20    /// * `operations` - A slice of operations to execute
21    ///
22    /// # Returns
23    ///
24    /// A vector of [`BatchResult`] items, one per executed operation.
25    ///
26    /// When `optimize_packet_packing` is enabled, operations may be regrouped
27    /// by type for execution, so result order is not guaranteed to match the
28    /// original mixed-operation input order. Use [`BatchResult::operation`] to
29    /// correlate each result.
30    ///
31    /// # Performance
32    ///
33    /// Batch execution primarily reduces round trips by combining multiple
34    /// operations into fewer requests. Observed throughput varies significantly
35    /// between simulator and real hardware, and also depends on packet sizing,
36    /// controller model, route path, and tag mix.
37    ///
38    /// # Examples
39    ///
40    /// ```rust,no_run
41    /// use rust_ethernet_ip::{EipClient, BatchOperation, PlcValue};
42    ///
43    /// #[tokio::main]
44    /// async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
45    ///     let mut client = EipClient::connect("192.168.1.100:44818").await?;
46    ///
47    ///     let operations = vec![
48    ///         BatchOperation::Read { tag_name: "Motor1_Speed".to_string() },
49    ///         BatchOperation::Read { tag_name: "Motor2_Speed".to_string() },
50    ///         BatchOperation::Write {
51    ///             tag_name: "SetPoint".to_string(),
52    ///             value: PlcValue::Dint(1500)
53    ///         },
54    ///     ];
55    ///
56    ///     let results = client.execute_batch(&operations).await?;
57    ///
58    ///     for result in results {
59    ///         match result.result {
60    ///             Ok(Some(value)) => println!("Read value: {:?}", value),
61    ///             Ok(None) => println!("Write successful"),
62    ///             Err(e) => println!("Operation failed: {}", e),
63    ///         }
64    ///     }
65    ///
66    ///     Ok(())
67    /// }
68    /// ```
69    pub async fn execute_batch(
70        &mut self,
71        operations: &[BatchOperation],
72    ) -> crate::error::Result<Vec<BatchResult>> {
73        if operations.is_empty() {
74            return Ok(Vec::new());
75        }
76
77        let start_time = Instant::now();
78        tracing::debug!(
79            "[BATCH] Starting batch execution with {} operations",
80            operations.len()
81        );
82
83        // Group operations based on configuration
84        let operation_groups = if self.batch_config.optimize_packet_packing {
85            self.optimize_operation_groups(operations)
86        } else {
87            self.sequential_operation_groups(operations)
88        };
89
90        let mut all_results = Vec::with_capacity(operations.len());
91
92        // Execute each group
93        for (group_index, group) in operation_groups.iter().enumerate() {
94            tracing::debug!(
95                "[BATCH] Processing group {} with {} operations",
96                group_index + 1,
97                group.len()
98            );
99
100            match self.execute_operation_group(group).await {
101                Ok(mut group_results) => {
102                    all_results.append(&mut group_results);
103                }
104                Err(e) => {
105                    if !self.batch_config.continue_on_error {
106                        return Err(e);
107                    }
108
109                    // Create error results for this group
110                    for op in group {
111                        let error_result = BatchResult {
112                            operation: op.clone(),
113                            result: Err(BatchError::NetworkError(e.to_string())),
114                            execution_time_us: 0,
115                        };
116                        all_results.push(error_result);
117                    }
118                }
119            }
120        }
121
122        let total_time = start_time.elapsed();
123        tracing::info!(
124            "[BATCH] Completed batch execution in {:?} - {} operations processed",
125            total_time,
126            all_results.len()
127        );
128
129        Ok(all_results)
130    }
131
132    /// Reads multiple tags in a single batch operation
133    ///
134    /// This is a convenience method for read-only batch operations.
135    /// It's optimized for reading many tags at once.
136    ///
137    /// # Arguments
138    ///
139    /// * `tag_names` - A slice of tag names to read
140    ///
141    /// # Returns
142    ///
143    /// A vector of tuples containing `(tag_name, result)` pairs
144    ///
145    /// # Examples
146    ///
147    /// ```rust,no_run
148    /// use rust_ethernet_ip::EipClient;
149    ///
150    /// #[tokio::main]
151    /// async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
152    ///     let mut client = EipClient::connect("192.168.1.100:44818").await?;
153    ///
154    ///     let tags = ["Motor1_Speed", "Motor2_Speed", "Temperature", "Pressure"];
155    ///     let results = client.read_tags_batch(&tags).await?;
156    ///
157    ///     for (tag_name, result) in results {
158    ///         match result {
159    ///             Ok(value) => println!("{}: {:?}", tag_name, value),
160    ///             Err(e) => println!("{}: Error - {}", tag_name, e),
161    ///         }
162    ///     }
163    ///
164    ///     Ok(())
165    /// }
166    /// ```
167    pub async fn read_tags_batch(
168        &mut self,
169        tag_names: &[&str],
170    ) -> crate::error::Result<Vec<(String, std::result::Result<PlcValue, BatchError>)>> {
171        let operations: Vec<BatchOperation> = tag_names
172            .iter()
173            .map(|&name| BatchOperation::Read {
174                tag_name: name.to_string(),
175            })
176            .collect();
177
178        let results = self.execute_batch(&operations).await?;
179
180        Ok(results
181            .into_iter()
182            .map(|result| {
183                let tag_name = match &result.operation {
184                    BatchOperation::Read { tag_name } => tag_name.clone(),
185                    BatchOperation::Write { .. } => {
186                        unreachable!("Should only have read operations")
187                    }
188                };
189
190                let value_result = match result.result {
191                    Ok(Some(value)) => Ok(value),
192                    Ok(None) => Err(BatchError::Other(
193                        "Unexpected None result for read operation".to_string(),
194                    )),
195                    Err(e) => Err(e),
196                };
197
198                (tag_name, value_result)
199            })
200            .collect())
201    }
202
203    /// Writes multiple tag values in a single batch operation
204    ///
205    /// This is a convenience method for write-only batch operations.
206    /// It's optimized for writing many values at once.
207    ///
208    /// # Arguments
209    ///
210    /// * `tag_values` - A slice of `(tag_name, value)` tuples to write
211    ///
212    /// # Returns
213    ///
214    /// A vector of tuples containing `(tag_name, result)` pairs
215    ///
216    /// # Examples
217    ///
218    /// ```rust,no_run
219    /// use rust_ethernet_ip::{EipClient, PlcValue};
220    ///
221    /// #[tokio::main]
222    /// async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
223    ///     let mut client = EipClient::connect("192.168.1.100:44818").await?;
224    ///
225    ///     let writes = vec![
226    ///         ("SetPoint1", PlcValue::Bool(true)),
227    ///         ("SetPoint2", PlcValue::Dint(2000)),
228    ///         ("EnableFlag", PlcValue::Bool(true)),
229    ///     ];
230    ///
231    ///     let results = client.write_tags_batch(&writes).await?;
232    ///
233    ///     for (tag_name, result) in results {
234    ///         match result {
235    ///             Ok(_) => println!("{}: Write successful", tag_name),
236    ///             Err(e) => println!("{}: Write failed - {}", tag_name, e),
237    ///         }
238    ///     }
239    ///
240    ///     Ok(())
241    /// }
242    /// ```
243    pub async fn write_tags_batch(
244        &mut self,
245        tag_values: &[(&str, PlcValue)],
246    ) -> crate::error::Result<Vec<(String, std::result::Result<(), BatchError>)>> {
247        let operations: Vec<BatchOperation> = tag_values
248            .iter()
249            .map(|(name, value)| BatchOperation::Write {
250                tag_name: name.to_string(),
251                value: value.clone(),
252            })
253            .collect();
254
255        let results = self.execute_batch(&operations).await?;
256
257        Ok(results
258            .into_iter()
259            .map(|result| {
260                let tag_name = match &result.operation {
261                    BatchOperation::Write { tag_name, .. } => tag_name.clone(),
262                    BatchOperation::Read { .. } => {
263                        unreachable!("Should only have write operations")
264                    }
265                };
266
267                let write_result = match result.result {
268                    Ok(None) => Ok(()),
269                    Ok(Some(_)) => Err(BatchError::Other(
270                        "Unexpected value result for write operation".to_string(),
271                    )),
272                    Err(e) => Err(e),
273                };
274
275                (tag_name, write_result)
276            })
277            .collect())
278    }
279
280    /// Configures batch operation settings
281    ///
282    /// This method allows fine-tuning of batch operation behavior,
283    /// including performance optimizations and error handling.
284    ///
285    /// # Arguments
286    ///
287    /// * `config` - The new batch configuration to use
288    ///
289    /// # Examples
290    ///
291    /// ```rust,no_run
292    /// use rust_ethernet_ip::{EipClient, BatchConfig};
293    ///
294    /// #[tokio::main]
295    /// async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
296    ///     let mut client = EipClient::connect("192.168.1.100:44818").await?;
297    ///
298    ///     let config = BatchConfig {
299    ///         max_operations_per_packet: 50,
300    ///         max_packet_size: 1500,
301    ///         packet_timeout_ms: 5000,
302    ///         continue_on_error: false,
303    ///         optimize_packet_packing: true,
304    ///     };
305    ///
306    ///     client.configure_batch_operations(config);
307    ///
308    ///     Ok(())
309    /// }
310    /// ```
311    pub fn configure_batch_operations(&mut self, config: BatchConfig) {
312        self.batch_config = config;
313        tracing::debug!(
314            "[BATCH] Updated batch configuration: max_ops={}, max_size={}, timeout={}ms",
315            self.batch_config.max_operations_per_packet,
316            self.batch_config.max_packet_size,
317            self.batch_config.packet_timeout_ms
318        );
319    }
320
321    /// Gets current batch operation configuration
322    pub fn get_batch_config(&self) -> &BatchConfig {
323        &self.batch_config
324    }
325
326    // =========================================================================
327    // INTERNAL BATCH OPERATION HELPERS
328    // =========================================================================
329
330    /// Groups operations optimally for batch processing
331    fn optimize_operation_groups(&self, operations: &[BatchOperation]) -> Vec<Vec<BatchOperation>> {
332        let mut groups = Vec::new();
333        let mut reads = Vec::new();
334        let mut writes = Vec::new();
335
336        // Separate reads and writes
337        for op in operations {
338            match op {
339                BatchOperation::Read { .. } => reads.push(op.clone()),
340                BatchOperation::Write { .. } => writes.push(op.clone()),
341            }
342        }
343
344        // Group reads
345        for chunk in reads.chunks(self.batch_config.max_operations_per_packet) {
346            groups.push(chunk.to_vec());
347        }
348
349        // Group writes
350        for chunk in writes.chunks(self.batch_config.max_operations_per_packet) {
351            groups.push(chunk.to_vec());
352        }
353
354        groups
355    }
356
357    /// Groups operations sequentially (preserves order)
358    fn sequential_operation_groups(
359        &self,
360        operations: &[BatchOperation],
361    ) -> Vec<Vec<BatchOperation>> {
362        operations
363            .chunks(self.batch_config.max_operations_per_packet)
364            .map(|chunk| chunk.to_vec())
365            .collect()
366    }
367
368    /// Executes a single group of operations as a CIP Multiple Service Packet
369    async fn execute_operation_group(
370        &mut self,
371        operations: &[BatchOperation],
372    ) -> crate::error::Result<Vec<BatchResult>> {
373        let start_time = Instant::now();
374        let mut results = Vec::with_capacity(operations.len());
375
376        // Build Multiple Service Packet request
377        let cip_request = self.build_multiple_service_packet(operations)?;
378
379        // Send request and get response
380        let response = self.send_cip_request(&cip_request).await?;
381
382        // Parse response and create results
383        let parsed_results = self.parse_multiple_service_response(&response, operations)?;
384
385        let execution_time = start_time.elapsed();
386
387        // Create BatchResult objects
388        for (i, operation) in operations.iter().enumerate() {
389            let op_execution_time = execution_time.as_micros() as u64 / operations.len() as u64;
390
391            let result = if i < parsed_results.len() {
392                match &parsed_results[i] {
393                    Ok(value) => Ok(value.clone()),
394                    Err(e) => Err(e.clone()),
395                }
396            } else {
397                Err(BatchError::Other(
398                    "Missing result from response".to_string(),
399                ))
400            };
401
402            results.push(BatchResult {
403                operation: operation.clone(),
404                result,
405                execution_time_us: op_execution_time,
406            });
407        }
408
409        Ok(results)
410    }
411
412    /// Builds a CIP Multiple Service Packet request
413    fn build_multiple_service_packet(
414        &self,
415        operations: &[BatchOperation],
416    ) -> crate::error::Result<Vec<u8>> {
417        let mut packet = Vec::with_capacity(8 + (operations.len() * 2));
418
419        // Multiple Service Packet service code
420        packet.push(0x0A);
421
422        // Request path (2 bytes for class 0x02, instance 1)
423        packet.push(0x02); // Path size in words
424        packet.push(0x20); // Class segment
425        packet.push(0x02); // Class 0x02 (Message Router)
426        packet.push(0x24); // Instance segment
427        packet.push(0x01); // Instance 1
428
429        // Number of services
430        packet.extend_from_slice(&(operations.len() as u16).to_le_bytes());
431
432        // Calculate offset table
433        let mut service_requests = Vec::with_capacity(operations.len());
434        let mut current_offset = 2 + (operations.len() * 2); // Start after offset table
435
436        for operation in operations {
437            // Build individual service request
438            let service_request = match operation {
439                BatchOperation::Read { tag_name } => self.build_read_request(tag_name)?,
440                BatchOperation::Write { tag_name, value } => {
441                    self.build_write_request(tag_name, value)?
442                }
443            };
444
445            service_requests.push(service_request);
446        }
447
448        // Add offset table
449        for service_request in &service_requests {
450            packet.extend_from_slice(&(current_offset as u16).to_le_bytes());
451            current_offset += service_request.len();
452        }
453
454        // Add service requests
455        for service_request in service_requests {
456            packet.extend_from_slice(&service_request);
457        }
458
459        tracing::trace!(
460            "[BATCH] Built Multiple Service Packet ({} bytes, {} services)",
461            packet.len(),
462            operations.len()
463        );
464
465        Ok(packet)
466    }
467
468    /// Parses a Multiple Service Packet response
469    fn parse_multiple_service_response(
470        &self,
471        response: &[u8],
472        operations: &[BatchOperation],
473    ) -> crate::error::Result<Vec<std::result::Result<Option<PlcValue>, BatchError>>> {
474        if response.len() < 6 {
475            return Err(crate::error::EtherNetIpError::Protocol(
476                "Response too short for Multiple Service Packet".to_string(),
477            ));
478        }
479
480        let mut results = Vec::new();
481
482        tracing::trace!(
483            "Raw Multiple Service Response ({} bytes): {:02X?}",
484            response.len(),
485            response
486        );
487
488        // First, extract the CIP data from the EtherNet/IP response
489        let cip_data = match self.extract_cip_from_response(response) {
490            Ok(data) => data,
491            Err(e) => {
492                tracing::error!("Failed to extract CIP data: {}", e);
493                return Err(e);
494            }
495        };
496
497        tracing::trace!(
498            "Extracted CIP data ({} bytes): {:02X?}",
499            cip_data.len(),
500            cip_data
501        );
502
503        if cip_data.len() < 6 {
504            return Err(crate::error::EtherNetIpError::Protocol(
505                "CIP data too short for Multiple Service Response".to_string(),
506            ));
507        }
508
509        // Parse Multiple Service Response header from CIP data:
510        // [0] = Service Code (0x8A)
511        // [1] = Reserved (0x00)
512        // [2] = General Status (0x00 for success)
513        // [3] = Additional Status Size (0x00)
514        // [4-5] = Number of replies (little endian)
515
516        let service_code = cip_data[0];
517        let general_status = cip_data[2];
518        let num_replies = u16::from_le_bytes([cip_data[4], cip_data[5]]) as usize;
519
520        tracing::debug!(
521            "Multiple Service Response: service=0x{:02X}, status=0x{:02X}, replies={}",
522            service_code,
523            general_status,
524            num_replies
525        );
526
527        if general_status != 0x00 {
528            return Err(crate::error::EtherNetIpError::Protocol(
529                self.describe_multiple_service_error(general_status, operations),
530            ));
531        }
532
533        if num_replies != operations.len() {
534            return Err(crate::error::EtherNetIpError::Protocol(format!(
535                "Reply count mismatch: expected {}, got {}",
536                operations.len(),
537                num_replies
538            )));
539        }
540
541        // Read reply offsets (each is 2 bytes, little endian)
542        let mut reply_offsets = Vec::new();
543        let mut offset = 6; // Skip header
544
545        for _i in 0..num_replies {
546            if offset + 2 > cip_data.len() {
547                return Err(crate::error::EtherNetIpError::Protocol(
548                    "CIP data too short for reply offsets".to_string(),
549                ));
550            }
551            let reply_offset =
552                u16::from_le_bytes([cip_data[offset], cip_data[offset + 1]]) as usize;
553            reply_offsets.push(reply_offset);
554            offset += 2;
555        }
556
557        tracing::trace!("Reply offsets: {:?}", reply_offsets);
558
559        // The reply data starts after all the offsets
560        let reply_base_offset = 6 + (num_replies * 2);
561
562        tracing::trace!("Reply base offset: {}", reply_base_offset);
563
564        // Parse each reply
565        for (i, &reply_offset) in reply_offsets.iter().enumerate() {
566            // Reply offset is relative to position 4 (after service code, reserved, status, additional status size)
567            let reply_start = 4 + reply_offset;
568
569            if reply_start >= cip_data.len() {
570                results.push(Err(BatchError::Other(
571                    "Reply offset beyond CIP data".to_string(),
572                )));
573                continue;
574            }
575
576            // Calculate reply end position
577            let reply_end = if i + 1 < reply_offsets.len() {
578                // Not the last reply - use next reply's offset as boundary
579                4 + reply_offsets[i + 1]
580            } else {
581                // Last reply - goes to end of CIP data
582                cip_data.len()
583            };
584
585            if reply_end > cip_data.len() || reply_start >= reply_end {
586                results.push(Err(BatchError::Other(
587                    "Invalid reply boundaries".to_string(),
588                )));
589                continue;
590            }
591
592            let reply_data = &cip_data[reply_start..reply_end];
593
594            tracing::trace!(
595                "Reply {} at offset {}: start={}, end={}, len={}",
596                i,
597                reply_offset,
598                reply_start,
599                reply_end,
600                reply_data.len()
601            );
602            tracing::trace!("Reply {} data: {:02X?}", i, reply_data);
603
604            let result = self.parse_individual_reply(reply_data, &operations[i]);
605            results.push(result);
606        }
607
608        Ok(results)
609    }
610
611    /// Parses an individual service reply within a Multiple Service Packet response
612    fn parse_individual_reply(
613        &self,
614        reply_data: &[u8],
615        operation: &BatchOperation,
616    ) -> std::result::Result<Option<PlcValue>, BatchError> {
617        if reply_data.len() < 4 {
618            return Err(BatchError::SerializationError(
619                "Reply too short".to_string(),
620            ));
621        }
622
623        tracing::trace!(
624            "Parsing individual reply ({} bytes): {:02X?}",
625            reply_data.len(),
626            reply_data
627        );
628
629        // Each individual reply in Multiple Service Response has the same format as standalone CIP response:
630        // [0] = Service Code (0xCC for read response, 0xCD for write response)
631        // [1] = Reserved (0x00)
632        // [2] = General Status (0x00 for success)
633        // [3] = Additional Status Size (0x00)
634        // [4..] = Response data (for reads) or empty (for writes)
635
636        let service_code = reply_data[0];
637        let general_status = reply_data[2];
638
639        tracing::trace!(
640            "Service code: 0x{:02X}, Status: 0x{:02X}",
641            service_code,
642            general_status
643        );
644
645        if general_status != 0x00 {
646            let error_msg = self.get_cip_error_message(general_status);
647            return Err(BatchError::CipError {
648                status: general_status,
649                message: error_msg,
650            });
651        }
652
653        match operation {
654            BatchOperation::Write { .. } => {
655                // Write operations return no data on success
656                Ok(None)
657            }
658            BatchOperation::Read { .. } => {
659                // Read operations return data starting at offset 4
660                if reply_data.len() < 6 {
661                    return Err(BatchError::SerializationError(
662                        "Read reply too short for data".to_string(),
663                    ));
664                }
665
666                // Parse the data directly (skip the 4-byte header)
667                // Data format: [type_low, type_high, value_bytes...]
668                let data = &reply_data[4..];
669                tracing::trace!("Parsing data ({} bytes): {:02X?}", data.len(), data);
670
671                if data.len() < 2 {
672                    return Err(BatchError::SerializationError(
673                        "Data too short for type".to_string(),
674                    ));
675                }
676
677                let data_type = u16::from_le_bytes([data[0], data[1]]);
678                let value_data = &data[2..];
679
680                tracing::trace!(
681                    "Data type: 0x{:04X}, Value data ({} bytes): {:02X?}",
682                    data_type,
683                    value_data.len(),
684                    value_data
685                );
686
687                if data_type == values::BOOL_ARRAY_DWORD {
688                    if value_data.len() < 4 {
689                        return Err(BatchError::SerializationError(
690                            "Missing packed BOOL array DWORD value".to_string(),
691                        ));
692                    }
693
694                    let packed_value = u32::from_le_bytes([
695                        value_data[0],
696                        value_data[1],
697                        value_data[2],
698                        value_data[3],
699                    ]);
700
701                    if let BatchOperation::Read { tag_name } = operation
702                        && let Some((_base_name, index)) = self.parse_array_element_access(tag_name)
703                    {
704                        let bit_index = index % 32;
705                        let value = (packed_value >> bit_index) & 1 != 0;
706                        tracing::trace!(
707                            "Parsed packed BOOL array element '{}' from DWORD 0x{:08X} using bit {} -> {}",
708                            tag_name,
709                            packed_value,
710                            bit_index,
711                            value
712                        );
713                        return Ok(Some(PlcValue::Bool(value)));
714                    }
715                }
716
717                values::decode_payload(data_type, value_data)
718                    .map(Some)
719                    .map_err(|e| BatchError::SerializationError(e.to_string()))
720            }
721        }
722    }
723}