use crate::error::{EtherNetIpError, Result};
use crate::EipClient;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::time::{Duration, Instant};
#[derive(Debug, Clone)]
pub struct PlcConfig {
pub address: SocketAddr,
pub max_connections: u32,
pub connection_timeout: Duration,
pub health_check_interval: Duration,
pub max_packet_size: usize,
}
impl Default for PlcConfig {
fn default() -> Self {
Self {
address: "127.0.0.1:44818".parse().unwrap(),
max_connections: 5,
connection_timeout: Duration::from_secs(5),
health_check_interval: Duration::from_secs(30),
max_packet_size: 4000,
}
}
}
#[derive(Debug, Clone)]
pub struct ConnectionHealth {
pub is_active: bool,
pub last_success: Instant,
pub failed_attempts: u32,
pub latency: Duration,
}
#[derive(Debug)]
pub struct PlcConnection {
client: EipClient,
health: ConnectionHealth,
last_used: Instant,
}
impl PlcConnection {
pub fn new(client: EipClient) -> Self {
Self {
client,
health: ConnectionHealth {
is_active: true,
last_success: Instant::now(),
failed_attempts: 0,
latency: Duration::from_millis(0),
},
last_used: Instant::now(),
}
}
#[allow(dead_code)]
pub fn update_health(&mut self, is_active: bool, latency: Duration) {
self.health.is_active = is_active;
if is_active {
self.health.last_success = Instant::now();
self.health.failed_attempts = 0;
self.health.latency = latency;
} else {
self.health.failed_attempts += 1;
}
}
}
#[derive(Debug)]
pub struct PlcManager {
configs: HashMap<SocketAddr, PlcConfig>,
connections: HashMap<SocketAddr, Vec<PlcConnection>>,
#[allow(dead_code)]
health_check_interval: Duration,
}
impl Default for PlcManager {
fn default() -> Self {
Self::new()
}
}
impl PlcManager {
pub fn new() -> Self {
Self {
configs: HashMap::new(),
connections: HashMap::new(),
health_check_interval: Duration::from_secs(30),
}
}
pub fn add_plc(&mut self, config: PlcConfig) {
self.configs.insert(config.address, config);
}
pub async fn get_connection(&mut self, address: SocketAddr) -> Result<&mut EipClient> {
let config = self
.configs
.get(&address)
.ok_or_else(|| EtherNetIpError::Connection("PLC not configured".to_string()))?;
if let std::collections::hash_map::Entry::Vacant(e) = self.connections.entry(address) {
let mut client = EipClient::new(&address.to_string()).await?;
client.set_max_packet_size(config.max_packet_size as u32);
let mut new_conn = PlcConnection::new(client);
new_conn.last_used = Instant::now();
e.insert(vec![new_conn]);
return Ok(&mut self.connections.get_mut(&address).unwrap()[0].client);
}
let connections = self.connections.get_mut(&address).unwrap();
for (i, connection) in connections.iter_mut().enumerate() {
if !connection.health.is_active {
let mut client = EipClient::new(&address.to_string()).await?;
client.set_max_packet_size(config.max_packet_size as u32);
connection.client = client;
connection.health.is_active = true;
connection.health.last_success = Instant::now();
connection.health.failed_attempts = 0;
connection.health.latency = Duration::from_millis(0);
connection.last_used = Instant::now();
return Ok(&mut connections[i].client);
}
}
if connections.len() < config.max_connections as usize {
let mut client = EipClient::new(&address.to_string()).await?;
client.set_max_packet_size(config.max_packet_size as u32);
let mut new_conn = PlcConnection::new(client);
new_conn.last_used = Instant::now();
connections.push(new_conn);
return Ok(&mut connections.last_mut().unwrap().client);
}
let lru_index = connections
.iter()
.enumerate()
.min_by_key(|(_, conn)| conn.last_used)
.map(|(i, _)| i)
.unwrap();
connections[lru_index].last_used = Instant::now();
Ok(&mut connections[lru_index].client)
}
pub async fn check_health(&mut self) {
for (address, connections) in &mut self.connections {
let _config = self.configs.get(address).unwrap();
for conn in connections.iter_mut() {
if !conn.health.is_active {
if let Ok(new_client) = EipClient::new(&address.to_string()).await {
conn.client = new_client;
conn.health.is_active = true;
conn.health.last_success = Instant::now();
conn.health.failed_attempts = 0;
conn.health.latency = Duration::from_millis(0);
conn.last_used = Instant::now();
}
}
}
}
}
pub fn cleanup_connections(&mut self) {
for connections in self.connections.values_mut() {
connections.retain(|conn| conn.health.is_active);
}
}
pub async fn get_client(&mut self, address: &str) -> Result<&mut EipClient> {
let addr = address
.parse::<SocketAddr>()
.map_err(|_| EtherNetIpError::Connection("Invalid address format".to_string()))?;
self.get_connection(addr).await
}
async fn _get_or_create_connection(&mut self, address: &SocketAddr) -> Result<&mut EipClient> {
let config = self.configs.get(address).cloned().unwrap();
let connections = self.connections.entry(*address).or_default();
for (i, connection) in connections.iter_mut().enumerate() {
if !connection.health.is_active {
let mut client = EipClient::new(&address.to_string()).await?;
client.set_max_packet_size(config.max_packet_size as u32);
connection.client = client;
connection.health.is_active = true;
connection.health.last_success = Instant::now();
connection.health.failed_attempts = 0;
connection.health.latency = Duration::from_millis(0);
connection.last_used = Instant::now();
return Ok(&mut connections[i].client);
}
}
if connections.len() < config.max_connections as usize {
let mut client = EipClient::new(&address.to_string()).await?;
client.set_max_packet_size(config.max_packet_size as u32);
let mut new_conn = PlcConnection::new(client);
new_conn.last_used = Instant::now();
connections.push(new_conn);
return Ok(&mut connections.last_mut().unwrap().client);
}
let lru_index = connections
.iter()
.enumerate()
.min_by_key(|(_, conn)| conn.last_used)
.map(|(i, _)| i)
.unwrap();
let mut client = EipClient::new(&address.to_string()).await?;
client.set_max_packet_size(config.max_packet_size as u32);
connections[lru_index].client = client;
connections[lru_index].health.is_active = true;
connections[lru_index].health.last_success = Instant::now();
connections[lru_index].health.failed_attempts = 0;
connections[lru_index].health.latency = Duration::from_millis(0);
connections[lru_index].last_used = Instant::now();
Ok(&mut connections[lru_index].client)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_plc_config_default() {
let config = PlcConfig::default();
assert_eq!(config.max_connections, 5);
assert_eq!(config.max_packet_size, 4000);
}
#[tokio::test]
async fn test_plc_manager_connection_pool() {
let mut manager = PlcManager::new();
let config = PlcConfig {
address: "127.0.0.1:44818".parse().unwrap(),
max_connections: 2,
..Default::default()
};
manager.add_plc(config.clone());
let result = manager.get_connection(config.address).await;
assert!(result.is_err());
}
}