Skip to main content

rust_ethernet_ip/
plc_manager.rs

1use crate::error::{EtherNetIpError, Result};
2use crate::EipClient;
3use std::collections::HashMap;
4use std::net::SocketAddr;
5use std::time::{Duration, Instant};
6
7/// Configuration for a PLC connection
8#[derive(Debug, Clone)]
9pub struct PlcConfig {
10    /// IP address and port of the PLC
11    pub address: SocketAddr,
12    /// Maximum number of connections to maintain
13    pub max_connections: u32,
14    /// Connection timeout in milliseconds
15    pub connection_timeout: Duration,
16    /// Health check interval in milliseconds
17    pub health_check_interval: Duration,
18    /// Maximum packet size in bytes
19    pub max_packet_size: usize,
20}
21
22impl Default for PlcConfig {
23    fn default() -> Self {
24        Self {
25            address: "127.0.0.1:44818".parse().unwrap(),
26            max_connections: 5,
27            connection_timeout: Duration::from_secs(5),
28            health_check_interval: Duration::from_secs(30),
29            max_packet_size: 4000,
30        }
31    }
32}
33
34/// Represents the health status of a PLC connection
35#[derive(Debug, Clone)]
36pub struct ConnectionHealth {
37    /// Whether the connection is currently active
38    pub is_active: bool,
39    /// Last successful communication timestamp
40    pub last_success: Instant,
41    /// Number of failed attempts since last success
42    pub failed_attempts: u32,
43    /// Current connection latency in milliseconds
44    pub latency: Duration,
45}
46
47/// Represents a connection to a PLC
48#[derive(Debug)]
49pub struct PlcConnection {
50    /// The EIP client instance
51    client: EipClient,
52    /// Health status of the connection
53    health: ConnectionHealth,
54    /// Last time this connection was used
55    last_used: Instant,
56}
57
58impl PlcConnection {
59    /// Creates a new PLC connection
60    pub fn new(client: EipClient) -> Self {
61        Self {
62            client,
63            health: ConnectionHealth {
64                is_active: true,
65                last_success: Instant::now(),
66                failed_attempts: 0,
67                latency: Duration::from_millis(0),
68            },
69            last_used: Instant::now(),
70        }
71    }
72
73    /// Updates the health status of the connection
74    #[allow(dead_code)]
75    pub fn update_health(&mut self, is_active: bool, latency: Duration) {
76        self.health.is_active = is_active;
77        if is_active {
78            self.health.last_success = Instant::now();
79            self.health.failed_attempts = 0;
80            self.health.latency = latency;
81        } else {
82            self.health.failed_attempts += 1;
83        }
84    }
85}
86
87/// Manager for multiple PLC connections
88#[derive(Debug)]
89pub struct PlcManager {
90    /// Configuration for each PLC
91    configs: HashMap<SocketAddr, PlcConfig>,
92    /// Active connections for each PLC
93    connections: HashMap<SocketAddr, Vec<PlcConnection>>,
94    /// Health check interval
95    #[allow(dead_code)]
96    health_check_interval: Duration,
97}
98
99impl Default for PlcManager {
100    fn default() -> Self {
101        Self::new()
102    }
103}
104
105impl PlcManager {
106    /// Creates a new PLC manager
107    pub fn new() -> Self {
108        Self {
109            configs: HashMap::new(),
110            connections: HashMap::new(),
111            health_check_interval: Duration::from_secs(30),
112        }
113    }
114
115    /// Adds a PLC configuration
116    pub fn add_plc(&mut self, config: PlcConfig) {
117        self.configs.insert(config.address, config);
118    }
119
120    /// Gets a connection to a PLC
121    pub async fn get_connection(&mut self, address: SocketAddr) -> Result<&mut EipClient> {
122        let config = self
123            .configs
124            .get(&address)
125            .ok_or_else(|| EtherNetIpError::Connection("PLC not configured".to_string()))?;
126
127        // First check if we have any connections for this address
128        if let std::collections::hash_map::Entry::Vacant(e) = self.connections.entry(address) {
129            // No connections exist, create a new one
130            let mut client = EipClient::new(&address.to_string()).await?;
131            client.set_max_packet_size(config.max_packet_size as u32);
132            let mut new_conn = PlcConnection::new(client);
133            new_conn.last_used = Instant::now();
134            e.insert(vec![new_conn]);
135            return Ok(&mut self.connections.get_mut(&address).unwrap()[0].client);
136        }
137
138        // Get mutable access to the connections
139        let connections = self.connections.get_mut(&address).unwrap();
140
141        // First try to find an inactive connection
142        for (i, connection) in connections.iter_mut().enumerate() {
143            if !connection.health.is_active {
144                let mut client = EipClient::new(&address.to_string()).await?;
145                client.set_max_packet_size(config.max_packet_size as u32);
146                connection.client = client;
147                connection.health.is_active = true;
148                connection.health.last_success = Instant::now();
149                connection.health.failed_attempts = 0;
150                connection.health.latency = Duration::from_millis(0);
151                connection.last_used = Instant::now();
152                return Ok(&mut connections[i].client);
153            }
154        }
155
156        // If we have room for more connections, create a new one
157        if connections.len() < config.max_connections as usize {
158            let mut client = EipClient::new(&address.to_string()).await?;
159            client.set_max_packet_size(config.max_packet_size as u32);
160            let mut new_conn = PlcConnection::new(client);
161            new_conn.last_used = Instant::now();
162            connections.push(new_conn);
163            return Ok(&mut connections.last_mut().unwrap().client);
164        }
165
166        // Pool is full: return the least-recently-used existing connection.
167        // Recreating the client here causes avoidable reconnect churn.
168        let lru_index = connections
169            .iter()
170            .enumerate()
171            .min_by_key(|(_, conn)| conn.last_used)
172            .map(|(i, _)| i)
173            .unwrap();
174
175        // Mark usage and return the existing client to maximize reuse.
176        connections[lru_index].last_used = Instant::now();
177        Ok(&mut connections[lru_index].client)
178    }
179
180    /// Performs health checks on all connections
181    pub async fn check_health(&mut self) {
182        for (address, connections) in &mut self.connections {
183            let _config = self.configs.get(address).unwrap();
184
185            for conn in connections.iter_mut() {
186                if !conn.health.is_active {
187                    if let Ok(new_client) = EipClient::new(&address.to_string()).await {
188                        conn.client = new_client;
189                        conn.health.is_active = true;
190                        conn.health.last_success = Instant::now();
191                        conn.health.failed_attempts = 0;
192                        conn.health.latency = Duration::from_millis(0);
193                        conn.last_used = Instant::now();
194                    }
195                }
196            }
197        }
198    }
199
200    /// Removes inactive connections
201    pub fn cleanup_connections(&mut self) {
202        for connections in self.connections.values_mut() {
203            connections.retain(|conn| conn.health.is_active);
204        }
205    }
206
207    pub async fn get_client(&mut self, address: &str) -> Result<&mut EipClient> {
208        let addr = address
209            .parse::<SocketAddr>()
210            .map_err(|_| EtherNetIpError::Connection("Invalid address format".to_string()))?;
211        self.get_connection(addr).await
212    }
213
214    async fn _get_or_create_connection(&mut self, address: &SocketAddr) -> Result<&mut EipClient> {
215        let config = self.configs.get(address).cloned().unwrap();
216        let connections = self.connections.entry(*address).or_default();
217
218        // Try to find an existing inactive connection
219        for (i, connection) in connections.iter_mut().enumerate() {
220            if !connection.health.is_active {
221                let mut client = EipClient::new(&address.to_string()).await?;
222                client.set_max_packet_size(config.max_packet_size as u32);
223                connection.client = client;
224                connection.health.is_active = true;
225                connection.health.last_success = Instant::now();
226                connection.health.failed_attempts = 0;
227                connection.health.latency = Duration::from_millis(0);
228                connection.last_used = Instant::now();
229                return Ok(&mut connections[i].client);
230            }
231        }
232
233        // If we have room for more connections, create a new one
234        if connections.len() < config.max_connections as usize {
235            let mut client = EipClient::new(&address.to_string()).await?;
236            client.set_max_packet_size(config.max_packet_size as u32);
237            let mut new_conn = PlcConnection::new(client);
238            new_conn.last_used = Instant::now();
239            connections.push(new_conn);
240            return Ok(&mut connections.last_mut().unwrap().client);
241        }
242
243        // Find the least recently used connection
244        let lru_index = connections
245            .iter()
246            .enumerate()
247            .min_by_key(|(_, conn)| conn.last_used)
248            .map(|(i, _)| i)
249            .unwrap();
250
251        // Update the LRU connection
252        let mut client = EipClient::new(&address.to_string()).await?;
253        client.set_max_packet_size(config.max_packet_size as u32);
254        connections[lru_index].client = client;
255        connections[lru_index].health.is_active = true;
256        connections[lru_index].health.last_success = Instant::now();
257        connections[lru_index].health.failed_attempts = 0;
258        connections[lru_index].health.latency = Duration::from_millis(0);
259        connections[lru_index].last_used = Instant::now();
260        Ok(&mut connections[lru_index].client)
261    }
262}
263
264#[cfg(test)]
265mod tests {
266    use super::*;
267
268    #[test]
269    fn test_plc_config_default() {
270        let config = PlcConfig::default();
271        assert_eq!(config.max_connections, 5);
272        assert_eq!(config.max_packet_size, 4000);
273    }
274
275    #[tokio::test]
276    async fn test_plc_manager_connection_pool() {
277        let mut manager = PlcManager::new();
278        let config = PlcConfig {
279            address: "127.0.0.1:44818".parse().unwrap(),
280            max_connections: 2,
281            ..Default::default()
282        };
283        manager.add_plc(config.clone());
284
285        // This will fail in tests since there's no actual PLC
286        // but it demonstrates the connection pool logic
287        let result = manager.get_connection(config.address).await;
288        assert!(result.is_err());
289    }
290}