rust_ethernet_ip/client/batch_exec.rs
1use super::EipClient;
2use crate::batch::{BatchConfig, BatchError, BatchOperation, BatchResult};
3use crate::protocol::values;
4use crate::types::PlcValue;
5use tokio::time::Instant;
6
7impl EipClient {
8 // =========================================================================
9 // BATCH OPERATIONS IMPLEMENTATION
10 // =========================================================================
11
12 /// Executes a batch of read and write operations
13 ///
14 /// This is the main entry point for batch operations. It takes a slice of
15 /// `BatchOperation` items and executes them efficiently by grouping them
16 /// into optimal CIP packets based on the current `BatchConfig`.
17 ///
18 /// # Arguments
19 ///
20 /// * `operations` - A slice of operations to execute
21 ///
22 /// # Returns
23 ///
24 /// A vector of [`BatchResult`] items, one per executed operation.
25 ///
26 /// When `optimize_packet_packing` is enabled, operations may be regrouped
27 /// by type for execution, so result order is not guaranteed to match the
28 /// original mixed-operation input order. Use [`BatchResult::operation`] to
29 /// correlate each result.
30 ///
31 /// # Performance
32 ///
33 /// Batch execution primarily reduces round trips by combining multiple
34 /// operations into fewer requests. Observed throughput varies significantly
35 /// between simulator and real hardware, and also depends on packet sizing,
36 /// controller model, route path, and tag mix.
37 ///
38 /// # Examples
39 ///
40 /// ```rust,no_run
41 /// use rust_ethernet_ip::{EipClient, BatchOperation, PlcValue};
42 ///
43 /// #[tokio::main]
44 /// async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
45 /// let mut client = EipClient::connect("192.168.1.100:44818").await?;
46 ///
47 /// let operations = vec![
48 /// BatchOperation::Read { tag_name: "Motor1_Speed".to_string() },
49 /// BatchOperation::Read { tag_name: "Motor2_Speed".to_string() },
50 /// BatchOperation::Write {
51 /// tag_name: "SetPoint".to_string(),
52 /// value: PlcValue::Dint(1500)
53 /// },
54 /// ];
55 ///
56 /// let results = client.execute_batch(&operations).await?;
57 ///
58 /// for result in results {
59 /// match result.result {
60 /// Ok(Some(value)) => println!("Read value: {:?}", value),
61 /// Ok(None) => println!("Write successful"),
62 /// Err(e) => println!("Operation failed: {}", e),
63 /// }
64 /// }
65 ///
66 /// Ok(())
67 /// }
68 /// ```
69 pub async fn execute_batch(
70 &mut self,
71 operations: &[BatchOperation],
72 ) -> crate::error::Result<Vec<BatchResult>> {
73 if operations.is_empty() {
74 return Ok(Vec::new());
75 }
76
77 let start_time = Instant::now();
78 tracing::debug!(
79 "[BATCH] Starting batch execution with {} operations",
80 operations.len()
81 );
82
83 // Group operations based on configuration
84 let operation_groups = if self.batch_config.optimize_packet_packing {
85 self.optimize_operation_groups(operations)
86 } else {
87 self.sequential_operation_groups(operations)
88 };
89
90 let mut all_results = Vec::with_capacity(operations.len());
91
92 // Execute each group
93 for (group_index, group) in operation_groups.iter().enumerate() {
94 tracing::debug!(
95 "[BATCH] Processing group {} with {} operations",
96 group_index + 1,
97 group.len()
98 );
99
100 match self.execute_operation_group(group).await {
101 Ok(mut group_results) => {
102 all_results.append(&mut group_results);
103 }
104 Err(e) => {
105 if !self.batch_config.continue_on_error {
106 return Err(e);
107 }
108
109 // Create error results for this group
110 for op in group {
111 let error_result = BatchResult {
112 operation: op.clone(),
113 result: Err(BatchError::NetworkError(e.to_string())),
114 execution_time_us: 0,
115 };
116 all_results.push(error_result);
117 }
118 }
119 }
120 }
121
122 let total_time = start_time.elapsed();
123 tracing::info!(
124 "[BATCH] Completed batch execution in {:?} - {} operations processed",
125 total_time,
126 all_results.len()
127 );
128
129 Ok(all_results)
130 }
131
132 /// Reads multiple tags in a single batch operation
133 ///
134 /// This is a convenience method for read-only batch operations.
135 /// It's optimized for reading many tags at once.
136 ///
137 /// # Arguments
138 ///
139 /// * `tag_names` - A slice of tag names to read
140 ///
141 /// # Returns
142 ///
143 /// A vector of tuples containing `(tag_name, result)` pairs
144 ///
145 /// # Examples
146 ///
147 /// ```rust,no_run
148 /// use rust_ethernet_ip::EipClient;
149 ///
150 /// #[tokio::main]
151 /// async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
152 /// let mut client = EipClient::connect("192.168.1.100:44818").await?;
153 ///
154 /// let tags = ["Motor1_Speed", "Motor2_Speed", "Temperature", "Pressure"];
155 /// let results = client.read_tags_batch(&tags).await?;
156 ///
157 /// for (tag_name, result) in results {
158 /// match result {
159 /// Ok(value) => println!("{}: {:?}", tag_name, value),
160 /// Err(e) => println!("{}: Error - {}", tag_name, e),
161 /// }
162 /// }
163 ///
164 /// Ok(())
165 /// }
166 /// ```
167 pub async fn read_tags_batch(
168 &mut self,
169 tag_names: &[&str],
170 ) -> crate::error::Result<Vec<(String, std::result::Result<PlcValue, BatchError>)>> {
171 let operations: Vec<BatchOperation> = tag_names
172 .iter()
173 .map(|&name| BatchOperation::Read {
174 tag_name: name.to_string(),
175 })
176 .collect();
177
178 let results = self.execute_batch(&operations).await?;
179
180 Ok(results
181 .into_iter()
182 .map(|result| {
183 let tag_name = match &result.operation {
184 BatchOperation::Read { tag_name } => tag_name.clone(),
185 BatchOperation::Write { .. } => {
186 unreachable!("Should only have read operations")
187 }
188 };
189
190 let value_result = match result.result {
191 Ok(Some(value)) => Ok(value),
192 Ok(None) => Err(BatchError::Other(
193 "Unexpected None result for read operation".to_string(),
194 )),
195 Err(e) => Err(e),
196 };
197
198 (tag_name, value_result)
199 })
200 .collect())
201 }
202
203 /// Writes multiple tag values in a single batch operation
204 ///
205 /// This is a convenience method for write-only batch operations.
206 /// It's optimized for writing many values at once.
207 ///
208 /// # Arguments
209 ///
210 /// * `tag_values` - A slice of `(tag_name, value)` tuples to write
211 ///
212 /// # Returns
213 ///
214 /// A vector of tuples containing `(tag_name, result)` pairs
215 ///
216 /// # Examples
217 ///
218 /// ```rust,no_run
219 /// use rust_ethernet_ip::{EipClient, PlcValue};
220 ///
221 /// #[tokio::main]
222 /// async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
223 /// let mut client = EipClient::connect("192.168.1.100:44818").await?;
224 ///
225 /// let writes = vec![
226 /// ("SetPoint1", PlcValue::Bool(true)),
227 /// ("SetPoint2", PlcValue::Dint(2000)),
228 /// ("EnableFlag", PlcValue::Bool(true)),
229 /// ];
230 ///
231 /// let results = client.write_tags_batch(&writes).await?;
232 ///
233 /// for (tag_name, result) in results {
234 /// match result {
235 /// Ok(_) => println!("{}: Write successful", tag_name),
236 /// Err(e) => println!("{}: Write failed - {}", tag_name, e),
237 /// }
238 /// }
239 ///
240 /// Ok(())
241 /// }
242 /// ```
243 pub async fn write_tags_batch(
244 &mut self,
245 tag_values: &[(&str, PlcValue)],
246 ) -> crate::error::Result<Vec<(String, std::result::Result<(), BatchError>)>> {
247 let operations: Vec<BatchOperation> = tag_values
248 .iter()
249 .map(|(name, value)| BatchOperation::Write {
250 tag_name: name.to_string(),
251 value: value.clone(),
252 })
253 .collect();
254
255 let results = self.execute_batch(&operations).await?;
256
257 Ok(results
258 .into_iter()
259 .map(|result| {
260 let tag_name = match &result.operation {
261 BatchOperation::Write { tag_name, .. } => tag_name.clone(),
262 BatchOperation::Read { .. } => {
263 unreachable!("Should only have write operations")
264 }
265 };
266
267 let write_result = match result.result {
268 Ok(None) => Ok(()),
269 Ok(Some(_)) => Err(BatchError::Other(
270 "Unexpected value result for write operation".to_string(),
271 )),
272 Err(e) => Err(e),
273 };
274
275 (tag_name, write_result)
276 })
277 .collect())
278 }
279
280 /// Configures batch operation settings
281 ///
282 /// This method allows fine-tuning of batch operation behavior,
283 /// including performance optimizations and error handling.
284 ///
285 /// # Arguments
286 ///
287 /// * `config` - The new batch configuration to use
288 ///
289 /// # Examples
290 ///
291 /// ```rust,no_run
292 /// use rust_ethernet_ip::{EipClient, BatchConfig};
293 ///
294 /// #[tokio::main]
295 /// async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
296 /// let mut client = EipClient::connect("192.168.1.100:44818").await?;
297 ///
298 /// let config = BatchConfig {
299 /// max_operations_per_packet: 50,
300 /// max_packet_size: 1500,
301 /// packet_timeout_ms: 5000,
302 /// continue_on_error: false,
303 /// optimize_packet_packing: true,
304 /// };
305 ///
306 /// client.configure_batch_operations(config);
307 ///
308 /// Ok(())
309 /// }
310 /// ```
311 pub fn configure_batch_operations(&mut self, config: BatchConfig) {
312 self.batch_config = config;
313 tracing::debug!(
314 "[BATCH] Updated batch configuration: max_ops={}, max_size={}, timeout={}ms",
315 self.batch_config.max_operations_per_packet,
316 self.batch_config.max_packet_size,
317 self.batch_config.packet_timeout_ms
318 );
319 }
320
321 /// Gets current batch operation configuration
322 pub fn get_batch_config(&self) -> &BatchConfig {
323 &self.batch_config
324 }
325
326 // =========================================================================
327 // INTERNAL BATCH OPERATION HELPERS
328 // =========================================================================
329
330 /// Groups operations optimally for batch processing
331 fn optimize_operation_groups(&self, operations: &[BatchOperation]) -> Vec<Vec<BatchOperation>> {
332 let mut groups = Vec::new();
333 let mut reads = Vec::new();
334 let mut writes = Vec::new();
335
336 // Separate reads and writes
337 for op in operations {
338 match op {
339 BatchOperation::Read { .. } => reads.push(op.clone()),
340 BatchOperation::Write { .. } => writes.push(op.clone()),
341 }
342 }
343
344 // Group reads
345 for chunk in reads.chunks(self.batch_config.max_operations_per_packet) {
346 groups.push(chunk.to_vec());
347 }
348
349 // Group writes
350 for chunk in writes.chunks(self.batch_config.max_operations_per_packet) {
351 groups.push(chunk.to_vec());
352 }
353
354 groups
355 }
356
357 /// Groups operations sequentially (preserves order)
358 fn sequential_operation_groups(
359 &self,
360 operations: &[BatchOperation],
361 ) -> Vec<Vec<BatchOperation>> {
362 operations
363 .chunks(self.batch_config.max_operations_per_packet)
364 .map(|chunk| chunk.to_vec())
365 .collect()
366 }
367
368 /// Executes a single group of operations as a CIP Multiple Service Packet
369 async fn execute_operation_group(
370 &mut self,
371 operations: &[BatchOperation],
372 ) -> crate::error::Result<Vec<BatchResult>> {
373 let start_time = Instant::now();
374 let mut results = Vec::with_capacity(operations.len());
375
376 // Build Multiple Service Packet request
377 let cip_request = self.build_multiple_service_packet(operations)?;
378
379 // Send request and get response
380 let response = self.send_cip_request(&cip_request).await?;
381
382 // Parse response and create results
383 let parsed_results = self.parse_multiple_service_response(&response, operations)?;
384
385 let execution_time = start_time.elapsed();
386
387 // Create BatchResult objects
388 for (i, operation) in operations.iter().enumerate() {
389 let op_execution_time = execution_time.as_micros() as u64 / operations.len() as u64;
390
391 let result = if i < parsed_results.len() {
392 match &parsed_results[i] {
393 Ok(value) => Ok(value.clone()),
394 Err(e) => Err(e.clone()),
395 }
396 } else {
397 Err(BatchError::Other(
398 "Missing result from response".to_string(),
399 ))
400 };
401
402 results.push(BatchResult {
403 operation: operation.clone(),
404 result,
405 execution_time_us: op_execution_time,
406 });
407 }
408
409 Ok(results)
410 }
411
412 /// Builds a CIP Multiple Service Packet request
413 fn build_multiple_service_packet(
414 &self,
415 operations: &[BatchOperation],
416 ) -> crate::error::Result<Vec<u8>> {
417 let mut packet = Vec::with_capacity(8 + (operations.len() * 2));
418
419 // Multiple Service Packet service code
420 packet.push(0x0A);
421
422 // Request path (2 bytes for class 0x02, instance 1)
423 packet.push(0x02); // Path size in words
424 packet.push(0x20); // Class segment
425 packet.push(0x02); // Class 0x02 (Message Router)
426 packet.push(0x24); // Instance segment
427 packet.push(0x01); // Instance 1
428
429 // Number of services
430 packet.extend_from_slice(&(operations.len() as u16).to_le_bytes());
431
432 // Calculate offset table
433 let mut service_requests = Vec::with_capacity(operations.len());
434 let mut current_offset = 2 + (operations.len() * 2); // Start after offset table
435
436 for operation in operations {
437 // Build individual service request
438 let service_request = match operation {
439 BatchOperation::Read { tag_name } => self.build_read_request(tag_name)?,
440 BatchOperation::Write { tag_name, value } => {
441 self.build_write_request(tag_name, value)?
442 }
443 };
444
445 service_requests.push(service_request);
446 }
447
448 // Add offset table
449 for service_request in &service_requests {
450 packet.extend_from_slice(&(current_offset as u16).to_le_bytes());
451 current_offset += service_request.len();
452 }
453
454 // Add service requests
455 for service_request in service_requests {
456 packet.extend_from_slice(&service_request);
457 }
458
459 tracing::trace!(
460 "[BATCH] Built Multiple Service Packet ({} bytes, {} services)",
461 packet.len(),
462 operations.len()
463 );
464
465 Ok(packet)
466 }
467
468 /// Parses a Multiple Service Packet response
469 fn parse_multiple_service_response(
470 &self,
471 response: &[u8],
472 operations: &[BatchOperation],
473 ) -> crate::error::Result<Vec<std::result::Result<Option<PlcValue>, BatchError>>> {
474 if response.len() < 6 {
475 return Err(crate::error::EtherNetIpError::Protocol(
476 "Response too short for Multiple Service Packet".to_string(),
477 ));
478 }
479
480 let mut results = Vec::new();
481
482 tracing::trace!(
483 "Raw Multiple Service Response ({} bytes): {:02X?}",
484 response.len(),
485 response
486 );
487
488 // First, extract the CIP data from the EtherNet/IP response
489 let cip_data = match self.extract_cip_from_response(response) {
490 Ok(data) => data,
491 Err(e) => {
492 tracing::error!("Failed to extract CIP data: {}", e);
493 return Err(e);
494 }
495 };
496
497 tracing::trace!(
498 "Extracted CIP data ({} bytes): {:02X?}",
499 cip_data.len(),
500 cip_data
501 );
502
503 if cip_data.len() < 6 {
504 return Err(crate::error::EtherNetIpError::Protocol(
505 "CIP data too short for Multiple Service Response".to_string(),
506 ));
507 }
508
509 // Parse Multiple Service Response header from CIP data:
510 // [0] = Service Code (0x8A)
511 // [1] = Reserved (0x00)
512 // [2] = General Status (0x00 for success)
513 // [3] = Additional Status Size (0x00)
514 // [4-5] = Number of replies (little endian)
515
516 let service_code = cip_data[0];
517 let general_status = cip_data[2];
518 let num_replies = u16::from_le_bytes([cip_data[4], cip_data[5]]) as usize;
519
520 tracing::debug!(
521 "Multiple Service Response: service=0x{:02X}, status=0x{:02X}, replies={}",
522 service_code,
523 general_status,
524 num_replies
525 );
526
527 if general_status != 0x00 {
528 return Err(crate::error::EtherNetIpError::Protocol(
529 self.describe_multiple_service_error(general_status, operations),
530 ));
531 }
532
533 if num_replies != operations.len() {
534 return Err(crate::error::EtherNetIpError::Protocol(format!(
535 "Reply count mismatch: expected {}, got {}",
536 operations.len(),
537 num_replies
538 )));
539 }
540
541 // Read reply offsets (each is 2 bytes, little endian)
542 let mut reply_offsets = Vec::new();
543 let mut offset = 6; // Skip header
544
545 for _i in 0..num_replies {
546 if offset + 2 > cip_data.len() {
547 return Err(crate::error::EtherNetIpError::Protocol(
548 "CIP data too short for reply offsets".to_string(),
549 ));
550 }
551 let reply_offset =
552 u16::from_le_bytes([cip_data[offset], cip_data[offset + 1]]) as usize;
553 reply_offsets.push(reply_offset);
554 offset += 2;
555 }
556
557 tracing::trace!("Reply offsets: {:?}", reply_offsets);
558
559 // The reply data starts after all the offsets
560 let reply_base_offset = 6 + (num_replies * 2);
561
562 tracing::trace!("Reply base offset: {}", reply_base_offset);
563
564 // Parse each reply
565 for (i, &reply_offset) in reply_offsets.iter().enumerate() {
566 // Reply offset is relative to position 4 (after service code, reserved, status, additional status size)
567 let reply_start = 4 + reply_offset;
568
569 if reply_start >= cip_data.len() {
570 results.push(Err(BatchError::Other(
571 "Reply offset beyond CIP data".to_string(),
572 )));
573 continue;
574 }
575
576 // Calculate reply end position
577 let reply_end = if i + 1 < reply_offsets.len() {
578 // Not the last reply - use next reply's offset as boundary
579 4 + reply_offsets[i + 1]
580 } else {
581 // Last reply - goes to end of CIP data
582 cip_data.len()
583 };
584
585 if reply_end > cip_data.len() || reply_start >= reply_end {
586 results.push(Err(BatchError::Other(
587 "Invalid reply boundaries".to_string(),
588 )));
589 continue;
590 }
591
592 let reply_data = &cip_data[reply_start..reply_end];
593
594 tracing::trace!(
595 "Reply {} at offset {}: start={}, end={}, len={}",
596 i,
597 reply_offset,
598 reply_start,
599 reply_end,
600 reply_data.len()
601 );
602 tracing::trace!("Reply {} data: {:02X?}", i, reply_data);
603
604 let result = self.parse_individual_reply(reply_data, &operations[i]);
605 results.push(result);
606 }
607
608 Ok(results)
609 }
610
611 /// Parses an individual service reply within a Multiple Service Packet response
612 fn parse_individual_reply(
613 &self,
614 reply_data: &[u8],
615 operation: &BatchOperation,
616 ) -> std::result::Result<Option<PlcValue>, BatchError> {
617 if reply_data.len() < 4 {
618 return Err(BatchError::SerializationError(
619 "Reply too short".to_string(),
620 ));
621 }
622
623 tracing::trace!(
624 "Parsing individual reply ({} bytes): {:02X?}",
625 reply_data.len(),
626 reply_data
627 );
628
629 // Each individual reply in Multiple Service Response has the same format as standalone CIP response:
630 // [0] = Service Code (0xCC for read response, 0xCD for write response)
631 // [1] = Reserved (0x00)
632 // [2] = General Status (0x00 for success)
633 // [3] = Additional Status Size (0x00)
634 // [4..] = Response data (for reads) or empty (for writes)
635
636 let service_code = reply_data[0];
637 let general_status = reply_data[2];
638
639 tracing::trace!(
640 "Service code: 0x{:02X}, Status: 0x{:02X}",
641 service_code,
642 general_status
643 );
644
645 if general_status != 0x00 {
646 let error_msg = self.get_cip_error_message(general_status);
647 return Err(BatchError::CipError {
648 status: general_status,
649 message: error_msg,
650 });
651 }
652
653 match operation {
654 BatchOperation::Write { .. } => {
655 // Write operations return no data on success
656 Ok(None)
657 }
658 BatchOperation::Read { .. } => {
659 // Read operations return data starting at offset 4
660 if reply_data.len() < 6 {
661 return Err(BatchError::SerializationError(
662 "Read reply too short for data".to_string(),
663 ));
664 }
665
666 // Parse the data directly (skip the 4-byte header)
667 // Data format: [type_low, type_high, value_bytes...]
668 let data = &reply_data[4..];
669 tracing::trace!("Parsing data ({} bytes): {:02X?}", data.len(), data);
670
671 if data.len() < 2 {
672 return Err(BatchError::SerializationError(
673 "Data too short for type".to_string(),
674 ));
675 }
676
677 let data_type = u16::from_le_bytes([data[0], data[1]]);
678 let value_data = &data[2..];
679
680 tracing::trace!(
681 "Data type: 0x{:04X}, Value data ({} bytes): {:02X?}",
682 data_type,
683 value_data.len(),
684 value_data
685 );
686
687 if data_type == values::BOOL_ARRAY_DWORD {
688 if value_data.len() < 4 {
689 return Err(BatchError::SerializationError(
690 "Missing packed BOOL array DWORD value".to_string(),
691 ));
692 }
693
694 let packed_value = u32::from_le_bytes([
695 value_data[0],
696 value_data[1],
697 value_data[2],
698 value_data[3],
699 ]);
700
701 if let BatchOperation::Read { tag_name } = operation
702 && let Some((_base_name, index)) = self.parse_array_element_access(tag_name)
703 {
704 let bit_index = index % 32;
705 let value = (packed_value >> bit_index) & 1 != 0;
706 tracing::trace!(
707 "Parsed packed BOOL array element '{}' from DWORD 0x{:08X} using bit {} -> {}",
708 tag_name,
709 packed_value,
710 bit_index,
711 value
712 );
713 return Ok(Some(PlcValue::Bool(value)));
714 }
715 }
716
717 values::decode_payload(data_type, value_data)
718 .map(Some)
719 .map_err(|e| BatchError::SerializationError(e.to_string()))
720 }
721 }
722 }
723}