Skip to main content

rust_ethernet_ip/
fleet.rs

1use crate::client::{Client, ConnectionEvent};
2use crate::error::Result;
3use crate::route::RoutePath;
4use std::collections::HashMap;
5use std::hash::Hash;
6use tokio::sync::broadcast;
7
8/// Fleet-level connection event annotated with the PLC identifier.
9#[derive(Debug, Clone, PartialEq, Eq)]
10pub struct FleetEvent<PlcId> {
11    pub plc_id: PlcId,
12    pub event: ConnectionEvent,
13}
14
15/// Multi-PLC pool built from actor-backed [`Client`] handles.
16#[derive(Debug)]
17pub struct Fleet<PlcId> {
18    clients: HashMap<PlcId, Client>,
19    events: broadcast::Sender<FleetEvent<PlcId>>,
20}
21
22impl<PlcId> Default for Fleet<PlcId>
23where
24    PlcId: Clone + Eq + Hash + Send + Sync + 'static,
25{
26    fn default() -> Self {
27        Self::new()
28    }
29}
30
31impl<PlcId> Fleet<PlcId>
32where
33    PlcId: Clone + Eq + Hash + Send + Sync + 'static,
34{
35    /// Creates an empty fleet.
36    #[must_use]
37    pub fn new() -> Self {
38        let (events, _) = broadcast::channel(128);
39        Self {
40            clients: HashMap::new(),
41            events,
42        }
43    }
44
45    /// Connects and adds one PLC by address.
46    pub async fn connect(&mut self, plc_id: PlcId, addr: &str) -> Result<Client> {
47        let client = Client::connect(addr).await?;
48        self.insert_client(plc_id, client.clone());
49        Ok(client)
50    }
51
52    /// Connects and adds one routed PLC by address and route path.
53    pub async fn connect_with_route(
54        &mut self,
55        plc_id: PlcId,
56        addr: &str,
57        route: RoutePath,
58    ) -> Result<Client> {
59        let client = Client::with_route_path(addr, route).await?;
60        self.insert_client(plc_id, client.clone());
61        Ok(client)
62    }
63
64    /// Inserts an existing actor client into the fleet.
65    pub fn insert_client(&mut self, plc_id: PlcId, client: Client) -> Option<Client> {
66        self.forward_events(plc_id.clone(), client.clone());
67        self.clients.insert(plc_id, client)
68    }
69
70    /// Returns a cloneable client handle for one PLC.
71    #[must_use]
72    pub fn client(&self, plc_id: &PlcId) -> Option<Client> {
73        self.clients.get(plc_id).cloned()
74    }
75
76    /// Subscribes to fleet-level connection events.
77    pub fn events(&self) -> broadcast::Receiver<FleetEvent<PlcId>> {
78        self.events.subscribe()
79    }
80
81    /// Performs a health check against every PLC currently in the fleet.
82    pub async fn check_health(&self) -> HashMap<PlcId, Result<bool>> {
83        let mut health = HashMap::with_capacity(self.clients.len());
84        for (plc_id, client) in &self.clients {
85            health.insert(plc_id.clone(), client.check_health().await);
86        }
87        health
88    }
89
90    /// Returns the number of PLCs in the fleet.
91    #[must_use]
92    pub fn len(&self) -> usize {
93        self.clients.len()
94    }
95
96    /// Returns true when the fleet has no PLC clients.
97    #[must_use]
98    pub fn is_empty(&self) -> bool {
99        self.clients.is_empty()
100    }
101
102    fn forward_events(&self, plc_id: PlcId, client: Client) {
103        let events = self.events.clone();
104        tokio::spawn(async move {
105            let mut client_events = client.events();
106            while let Ok(event) = client_events.recv().await {
107                let _ = events.send(FleetEvent {
108                    plc_id: plc_id.clone(),
109                    event,
110                });
111            }
112        });
113    }
114}