genja_core/inventory/connections.rs
1use super::ResolvedConnectionParams;
2use async_trait::async_trait;
3use dashmap::DashMap;
4use std::any::Any;
5use std::collections::HashMap;
6use std::fmt;
7use std::sync::{Arc, RwLock};
8use tokio::sync::Mutex;
9
10#[async_trait]
11pub trait Connection
12where
13 Self: Any + Send + Sync + fmt::Debug,
14{
15 fn create(&self, key: &ConnectionKey) -> Box<dyn Connection>;
16
17 fn is_alive(&self) -> bool;
18
19 async fn open(&mut self, params: &ResolvedConnectionParams) -> Result<(), String>;
20
21 async fn execute_command(&mut self, _command: &str) -> Result<String, String> {
22 Err("connection does not implement execute_command".to_string())
23 }
24
25 fn close(&mut self) -> ConnectionKey;
26}
27
/// Composite key that identifies one pooled connection in [`ConnectionManager`].
///
/// A `ConnectionKey` pairs the target `hostname` with the `plugin_name` of the connection
/// plugin responsible for it. Because both fields take part in equality and hashing, one
/// host can own several independent connections at the same time — one per plugin (for
/// example SSH, NETCONF and HTTP).
///
/// # Hashing and equality
///
/// `Hash`, `PartialEq` and `Eq` are derived, which gives the following behaviour when the
/// key is used in a hash-based collection such as `HashMap` or the `DashMap` inside
/// `ConnectionManager`:
///
/// 1. **Hashing**: `hostname` and then `plugin_name` are fed into the hasher, in field
///    declaration order, producing a single hash value.
/// 2. **Lookup**: the collection uses that hash to locate candidate slots, giving O(1)
///    average-case access.
/// 3. **Collisions**: when two different keys share a hash value, the derived `Eq`
///    (which compares both fields) decides which stored key, if any, actually matches.
/// 4. **Deduplication**: inserting a key whose `hostname` and `plugin_name` both equal an
///    existing key's replaces that entry's value instead of adding a second entry.
///
/// Comparison is exact and case-sensitive, so `"Router1"` and `"router1"` are different
/// keys.
///
/// # Fields
///
/// * `hostname` - Hostname or IP address of the remote endpoint.
/// * `plugin_name` - Name of the connection plugin (e.g., "ssh", "netconf", "http"); it
///   distinguishes connections of different kinds to the same host.
///
/// # Examples
///
/// ## Basic Usage
///
/// ```
/// # use genja_core::inventory::ConnectionKey;
/// let key = ConnectionKey::new("10.0.0.1", "ssh");
/// assert_eq!(key.hostname, "10.0.0.1");
/// assert_eq!(key.plugin_name, "ssh");
/// ```
///
/// ## Multiple Connection Plugins per Host
///
/// ```
/// # use genja_core::inventory::ConnectionKey;
/// use std::collections::HashMap;
///
/// let mut connections = HashMap::new();
/// let ssh_key = ConnectionKey::new("router1", "ssh");
/// let netconf_key = ConnectionKey::new("router1", "netconf");
///
/// // Same host, different plugins: the keys are not equal, so both entries are kept.
/// connections.insert(ssh_key, "SSH connection");
/// connections.insert(netconf_key, "NETCONF connection");
/// assert_eq!(connections.len(), 2);
/// ```
///
/// ## Key Equality and Deduplication
///
/// ```
/// # use genja_core::inventory::ConnectionKey;
/// use std::collections::HashMap;
///
/// let mut connections = HashMap::new();
/// let key1 = ConnectionKey::new("router1", "ssh");
/// let key2 = ConnectionKey::new("router1", "ssh");
///
/// // Identical fields: equal keys with equal hashes, so the second insert replaces the first.
/// connections.insert(key1, "First connection");
/// connections.insert(key2, "Second connection");
/// assert_eq!(connections.len(), 1);
/// assert_eq!(connections.values().next(), Some(&"Second connection"));
/// ```
///
/// ## Hash-Based Lookup in ConnectionManager
///
/// ```
/// # use genja_core::inventory::{ConnectionKey, ConnectionManager};
/// let manager = ConnectionManager::default();
/// let key = ConnectionKey::new("router1", "ssh");
///
/// // The key's hash narrows the search and `Eq` confirms the exact match.
/// if let Some(connection) = manager.get(&key) {
///     println!("Found existing connection");
/// }
/// ```
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct ConnectionKey {
    pub hostname: String,
    pub plugin_name: String,
}
134
135impl ConnectionKey {
136 /// Creates a new `ConnectionKey` from a hostname and plugin name.
137 ///
138 /// This constructor provides a convenient way to create a connection key by accepting
139 /// any type that can be converted into a `String` for both the hostname and connection
140 /// type parameters. This allows passing `&str`, `String`, or other string-like types
141 /// without explicit conversion.
142 ///
143 /// The resulting key uniquely identifies a connection in the `ConnectionManager` by
144 /// combining the target hostname with the connection plugin name.
145 ///
146 /// # Parameters
147 ///
148 /// * `hostname` - The hostname or IP address of the target device. Accepts any type
149 /// implementing `Into<String>`, such as `&str` or `String`. This identifies the
150 /// remote endpoint for the connection.
151 /// * `plugin_name` - The connection plugin name (e.g., "ssh", "netconf", "http").
152 /// Accepts any type implementing `Into<String>`. This distinguishes between different
153 /// connection plugin names to the same host.
154 ///
155 /// # Returns
156 ///
157 /// Returns a new `ConnectionKey` instance with the provided hostname and plugin name.
158 ///
159 /// # Examples
160 ///
161 /// ```
162 /// # use genja_core::inventory::ConnectionKey;
163 /// // Using string slices
164 /// let key1 = ConnectionKey::new("10.0.0.1", "ssh");
165 ///
166 /// // Using owned strings
167 /// let hostname = String::from("router1");
168 /// let plugin_name = String::from("netconf");
169 /// let key2 = ConnectionKey::new(hostname, plugin_name);
170 ///
171 /// // Mixed types
172 /// let key3 = ConnectionKey::new("10.0.0.2", String::from("http"));
173 /// ```
174 pub fn new(hostname: impl Into<String>, plugin_name: impl Into<String>) -> Self {
175 Self {
176 hostname: hostname.into(),
177 plugin_name: plugin_name.into(),
178 }
179 }
180}
181
182pub type ConnectionFactory =
183 dyn Fn(&ConnectionKey) -> Option<Arc<Mutex<dyn Connection>>> + Send + Sync;
184
/// Statistics tracking connection lifecycle operations per connection plugin name.
///
/// `ConnectionCounters` provides a simple counter-based mechanism for monitoring connection
/// operations in the `ConnectionManager`. Each connection plugin name (e.g., "ssh", "netconf", "http")
/// has its own set of counters that track how many times connections of that type have been
/// created, opened, and closed.
///
/// These counters are useful for:
/// - **Performance Monitoring**: Identify connection pool efficiency and reuse patterns
/// - **Debugging**: Detect connection leaks, excessive creation, or improper cleanup
/// - **Testing**: Verify connection lifecycle behavior in unit and integration tests
/// - **Metrics**: Export connection statistics for observability systems
///
/// # Counter Semantics
///
/// * `create_calls` - Incremented when a new connection instance is created by the factory.
///   This happens on the first call to `get_or_create()` for a unique `ConnectionKey`.
///   Multiple calls with the same key do not increment this counter.
///
/// * `open_calls` - Incremented when `open()` is called on a connection. This happens when
///   `open_connection()` is called and the connection's `is_alive()` returns `false`.
///   Calling `open_connection()` on an already-alive connection does not increment this counter.
///
/// * `close_calls` - Incremented when a connection is closed via `close_connection()` or
///   `close_all_connections()`. Each connection is counted only once when it's removed from
///   the pool.
///
/// # Thread Safety
///
/// The counters are stored in a `DashMap<String, ConnectionCounters>` in the `ConnectionManager`,
/// providing thread-safe concurrent access. `DashMap` is sharded: updates for different
/// connection plugin names usually proceed in parallel, but names that hash to the same shard
/// briefly serialize on that shard's lock. The struct itself is `Copy`, so reading counters
/// always yields an independent snapshot.
///
/// # Usage Patterns
///
/// ## Ideal Pattern (Efficient Connection Reuse)
/// ```text
/// create_calls: 1
/// open_calls: 1
/// close_calls: 1
/// ```
/// This indicates a connection was created once, opened once, and properly cleaned up.
/// Multiple operations reused the same connection without reopening it.
///
/// ## Connection Leak Pattern
/// ```text
/// create_calls: 5
/// open_calls: 5
/// close_calls: 0
/// ```
/// This indicates connections are being created but never closed, suggesting a resource leak.
///
/// ## Excessive Recreation Pattern
/// ```text
/// create_calls: 100
/// open_calls: 100
/// close_calls: 100
/// ```
/// This indicates connections are being created and destroyed repeatedly instead of being
/// reused, suggesting inefficient connection pooling.
///
/// # Examples
///
/// ## Monitoring Connection Usage
///
/// ```
/// # use async_trait::async_trait;
/// # use std::sync::Arc;
/// # use tokio::runtime::Builder;
/// # use tokio::sync::Mutex;
/// # use genja_core::inventory::{Connection, ConnectionKey, ConnectionManager, ResolvedConnectionParams};
/// # #[derive(Debug)]
/// # struct SshConnection { alive: bool }
/// # #[async_trait]
/// # impl Connection for SshConnection {
/// #     fn create(&self, _key: &ConnectionKey) -> Box<dyn Connection> {
/// #         Box::new(SshConnection { alive: false })
/// #     }
/// #     fn is_alive(&self) -> bool { self.alive }
/// #     async fn open(&mut self, _params: &ResolvedConnectionParams) -> Result<(), String> {
/// #         self.alive = true; Ok(())
/// #     }
/// #     fn close(&mut self) -> ConnectionKey {
/// #         self.alive = false;
/// #         ConnectionKey::new("router1", "ssh")
/// #     }
/// # }
/// # let factory = Arc::new(|_key: &ConnectionKey| {
/// #     Some(Arc::new(Mutex::new(SshConnection { alive: false })) as Arc<Mutex<dyn Connection>>)
/// # });
/// let manager = ConnectionManager::with_connection_factory(factory);
/// let key = ConnectionKey::new("router1", "ssh");
/// let params = ResolvedConnectionParams {
///     hostname: "10.0.0.1".to_string(),
///     port: Some(22),
///     username: Some("admin".to_string()),
///     password: None,
///     platform: None,
///     extras: None,
/// };
///
/// // Perform operations
/// let runtime = Builder::new_current_thread().enable_all().build().unwrap();
/// runtime.block_on(async {
///     manager.open_connection(&key, &params).await?;
///     manager.open_connection(&key, &params).await?; // Reuses existing connection
///     Ok::<(), String>(())
/// })?;
/// manager.close_connection(&key);
///
/// // Check counters
/// let counters = manager.connection_counters_for("ssh").unwrap();
/// assert_eq!(counters.create_calls, 1); // Created once
/// assert_eq!(counters.open_calls, 1);   // Opened once (second call reused)
/// assert_eq!(counters.close_calls, 1);  // Closed once
/// # Ok::<(), String>(())
/// ```
///
/// ## Detecting Connection Leaks in Tests
///
/// ```
/// # use async_trait::async_trait;
/// # use std::sync::Arc;
/// # use tokio::runtime::Builder;
/// # use tokio::sync::Mutex;
/// # use genja_core::inventory::{Connection, ConnectionKey, ConnectionManager, ResolvedConnectionParams};
/// # #[derive(Debug)]
/// # struct SshConnection { alive: bool }
/// # #[async_trait]
/// # impl Connection for SshConnection {
/// #     fn create(&self, _key: &ConnectionKey) -> Box<dyn Connection> {
/// #         Box::new(SshConnection { alive: false })
/// #     }
/// #     fn is_alive(&self) -> bool { self.alive }
/// #     async fn open(&mut self, _params: &ResolvedConnectionParams) -> Result<(), String> {
/// #         self.alive = true; Ok(())
/// #     }
/// #     fn close(&mut self) -> ConnectionKey {
/// #         self.alive = false;
/// #         ConnectionKey::new("router1", "ssh")
/// #     }
/// # }
/// # let factory = Arc::new(|_key: &ConnectionKey| {
/// #     Some(Arc::new(Mutex::new(SshConnection { alive: false })) as Arc<Mutex<dyn Connection>>)
/// # });
/// let manager = ConnectionManager::with_connection_factory(factory);
/// let params = ResolvedConnectionParams {
///     hostname: "10.0.0.1".to_string(),
///     port: Some(22),
///     username: Some("admin".to_string()),
///     password: None,
///     platform: None,
///     extras: None,
/// };
///
/// // Open multiple connections
/// let runtime = Builder::new_current_thread().enable_all().build().unwrap();
/// for i in 1..=5 {
///     let key = ConnectionKey::new(format!("router{}", i), "ssh");
///     runtime.block_on(async { manager.open_connection(&key, &params).await })?;
/// }
///
/// // Verify all connections were created
/// let counters = manager.connection_counters_for("ssh").unwrap();
/// assert_eq!(counters.create_calls, 5);
/// assert_eq!(counters.open_calls, 5);
///
/// // Clean up and verify no leaks
/// manager.close_all_connections();
/// let counters = manager.connection_counters_for("ssh").unwrap();
/// assert_eq!(counters.close_calls, 5); // All connections closed
/// # Ok::<(), String>(())
/// ```
///
/// ## Comparing Multiple Connection Types
///
/// ```
/// # use async_trait::async_trait;
/// # use std::sync::Arc;
/// # use tokio::runtime::Builder;
/// # use tokio::sync::Mutex;
/// # use genja_core::inventory::{Connection, ConnectionKey, ConnectionManager, ResolvedConnectionParams};
/// # #[derive(Debug)]
/// # struct TestConnection { conn_type: String, alive: bool }
/// # #[async_trait]
/// # impl Connection for TestConnection {
/// #     fn create(&self, key: &ConnectionKey) -> Box<dyn Connection> {
/// #         Box::new(TestConnection { conn_type: key.plugin_name.clone(), alive: false })
/// #     }
/// #     fn is_alive(&self) -> bool { self.alive }
/// #     async fn open(&mut self, _params: &ResolvedConnectionParams) -> Result<(), String> {
/// #         self.alive = true; Ok(())
/// #     }
/// #     fn close(&mut self) -> ConnectionKey {
/// #         self.alive = false;
/// #         ConnectionKey::new("host", &self.conn_type)
/// #     }
/// # }
/// # let factory = Arc::new(|key: &ConnectionKey| {
/// #     Some(Arc::new(Mutex::new(TestConnection {
/// #         conn_type: key.plugin_name.clone(),
/// #         alive: false
/// #     })) as Arc<Mutex<dyn Connection>>)
/// # });
/// let manager = ConnectionManager::with_connection_factory(factory);
/// let params = ResolvedConnectionParams {
///     hostname: "10.0.0.1".to_string(),
///     port: Some(22),
///     username: Some("admin".to_string()),
///     password: None,
///     platform: None,
///     extras: None,
/// };
///
/// // Open different connection plugin names
/// let runtime = Builder::new_current_thread().enable_all().build().unwrap();
/// runtime.block_on(async {
///     manager.open_connection(&ConnectionKey::new("router1", "ssh"), &params).await?;
///     manager.open_connection(&ConnectionKey::new("router1", "netconf"), &params).await?;
///     Ok::<(), String>(())
/// })?;
///
/// // Get snapshot of all counters
/// let snapshot = manager.connection_counters_snapshot();
/// let ssh_counters = snapshot.get("ssh").unwrap();
/// let netconf_counters = snapshot.get("netconf").unwrap();
///
/// assert_eq!(ssh_counters.create_calls, 1);
/// assert_eq!(netconf_counters.create_calls, 1);
/// # Ok::<(), String>(())
/// ```
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ConnectionCounters {
    /// Number of connections created by the factory for this plugin name.
    pub create_calls: usize,
    /// Number of times `open()` was actually invoked for this plugin name.
    pub open_calls: usize,
    /// Number of connections of this plugin name closed and removed from the pool.
    pub close_calls: usize,
}
422/// Thread-safe manager for connection lifecycle and pooling.
423///
424/// `ConnectionManager` provides centralized management of connections to remote hosts,
425/// handling connection creation, caching, opening, and closing. It uses a factory pattern
426/// to create connections dynamically based on connection plugin name, and maintains a pool of
427/// active connections for reuse across multiple operations.
428///
429/// The manager is designed for concurrent access and uses lock-free data structures
430/// (`DashMap`) for the connection pool and counters, with an `RwLock` for the factory
431/// to minimize contention.
432///
433/// # Architecture
434///
435/// The manager consists of four main components:
436///
437/// 1. **Connection Pool** (`connections_map`): A `DashMap` storing active connections
438/// keyed by `ConnectionKey` (hostname + connection plugin name). Connections are wrapped
439/// in `Arc<Mutex<_>>` for thread-safe sharing and interior mutability.
440///
441/// 2. **Connection Factory** (`connection_factory`): An optional factory function that
442/// creates new connections on demand. The factory is wrapped in `RwLock<Option<Arc<_>>>`
443/// to allow runtime configuration while supporting concurrent reads.
444///
445/// 3. **Usage Counters** (`counters`): A `DashMap` tracking create, open, and close
446/// operations per connection plugin name. Useful for monitoring, debugging, and testing.
447///
448/// 4. **Caching Strategy**: Connections are created lazily on first access and cached
449/// for subsequent use. The same connection instance is reused until explicitly closed.
450///
451/// # Connection Lifecycle
452///
453/// 1. **Creation**: When `get_or_create()` is called with a new key, the factory is
454/// invoked to create a connection. The connection is inserted into the pool and
455/// the `create_calls` counter is incremented.
456///
457/// 2. **Opening**: The `open_connection()` method checks if a connection is alive
458/// before calling `open()`. Only actual open operations increment the `open_calls`
459/// counter.
460///
461/// 3. **Reuse**: Subsequent calls with the same key return the cached connection
462/// without creating a new one or reopening it if it's still alive.
463///
464/// 4. **Closing**: Connections can be closed individually via `close_connection()` or
465/// all at once via `close_all_connections()`. Closed connections are removed from
466/// the pool and the `close_calls` counter is incremented.
467///
468/// # Thread Safety
469///
470/// The manager is fully thread-safe and designed for concurrent access:
471///
472/// - **Lock-Free Pool**: `DashMap` provides concurrent access to the connection pool
473/// without requiring a global lock. Different threads can access different connections
474/// simultaneously.
475///
476/// - **Per-Connection Locking**: Each connection is wrapped in `Mutex`, allowing
477/// fine-grained locking. Only the thread actively using a connection holds its lock.
478///
479/// - **Factory Configuration**: The factory uses `RwLock` to allow multiple concurrent
480/// reads (connection creation) while serializing writes (factory updates).
481///
482/// - **Lock Ordering**: Methods acquire locks in a consistent order (factory → connection)
483/// and release them promptly to prevent deadlocks.
484///
485/// # Factory Pattern
486///
487/// The connection factory is a function that takes a `ConnectionKey` and returns an
488/// optional connection. This design allows:
489///
490/// - **Plugin-Based Architecture**: Different connection plugin names (SSH, NETCONF, HTTP)
491/// can be registered dynamically via plugins.
492///
493/// - **Lazy Loading**: Connections are only created when needed, reducing startup time
494/// and resource usage.
495///
496/// - **Graceful Degradation**: If no plugin is registered for a connection plugin name, the
497/// factory returns `None` and the manager propagates this to the caller.
498///
499/// # Usage Counters
500///
501/// The manager tracks three types of operations per connection plugin name:
502///
503/// - `create_calls`: Number of times a new connection was created
504/// - `open_calls`: Number of times `open()` was called on a connection
505/// - `close_calls`: Number of times a connection was closed
506///
507/// These counters are useful for:
508/// - Monitoring connection pool efficiency
509/// - Debugging connection leaks or excessive creation
510/// - Testing connection lifecycle behavior
511///
512/// # Examples
513///
514/// ## Basic Setup with Factory
515///
516/// ```
517/// use async_trait::async_trait;
518/// use std::sync::Arc;
519/// use tokio::sync::Mutex;
520/// use genja_core::inventory::{Connection, ConnectionKey, ConnectionManager};
521///
522/// #[derive(Debug)]
523/// struct SshConnection {
524/// alive: bool,
525/// }
526///
527/// #[async_trait]
528/// impl Connection for SshConnection {
529/// fn create(&self, _key: &ConnectionKey) -> Box<dyn Connection> {
530/// Box::new(SshConnection { alive: false })
531/// }
532///
533/// fn is_alive(&self) -> bool {
534/// self.alive
535/// }
536///
537/// async fn open(&mut self, _params: &genja_core::inventory::ResolvedConnectionParams)
538/// -> Result<(), String> {
539/// self.alive = true;
540/// Ok(())
541/// }
542///
543/// fn close(&mut self) -> ConnectionKey {
544/// self.alive = false;
545/// ConnectionKey::new("router1", "ssh")
546/// }
547/// }
548///
549/// // Create a factory that returns SSH connections
550/// let factory = Arc::new(|key: &ConnectionKey| {
551/// if key.plugin_name == "ssh" {
552/// Some(Arc::new(Mutex::new(SshConnection { alive: false })) as Arc<Mutex<dyn Connection>>)
553/// } else {
554/// None
555/// }
556/// });
557///
558/// let manager = ConnectionManager::with_connection_factory(factory);
559/// ```
560///
561/// ## Connection Reuse
562///
563/// ```
564/// # use async_trait::async_trait;
565/// # use std::sync::Arc;
566/// # use tokio::sync::Mutex;
567/// # use genja_core::inventory::{Connection, ConnectionKey, ConnectionManager};
568/// # #[derive(Debug)]
569/// # struct SshConnection { alive: bool }
570/// # #[async_trait]
571/// # impl Connection for SshConnection {
572/// # fn create(&self, _key: &ConnectionKey) -> Box<dyn Connection> {
573/// # Box::new(SshConnection { alive: false })
574/// # }
575/// # fn is_alive(&self) -> bool { self.alive }
576/// # async fn open(&mut self, _params: &genja_core::inventory::ResolvedConnectionParams)
577/// # -> Result<(), String> { self.alive = true; Ok(()) }
578/// # fn close(&mut self) -> ConnectionKey {
579/// # self.alive = false;
580/// # ConnectionKey::new("router1", "ssh")
581/// # }
582/// # }
583/// # let factory = Arc::new(|_key: &ConnectionKey| {
584/// # Some(Arc::new(Mutex::new(SshConnection { alive: false })) as Arc<Mutex<dyn Connection>>)
585/// # });
586/// let manager = ConnectionManager::with_connection_factory(factory);
587/// let key = ConnectionKey::new("router1", "ssh");
588///
589/// // First access creates the connection
590/// let conn1 = manager.get_or_create(key.clone())?.unwrap();
591///
592/// // Second access returns the same connection
593/// let conn2 = manager.get_or_create(key)?.unwrap();
594///
595/// assert!(Arc::ptr_eq(&conn1, &conn2));
596/// # Ok::<(), String>(())
597/// ```
598///
599/// ## Monitoring Connection Usage
600///
601/// ```
602/// # use async_trait::async_trait;
603/// # use std::sync::Arc;
604/// # use tokio::runtime::Builder;
605/// # use tokio::sync::Mutex;
606/// # use genja_core::inventory::{Connection, ConnectionKey, ConnectionManager, ResolvedConnectionParams};
607/// # #[derive(Debug)]
608/// # struct SshConnection { alive: bool }
609/// # #[async_trait]
610/// # impl Connection for SshConnection {
611/// # fn create(&self, _key: &ConnectionKey) -> Box<dyn Connection> {
612/// # Box::new(SshConnection { alive: false })
613/// # }
614/// # fn is_alive(&self) -> bool { self.alive }
615/// # async fn open(&mut self, _params: &ResolvedConnectionParams) -> Result<(), String> {
616/// # self.alive = true; Ok(())
617/// # }
618/// # fn close(&mut self) -> ConnectionKey {
619/// # self.alive = false;
620/// # ConnectionKey::new("router1", "ssh")
621/// # }
622/// # }
623/// # let factory = Arc::new(|_key: &ConnectionKey| {
624/// # Some(Arc::new(Mutex::new(SshConnection { alive: false })) as Arc<Mutex<dyn Connection>>)
625/// # });
626/// let manager = ConnectionManager::with_connection_factory(factory);
627/// let key = ConnectionKey::new("router1", "ssh");
628/// let params = ResolvedConnectionParams {
629/// hostname: "10.0.0.1".to_string(),
630/// port: Some(22),
631/// username: Some("admin".to_string()),
632/// password: None,
633/// platform: None,
634/// extras: None,
635/// };
636///
637/// let runtime = Builder::new_current_thread().enable_all().build().unwrap();
638/// runtime.block_on(async { manager.open_connection(&key, ¶ms).await })?;
639///
640/// // Check counters
641/// let counters = manager.connection_counters_for("ssh").unwrap();
642/// assert_eq!(counters.create_calls, 1);
643/// assert_eq!(counters.open_calls, 1);
644/// # Ok::<(), String>(())
645/// ```
646///
647/// ## Cleanup
648///
649/// ```
650/// # use async_trait::async_trait;
651/// # use std::sync::Arc;
652/// # use tokio::sync::Mutex;
653/// # use genja_core::inventory::{Connection, ConnectionKey, ConnectionManager};
654/// # #[derive(Debug)]
655/// # struct SshConnection { alive: bool }
656/// # #[async_trait]
657/// # impl Connection for SshConnection {
658/// # fn create(&self, _key: &ConnectionKey) -> Box<dyn Connection> {
659/// # Box::new(SshConnection { alive: false })
660/// # }
661/// # fn is_alive(&self) -> bool { self.alive }
662/// # async fn open(&mut self, _params: &genja_core::inventory::ResolvedConnectionParams)
663/// # -> Result<(), String> { self.alive = true; Ok(()) }
664/// # fn close(&mut self) -> ConnectionKey {
665/// # self.alive = false;
666/// # ConnectionKey::new("router1", "ssh")
667/// # }
668/// # }
669/// # let factory = Arc::new(|_key: &ConnectionKey| {
670/// # Some(Arc::new(Mutex::new(SshConnection { alive: false })) as Arc<Mutex<dyn Connection>>)
671/// # });
672/// let manager = ConnectionManager::with_connection_factory(factory);
673/// let key1 = ConnectionKey::new("router1", "ssh");
674/// let key2 = ConnectionKey::new("router2", "ssh");
675///
676/// manager.get_or_create(key1.clone())?;
677/// manager.get_or_create(key2.clone())?;
678///
679/// // Close specific connection
680/// manager.close_connection(&key1);
681///
682/// // Close all remaining connections
683/// manager.close_all_connections();
684///
685/// let counters = manager.connection_counters_for("ssh").unwrap();
686/// assert_eq!(counters.close_calls, 2);
687/// # Ok::<(), String>(())
688/// ```
689pub struct ConnectionManager {
690 connections_map: DashMap<ConnectionKey, Arc<Mutex<dyn Connection>>>,
691 connection_factory: RwLock<Option<Arc<ConnectionFactory>>>,
692 counters: Arc<DashMap<String, ConnectionCounters>>,
693}
694
695impl fmt::Debug for ConnectionManager {
696 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
697 f.debug_struct("ConnectionManager")
698 .field("connections_map_len", &self.connections_map.len())
699 .field(
700 "has_connection_factory",
701 &self
702 .connection_factory
703 .read()
704 .map(|factory| factory.is_some())
705 .unwrap_or(false),
706 )
707 .finish()
708 }
709}
710
711impl ConnectionManager {
712 pub fn with_connection_factory(factory: Arc<ConnectionFactory>) -> Self {
713 Self {
714 connections_map: DashMap::new(),
715 connection_factory: RwLock::new(Some(factory)),
716 counters: Arc::new(DashMap::new()),
717 }
718 }
719
720 fn increment_create(&self, connection_type: &str) {
721 let mut entry = self
722 .counters
723 .entry(connection_type.to_string())
724 .or_default();
725 entry.create_calls += 1;
726 }
727
728 fn increment_open(&self, connection_type: &str) {
729 let mut entry = self
730 .counters
731 .entry(connection_type.to_string())
732 .or_default();
733 entry.open_calls += 1;
734 }
735
736 fn increment_close(&self, connection_type: &str) {
737 let mut entry = self
738 .counters
739 .entry(connection_type.to_string())
740 .or_default();
741 entry.close_calls += 1;
742 }
743
744 pub fn connection_counters_for(&self, connection_type: &str) -> Option<ConnectionCounters> {
745 self.counters.get(connection_type).map(|entry| *entry)
746 }
747
748 pub fn connection_counters_snapshot(&self) -> HashMap<String, ConnectionCounters> {
749 self.counters
750 .iter()
751 .map(|entry| (entry.key().clone(), *entry.value()))
752 .collect()
753 }
754
755 pub fn get(&self, key: &ConnectionKey) -> Option<Arc<Mutex<dyn Connection>>> {
756 self.connections_map
757 .get(key)
758 .map(|entry| entry.value().clone())
759 }
760
761 pub fn insert(&self, key: ConnectionKey, connection: Arc<Mutex<dyn Connection>>) {
762 self.connections_map.insert(key, connection);
763 }
764
765 /// Retrieves an existing connection or creates a new one using the configured factory.
766 ///
767 /// This method provides thread-safe, lazy initialization of connections. It first checks
768 /// for an existing connection in the map, and if missing, it uses the connection factory
769 /// to create one and inserts it.
770 ///
771 /// # Concurrency and Race Behavior
772 ///
773 /// - Creation uses `DashMap::entry`, so only one connection is created per unique key,
774 /// even under concurrent access.
775 /// - The factory is called while holding the entry lock for that key’s shard. This avoids
776 /// race conditions but means a slow factory can temporarily block other operations on the
777 /// same shard.
778 /// - If the factory returns `None`, no entry is inserted and subsequent calls may retry.
779 ///
780 /// # Parameters
781 ///
782 /// * `key` - A `ConnectionKey` identifying the connection by hostname and connection plugin name.
783 ///
784 /// # Returns
785 ///
786 /// - `Ok(Some(connection))` if a connection exists or was created
787 /// - `Ok(None)` if the factory returns `None` (e.g., no plugin registered)
788 /// - `Err(...)` if the factory lock is poisoned or not configured
789 ///
790 /// # Errors
791 ///
792 /// - `"connection factory not set"` if no factory is configured
793 /// - `"connection factory lock poisoned"` if the factory lock is poisoned
794 ///
795 /// # Examples
796 ///
797 /// ```
798 /// use async_trait::async_trait;
799 /// use std::sync::Arc;
800 /// use tokio::sync::Mutex;
801 /// use genja_core::inventory::{Connection, ConnectionKey, ConnectionManager};
802 ///
803 /// #[derive(Debug)]
804 /// struct DummyConnection;
805 ///
806 /// #[async_trait]
807 /// impl Connection for DummyConnection {
808 /// fn create(&self, _key: &ConnectionKey) -> Box<dyn Connection> {
809 /// Box::new(DummyConnection)
810 /// }
811 /// fn is_alive(&self) -> bool { true }
812 /// async fn open(&mut self, _params: &genja_core::inventory::ResolvedConnectionParams)
813 /// -> Result<(), String> { Ok(()) }
814 /// fn close(&mut self) -> ConnectionKey {
815 /// ConnectionKey::new("router1", "ssh")
816 /// }
817 /// }
818 ///
819 /// let factory = Arc::new(|_key: &ConnectionKey| {
820 /// Some(Arc::new(Mutex::new(DummyConnection)) as Arc<Mutex<dyn Connection>>)
821 /// });
822 /// let manager = ConnectionManager::with_connection_factory(factory);
823 ///
824 /// let key = ConnectionKey::new("router1", "ssh");
825 /// let connection = manager.get_or_create(key)?;
826 /// assert!(connection.is_some());
827 /// # Ok::<(), String>(())
828 /// ```
829 pub fn get_or_create(
830 &self,
831 key: ConnectionKey,
832 ) -> Result<Option<Arc<Mutex<dyn Connection>>>, String> {
833 let factory = {
834 let guard = self
835 .connection_factory
836 .read()
837 .map_err(|_| "connection factory lock poisoned".to_string())?;
838 guard
839 .clone()
840 .ok_or_else(|| "connection factory not set".to_string())?
841 };
842
843 match self.connections_map.entry(key) {
844 dashmap::mapref::entry::Entry::Occupied(entry) => Ok(Some(entry.get().clone())),
845 dashmap::mapref::entry::Entry::Vacant(entry) => {
846 let key_for_factory = entry.key().clone();
847 let connection_type = key_for_factory.plugin_name.clone();
848 let Some(connection) = factory(&key_for_factory) else {
849 return Ok(None);
850 };
851 self.increment_create(&connection_type);
852 entry.insert(connection.clone());
853 Ok(Some(connection))
854 }
855 }
856 }
857
858 pub fn set_connection_factory(&self, factory: Arc<ConnectionFactory>) {
859 if let Ok(mut slot) = self.connection_factory.write() {
860 *slot = Some(factory);
861 }
862 }
863
864 /// Close the connection associated with the given key and remove
865 /// it from `connections_map`.
866 pub fn close_connection(&self, key: &ConnectionKey) {
867 if let Some((_, connection)) = self.connections_map.remove(key) {
868 let mut connection = connection.blocking_lock();
869 connection.close();
870 self.increment_close(&key.plugin_name);
871 }
872 }
873
874 /// Close all connections in `connections_map` and then clear the map.
875 pub fn close_all_connections(&self) {
876 self.connections_map.iter().for_each(|entry| {
877 let mut connection = entry.value().blocking_lock();
878 connection.close();
879 self.increment_close(&entry.key().plugin_name);
880 });
881 self.connections_map.clear();
882 }
883
884 /// Opens a connection for the specified key, creating it if necessary.
885 ///
886 /// This method provides a high-level interface for establishing connections to remote hosts.
887 /// It combines connection retrieval/creation with automatic opening, ensuring the connection
888 /// is ready for use before returning. The method handles the full lifecycle:
889 ///
890 /// 1. **Retrieve or Create**: Calls `get_or_create()` to obtain a connection from the map
891 /// or create a new one using the configured factory
892 /// 2. **Check Alive Status**: Acquires the connection's mutex and checks if it's already open
893 /// 3. **Open if Needed**: If the connection is not alive, calls `open()` with the provided
894 /// parameters and increments the open counter
895 /// 4. **Return Ready Connection**: Returns the connection wrapped in `Arc<Mutex<_>>` for
896 /// thread-safe access
897 ///
898 /// # Parameters
899 ///
900 /// * `key` - A `ConnectionKey` identifying the connection by hostname and connection plugin name.
901 /// This key is used to look up or create the connection in the manager's map.
902 /// * `params` - A `ResolvedConnectionParams` containing the connection parameters such as
903 /// hostname, port, username, password, and platform. These parameters are passed to the
904 /// connection's `open()` method if the connection needs to be established.
905 ///
906 /// # Thread Safety and Locking
907 ///
908 /// The method uses a two-phase locking strategy to prevent deadlocks:
909 ///
910 /// 1. **Factory Lock**: `get_or_create()` briefly acquires the factory's `RwLock` to clone
911 /// the `Arc<ConnectionFactory>`, then releases it before calling the factory function.
912 /// This prevents holding the factory lock during connection creation.
913 ///
914 /// 2. **Connection Lock**: After obtaining the connection, the method acquires its `Mutex`
915 /// in a scoped block (`{ ... }`). The lock is automatically released when the scope ends,
916 /// before returning the connection. This allows the caller to acquire the lock again
917 /// without deadlock.
918 ///
919 /// **Why the scoped lock?**
920 /// ```text
921 /// Without scope: With scope:
922 /// --------------- -----------
923 /// let mut guard = conn.lock(); {
924 /// guard.open(params)?; let mut guard = conn.lock();
925 /// // guard still held guard.open(params)?;
926 /// Ok(Some(conn)) } // guard dropped here
927 /// // Caller tries conn.lock() Ok(Some(conn))
928 /// // DEADLOCK! 💥 // Caller can lock successfully ✓
929 /// ```
930 ///
931 /// # Connection Lifecycle
932 ///
933 /// The method respects the connection's alive state:
934 /// - If `is_alive()` returns `true`, the connection is already open and no action is taken
935 /// - If `is_alive()` returns `false`, `open()` is called to establish the connection
936 /// - The `open_calls` counter is incremented only when `open()` is actually called
937 ///
938 /// This prevents unnecessary reconnection attempts and tracks actual connection operations.
939 ///
940 /// # Returns
941 ///
942 /// Returns `Ok(Some(Arc<Mutex<dyn Connection>>))` if:
943 /// - The connection was successfully retrieved or created, AND
944 /// - The connection was already alive OR was successfully opened
945 ///
946 /// Returns `Ok(None)` if:
947 /// - The factory function returned `None` (e.g., no plugin registered for this connection plugin name)
948 ///
949 /// Returns `Err(String)` if:
950 /// - The connection factory is not configured: `"connection factory not set"`
951 /// - The factory lock is poisoned: `"connection factory lock poisoned"`
952 /// - The connection lock is poisoned: `"connection lock poisoned"`
953 /// - The connection's `open()` method returns an error (error message from the connection)
954 ///
955 /// # Examples
956 ///
957 /// ## Basic Usage
958 ///
959 /// ```
960 /// use async_trait::async_trait;
961 /// use std::sync::Arc;
962 /// use tokio::runtime::Builder;
963 /// use tokio::sync::Mutex;
964 /// use genja_core::inventory::{
965 /// Connection, ConnectionKey, ConnectionManager, ResolvedConnectionParams
966 /// };
967 ///
968 /// #[derive(Debug)]
969 /// struct SshConnection {
970 /// alive: bool,
971 /// }
972 ///
973 /// #[async_trait]
974 /// impl Connection for SshConnection {
975 /// fn create(&self, _key: &ConnectionKey) -> Box<dyn Connection> {
976 /// Box::new(SshConnection { alive: false })
977 /// }
978 ///
979 /// fn is_alive(&self) -> bool {
980 /// self.alive
981 /// }
982 ///
983 /// async fn open(&mut self, _params: &ResolvedConnectionParams) -> Result<(), String> {
984 /// self.alive = true;
985 /// Ok(())
986 /// }
987 ///
988 /// fn close(&mut self) -> ConnectionKey {
989 /// self.alive = false;
990 /// ConnectionKey::new("router1", "ssh")
991 /// }
992 /// }
993 ///
994 /// let factory = Arc::new(|_key: &ConnectionKey| {
995 /// Some(Arc::new(Mutex::new(SshConnection { alive: false })) as Arc<Mutex<dyn Connection>>)
996 /// });
997 /// let manager = ConnectionManager::with_connection_factory(factory);
998 ///
999 /// let key = ConnectionKey::new("router1", "ssh");
1000 /// let params = ResolvedConnectionParams {
1001 /// hostname: "10.0.0.1".to_string(),
1002 /// port: Some(22),
1003 /// username: Some("admin".to_string()),
1004 /// password: None,
1005 /// platform: None,
1006 /// extras: None,
1007 /// };
1008 ///
1009 /// // First call creates and opens the connection
1010 /// let runtime = Builder::new_current_thread().enable_all().build().unwrap();
1011 /// let connection = runtime.block_on(async { manager.open_connection(&key, ¶ms).await })?;
1012 /// assert!(connection.is_some());
1013 ///
1014 /// // Second call reuses the existing connection without reopening
1015 /// let same_connection = runtime.block_on(async { manager.open_connection(&key, ¶ms).await })?;
1016 /// assert!(Arc::ptr_eq(&connection.unwrap(), &same_connection.unwrap()));
1017 /// # Ok::<(), String>(())
1018 /// ```
1019 ///
1020 /// ## Handling Missing Plugins
1021 ///
1022 /// ```
1023 /// use std::sync::Arc;
1024 /// use tokio::runtime::Builder;
1025 /// use genja_core::inventory::{ConnectionKey, ConnectionManager, ResolvedConnectionParams};
1026 ///
1027 /// // Factory returns None for unknown connection plugin names
1028 /// let factory = Arc::new(|key: &ConnectionKey| {
1029 /// if key.plugin_name == "ssh" {
1030 /// // ... return SSH connection
1031 /// None // simplified for example
1032 /// } else {
1033 /// None // No plugin for this type
1034 /// }
1035 /// });
1036 /// let manager = ConnectionManager::with_connection_factory(factory);
1037 ///
1038 /// let key = ConnectionKey::new("router1", "telnet");
1039 /// let params = ResolvedConnectionParams {
1040 /// hostname: "10.0.0.1".to_string(),
1041 /// port: None,
1042 /// username: None,
1043 /// password: None,
1044 /// platform: None,
1045 /// extras: None,
1046 /// };
1047 ///
1048 /// let runtime = Builder::new_current_thread().enable_all().build().unwrap();
1049 /// let result = runtime.block_on(async { manager.open_connection(&key, ¶ms).await })?;
1050 /// assert!(result.is_none()); // No plugin available
1051 /// # Ok::<(), String>(())
1052 /// ```
1053 ///
1054 /// ## Thread-Safe Concurrent Access
1055 ///
1056 /// ```
1057 /// use async_trait::async_trait;
1058 /// use std::sync::Arc;
1059 /// use std::thread;
1060 /// use tokio::runtime::Builder;
1061 /// use tokio::sync::Mutex;
1062 /// use genja_core::inventory::{
1063 /// Connection, ConnectionKey, ConnectionManager, ResolvedConnectionParams
1064 /// };
1065 ///
1066 /// # #[derive(Debug)]
1067 /// # struct SshConnection { alive: bool }
1068 /// # #[async_trait]
1069 /// # impl Connection for SshConnection {
1070 /// # fn create(&self, _key: &ConnectionKey) -> Box<dyn Connection> {
1071 /// # Box::new(SshConnection { alive: false })
1072 /// # }
1073 /// # fn is_alive(&self) -> bool { self.alive }
1074 /// # async fn open(&mut self, _params: &ResolvedConnectionParams) -> Result<(), String> {
1075 /// # self.alive = true;
1076 /// # Ok(())
1077 /// # }
1078 /// # fn close(&mut self) -> ConnectionKey {
1079 /// # self.alive = false;
1080 /// # ConnectionKey::new("router1", "ssh")
1081 /// # }
1082 /// # }
1083 /// let factory = Arc::new(|_key: &ConnectionKey| {
1084 /// Some(Arc::new(Mutex::new(SshConnection { alive: false })) as Arc<Mutex<dyn Connection>>)
1085 /// });
1086 /// let manager = Arc::new(ConnectionManager::with_connection_factory(factory));
1087 /// let runtime = Arc::new(Builder::new_current_thread().enable_all().build().unwrap());
1088 ///
1089 /// let key = ConnectionKey::new("router1", "ssh");
1090 /// let params = Arc::new(ResolvedConnectionParams {
1091 /// hostname: "10.0.0.1".to_string(),
1092 /// port: None,
1093 /// username: None,
1094 /// password: None,
1095 /// platform: None,
1096 /// extras: None,
1097 /// });
1098 ///
1099 /// // Multiple threads can safely open the same connection
1100 /// let handles: Vec<_> = (0..3)
1101 /// .map(|_| {
1102 /// let manager = Arc::clone(&manager);
1103 /// let runtime = Arc::clone(&runtime);
1104 /// let key = key.clone();
1105 /// let params = Arc::clone(¶ms);
1106 /// thread::spawn(move || {
1107 /// runtime.block_on(async { manager.open_connection(&key, ¶ms).await })
1108 /// })
1109 /// })
1110 /// .collect();
1111 ///
1112 /// for handle in handles {
1113 /// let result = handle.join().unwrap();
1114 /// assert!(result.is_ok());
1115 /// }
1116 /// ```
1117 pub async fn open_connection(
1118 &self,
1119 key: &ConnectionKey,
1120 params: &ResolvedConnectionParams,
1121 ) -> Result<Option<Arc<Mutex<dyn Connection>>>, String> {
1122 let Some(connection) = self.get_or_create(key.clone())? else {
1123 return Ok(None);
1124 };
1125
1126 {
1127 let mut guard = connection.lock().await;
1128 if !guard.is_alive() {
1129 guard.open(params).await?;
1130 self.increment_open(&key.plugin_name);
1131 }
1132 }
1133 Ok(Some(connection))
1134 }
1135}
1136
1137impl Default for ConnectionManager {
1138 fn default() -> Self {
1139 Self {
1140 connections_map: DashMap::new(),
1141 connection_factory: RwLock::new(None),
1142 counters: Arc::new(DashMap::new()),
1143 }
1144 }
1145}