// genja_core/inventory/connections.rs

1use super::ResolvedConnectionParams;
2use async_trait::async_trait;
3use dashmap::DashMap;
4use std::any::Any;
5use std::collections::HashMap;
6use std::fmt;
7use std::sync::{Arc, RwLock};
8use tokio::sync::Mutex;
9
10#[async_trait]
11pub trait Connection
12where
13    Self: Any + Send + Sync + fmt::Debug,
14{
15    fn create(&self, key: &ConnectionKey) -> Box<dyn Connection>;
16
17    fn is_alive(&self) -> bool;
18
19    async fn open(&mut self, params: &ResolvedConnectionParams) -> Result<(), String>;
20
21    async fn execute_command(&mut self, _command: &str) -> Result<String, String> {
22        Err("connection does not implement execute_command".to_string())
23    }
24
25    fn close(&mut self) -> ConnectionKey;
26}
27
/// A unique identifier for a connection in the connection manager.
///
/// `ConnectionKey` is the composite key used to look up and manage connections in the
/// `ConnectionManager`. It pairs a hostname with a connection plugin name. This lets the
/// same host hold several concurrent connections served by different plugins (e.g. SSH,
/// NETCONF, HTTP).
///
/// The struct derives `Hash` and `Eq`, so it can be used as a key in hash-based
/// collections such as `HashMap` and `DashMap`. Both fields are compared byte-for-byte,
/// which makes keys case-sensitive: `"Router1"` and `"router1"` are different keys.
///
/// # Hash Function Behavior
///
/// When a `ConnectionKey` is used in a hash-based collection (such as the `DashMap` inside
/// `ConnectionManager`), the hash function is used as follows:
///
/// 1. **Compute Hash Value**: The derived `Hash` implementation feeds `hostname` and then
///    `plugin_name` (declaration order) into the hasher, producing a single hash value.
///
/// 2. **Locate a Slot**: The hash value chooses where the table starts looking for the key.
///    This gives O(1) average-case insertion and lookup. `DashMap` also uses the hash
///    to pick which of its internal shards holds the key.
///
/// 3. **Handle Collisions**: Two different keys can produce the same hash, or land on the
///    same starting slot. The hash tables behind both `HashMap` and `DashMap` use open
///    addressing: rather than chaining entries together, the table probes further slots.
///    It uses `Eq` to decide which stored key, if any, is the exact match.
///
/// 4. **Enable Deduplication**: A key with the same `hostname` and `plugin_name` as an
///    existing one hashes identically and compares equal. The collection therefore
///    replaces the existing entry's value instead of adding a duplicate.
///
/// # Fields
///
/// * `hostname` - The hostname or IP address of the target device. This identifies
///   the remote endpoint for the connection.
/// * `plugin_name` - The connection plugin name (e.g., "ssh", "netconf", "http").
///   This distinguishes between different connection plugins to the same host.
///
/// # Examples
///
/// ## Basic Usage
///
/// ```
/// # use genja_core::inventory::ConnectionKey;
/// let key = ConnectionKey::new("10.0.0.1", "ssh");
/// assert_eq!(key.hostname, "10.0.0.1");
/// assert_eq!(key.plugin_name, "ssh");
/// ```
///
/// ## Multiple Connection Plugins per Host
///
/// ```
/// # use genja_core::inventory::ConnectionKey;
/// use std::collections::HashMap;
///
/// let mut connections = HashMap::new();
/// let ssh_key = ConnectionKey::new("router1", "ssh");
/// let netconf_key = ConnectionKey::new("router1", "netconf");
///
/// // Same host can have different connection plugins.
/// // The keys differ in plugin_name, so they are not equal and get separate entries
/// // (even if their hashes happened to collide).
/// connections.insert(ssh_key, "SSH connection");
/// connections.insert(netconf_key, "NETCONF connection");
/// assert_eq!(connections.len(), 2);
/// ```
///
/// ## Key Equality and Deduplication
///
/// ```
/// # use genja_core::inventory::ConnectionKey;
/// use std::collections::HashMap;
///
/// let mut connections = HashMap::new();
/// let key1 = ConnectionKey::new("router1", "ssh");
/// let key2 = ConnectionKey::new("router1", "ssh");
///
/// // Both keys have the same hostname and plugin_name
/// // They produce the same hash and are equal via Eq
/// connections.insert(key1, "First connection");
/// connections.insert(key2, "Second connection"); // Replaces first
/// assert_eq!(connections.len(), 1);
/// assert_eq!(connections.values().next(), Some(&"Second connection"));
/// ```
///
/// ## Hash-Based Lookup in ConnectionManager
///
/// ```
/// # use genja_core::inventory::{ConnectionKey, ConnectionManager};
/// let manager = ConnectionManager::default();
/// let key = ConnectionKey::new("router1", "ssh");
///
/// // The hash function enables fast lookup:
/// // 1. Hash is computed from key
/// // 2. Hash selects the shard and the starting slot to search
/// // 3. Eq identifies the exact match among the probed slots
/// if let Some(connection) = manager.get(&key) {
///     println!("Found existing connection");
/// }
/// ```
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct ConnectionKey {
    pub hostname: String,
    pub plugin_name: String,
}

impl ConnectionKey {
    /// Creates a new `ConnectionKey` from a hostname and plugin name.
    ///
    /// Both parameters accept any type convertible into a `String` (`&str`, `String`, ...).
    /// Callers can pass string slices or owned strings without converting them first.
    ///
    /// The resulting key uniquely identifies a connection in the `ConnectionManager`. It
    /// combines the target hostname with the connection plugin name.
    ///
    /// # Parameters
    ///
    /// * `hostname` - The hostname or IP address of the target device. Accepts any type
    ///   implementing `Into<String>`, such as `&str` or `String`. This identifies the
    ///   remote endpoint for the connection.
    /// * `plugin_name` - The connection plugin name (e.g., "ssh", "netconf", "http").
    ///   Accepts any type implementing `Into<String>`. This distinguishes between different
    ///   connection plugins to the same host.
    ///
    /// # Returns
    ///
    /// Returns a new `ConnectionKey` instance with the provided hostname and plugin name.
    ///
    /// # Examples
    ///
    /// ```
    /// # use genja_core::inventory::ConnectionKey;
    /// // Using string slices
    /// let key1 = ConnectionKey::new("10.0.0.1", "ssh");
    ///
    /// // Using owned strings
    /// let hostname = String::from("router1");
    /// let plugin_name = String::from("netconf");
    /// let key2 = ConnectionKey::new(hostname, plugin_name);
    ///
    /// // Mixed types
    /// let key3 = ConnectionKey::new("10.0.0.2", String::from("http"));
    /// ```
    pub fn new(hostname: impl Into<String>, plugin_name: impl Into<String>) -> Self {
        Self {
            hostname: hostname.into(),
            plugin_name: plugin_name.into(),
        }
    }
}
181
182pub type ConnectionFactory =
183    dyn Fn(&ConnectionKey) -> Option<Arc<Mutex<dyn Connection>>> + Send + Sync;
184
/// Statistics tracking connection lifecycle operations per connection plugin name.
///
/// `ConnectionCounters` is a simple counter-based way to monitor connection operations in
/// the `ConnectionManager`. Each connection plugin name (e.g., "ssh", "netconf", "http")
/// gets its own set of counters. They track how many times connections of that type have
/// been created, opened, and closed.
///
/// A plugin name has no entry until the first event is counted for it.
/// `ConnectionManager::connection_counters_for` therefore returns `None` for plugin names
/// that have never been used. Values handed out by the manager are copies
/// (`ConnectionCounters` is `Copy`), so they do not change after being returned.
///
/// These counters are useful for:
/// - **Performance Monitoring**: Identify connection pool efficiency and reuse patterns
/// - **Debugging**: Detect connection leaks, excessive creation, or improper cleanup
/// - **Testing**: Verify connection lifecycle behavior in unit and integration tests
/// - **Metrics**: Export connection statistics for observability systems
///
/// # Counter Semantics
///
/// * `create_calls` - Incremented when the factory creates a new connection instance.
///   This happens on the first call to `get_or_create()` for a unique `ConnectionKey`.
///   Later calls with the same key do not increment this counter.
///
/// * `open_calls` - Incremented when `open()` is called on a connection. This happens when
///   `open_connection()` is called and the connection's `is_alive()` returns `false`.
///   Calling `open_connection()` on an already-alive connection does not increment this counter.
///
/// * `close_calls` - Incremented when a connection is closed via `close_connection()` or
///   `close_all_connections()`. Each connection is counted only once, when it is removed
///   from the pool.
///
/// # Thread Safety
///
/// The `ConnectionManager` keeps the counters in a `DashMap<String, ConnectionCounters>`,
/// so they can be updated concurrently without a global lock. `DashMap` spreads entries
/// across shards, each protected by its own lock:
/// - Updates for plugin names in different shards proceed in parallel.
/// - Updates that land in the same shard are briefly serialized.
///
/// Each increment is applied while holding its shard's lock, so no increments are lost.
///
/// # Usage Patterns
///
/// ## Ideal Pattern (Efficient Connection Reuse)
/// ```text
/// create_calls: 1
/// open_calls:   1
/// close_calls:  1
/// ```
/// This indicates a connection was created once, opened once, and properly cleaned up.
/// Multiple operations reused the same connection without reopening it.
///
/// ## Connection Leak Pattern
/// ```text
/// create_calls: 5
/// open_calls:   5
/// close_calls:  0
/// ```
/// This indicates connections are being created but never closed, suggesting a resource leak.
///
/// ## Excessive Recreation Pattern
/// ```text
/// create_calls: 100
/// open_calls:   100
/// close_calls:  100
/// ```
/// This indicates connections are being created and destroyed repeatedly instead of being
/// reused, suggesting inefficient connection pooling.
///
/// # Examples
///
/// ## Monitoring Connection Usage
///
/// ```
/// # use async_trait::async_trait;
/// # use std::sync::Arc;
/// # use tokio::runtime::Builder;
/// # use tokio::sync::Mutex;
/// # use genja_core::inventory::{Connection, ConnectionKey, ConnectionManager, ResolvedConnectionParams};
/// # #[derive(Debug)]
/// # struct SshConnection { alive: bool }
/// # #[async_trait]
/// # impl Connection for SshConnection {
/// #     fn create(&self, _key: &ConnectionKey) -> Box<dyn Connection> {
/// #         Box::new(SshConnection { alive: false })
/// #     }
/// #     fn is_alive(&self) -> bool { self.alive }
/// #     async fn open(&mut self, _params: &ResolvedConnectionParams) -> Result<(), String> {
/// #         self.alive = true; Ok(())
/// #     }
/// #     fn close(&mut self) -> ConnectionKey {
/// #         self.alive = false;
/// #         ConnectionKey::new("router1", "ssh")
/// #     }
/// # }
/// # let factory = Arc::new(|_key: &ConnectionKey| {
/// #     Some(Arc::new(Mutex::new(SshConnection { alive: false })) as Arc<Mutex<dyn Connection>>)
/// # });
/// let manager = ConnectionManager::with_connection_factory(factory);
/// let key = ConnectionKey::new("router1", "ssh");
/// let params = ResolvedConnectionParams {
///     hostname: "10.0.0.1".to_string(),
///     port: Some(22),
///     username: Some("admin".to_string()),
///     password: None,
///     platform: None,
///     extras: None,
/// };
///
/// // Perform operations
/// let runtime = Builder::new_current_thread().enable_all().build().unwrap();
/// runtime.block_on(async {
///     manager.open_connection(&key, &params).await?;
///     manager.open_connection(&key, &params).await?; // Reuses existing connection
///     Ok::<(), String>(())
/// })?;
/// manager.close_connection(&key);
///
/// // Check counters
/// let counters = manager.connection_counters_for("ssh").unwrap();
/// assert_eq!(counters.create_calls, 1); // Created once
/// assert_eq!(counters.open_calls, 1);   // Opened once (second call reused)
/// assert_eq!(counters.close_calls, 1);  // Closed once
/// # Ok::<(), String>(())
/// ```
///
/// ## Detecting Connection Leaks in Tests
///
/// ```
/// # use async_trait::async_trait;
/// # use std::sync::Arc;
/// # use tokio::runtime::Builder;
/// # use tokio::sync::Mutex;
/// # use genja_core::inventory::{Connection, ConnectionKey, ConnectionManager, ResolvedConnectionParams};
/// # #[derive(Debug)]
/// # struct SshConnection { alive: bool }
/// # #[async_trait]
/// # impl Connection for SshConnection {
/// #     fn create(&self, _key: &ConnectionKey) -> Box<dyn Connection> {
/// #         Box::new(SshConnection { alive: false })
/// #     }
/// #     fn is_alive(&self) -> bool { self.alive }
/// #     async fn open(&mut self, _params: &ResolvedConnectionParams) -> Result<(), String> {
/// #         self.alive = true; Ok(())
/// #     }
/// #     fn close(&mut self) -> ConnectionKey {
/// #         self.alive = false;
/// #         ConnectionKey::new("router1", "ssh")
/// #     }
/// # }
/// # let factory = Arc::new(|_key: &ConnectionKey| {
/// #     Some(Arc::new(Mutex::new(SshConnection { alive: false })) as Arc<Mutex<dyn Connection>>)
/// # });
/// let manager = ConnectionManager::with_connection_factory(factory);
/// let params = ResolvedConnectionParams {
///     hostname: "10.0.0.1".to_string(),
///     port: Some(22),
///     username: Some("admin".to_string()),
///     password: None,
///     platform: None,
///     extras: None,
/// };
///
/// // Open multiple connections
/// let runtime = Builder::new_current_thread().enable_all().build().unwrap();
/// for i in 1..=5 {
///     let key = ConnectionKey::new(format!("router{}", i), "ssh");
///     runtime.block_on(async { manager.open_connection(&key, &params).await })?;
/// }
///
/// // Verify all connections were created
/// let counters = manager.connection_counters_for("ssh").unwrap();
/// assert_eq!(counters.create_calls, 5);
/// assert_eq!(counters.open_calls, 5);
///
/// // Clean up and verify no leaks
/// manager.close_all_connections();
/// let counters = manager.connection_counters_for("ssh").unwrap();
/// assert_eq!(counters.close_calls, 5); // All connections closed
/// # Ok::<(), String>(())
/// ```
///
/// ## Comparing Multiple Connection Types
///
/// ```
/// # use async_trait::async_trait;
/// # use std::sync::Arc;
/// # use tokio::runtime::Builder;
/// # use tokio::sync::Mutex;
/// # use genja_core::inventory::{Connection, ConnectionKey, ConnectionManager, ResolvedConnectionParams};
/// # #[derive(Debug)]
/// # struct TestConnection { conn_type: String, alive: bool }
/// # #[async_trait]
/// # impl Connection for TestConnection {
/// #     fn create(&self, key: &ConnectionKey) -> Box<dyn Connection> {
/// #         Box::new(TestConnection { conn_type: key.plugin_name.clone(), alive: false })
/// #     }
/// #     fn is_alive(&self) -> bool { self.alive }
/// #     async fn open(&mut self, _params: &ResolvedConnectionParams) -> Result<(), String> {
/// #         self.alive = true; Ok(())
/// #     }
/// #     fn close(&mut self) -> ConnectionKey {
/// #         self.alive = false;
/// #         ConnectionKey::new("host", &self.conn_type)
/// #     }
/// # }
/// # let factory = Arc::new(|key: &ConnectionKey| {
/// #     Some(Arc::new(Mutex::new(TestConnection {
/// #         conn_type: key.plugin_name.clone(),
/// #         alive: false
/// #     })) as Arc<Mutex<dyn Connection>>)
/// # });
/// let manager = ConnectionManager::with_connection_factory(factory);
/// let params = ResolvedConnectionParams {
///     hostname: "10.0.0.1".to_string(),
///     port: Some(22),
///     username: Some("admin".to_string()),
///     password: None,
///     platform: None,
///     extras: None,
/// };
///
/// // Open different connection plugin names
/// let runtime = Builder::new_current_thread().enable_all().build().unwrap();
/// runtime.block_on(async {
///     manager.open_connection(&ConnectionKey::new("router1", "ssh"), &params).await?;
///     manager.open_connection(&ConnectionKey::new("router1", "netconf"), &params).await?;
///     Ok::<(), String>(())
/// })?;
///
/// // Get snapshot of all counters
/// let snapshot = manager.connection_counters_snapshot();
/// let ssh_counters = snapshot.get("ssh").unwrap();
/// let netconf_counters = snapshot.get("netconf").unwrap();
///
/// assert_eq!(ssh_counters.create_calls, 1);
/// assert_eq!(netconf_counters.create_calls, 1);
/// # Ok::<(), String>(())
/// ```
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ConnectionCounters {
    pub create_calls: usize,
    pub open_calls: usize,
    pub close_calls: usize,
}
422/// Thread-safe manager for connection lifecycle and pooling.
423///
424/// `ConnectionManager` provides centralized management of connections to remote hosts,
425/// handling connection creation, caching, opening, and closing. It uses a factory pattern
426/// to create connections dynamically based on connection plugin name, and maintains a pool of
427/// active connections for reuse across multiple operations.
428///
429/// The manager is designed for concurrent access and uses lock-free data structures
430/// (`DashMap`) for the connection pool and counters, with an `RwLock` for the factory
431/// to minimize contention.
432///
433/// # Architecture
434///
435/// The manager consists of four main components:
436///
437/// 1. **Connection Pool** (`connections_map`): A `DashMap` storing active connections
438///    keyed by `ConnectionKey` (hostname + connection plugin name). Connections are wrapped
439///    in `Arc<Mutex<_>>` for thread-safe sharing and interior mutability.
440///
441/// 2. **Connection Factory** (`connection_factory`): An optional factory function that
442///    creates new connections on demand. The factory is wrapped in `RwLock<Option<Arc<_>>>`
443///    to allow runtime configuration while supporting concurrent reads.
444///
445/// 3. **Usage Counters** (`counters`): A `DashMap` tracking create, open, and close
446///    operations per connection plugin name. Useful for monitoring, debugging, and testing.
447///
448/// 4. **Caching Strategy**: Connections are created lazily on first access and cached
449///    for subsequent use. The same connection instance is reused until explicitly closed.
450///
451/// # Connection Lifecycle
452///
453/// 1. **Creation**: When `get_or_create()` is called with a new key, the factory is
454///    invoked to create a connection. The connection is inserted into the pool and
455///    the `create_calls` counter is incremented.
456///
457/// 2. **Opening**: The `open_connection()` method checks if a connection is alive
458///    before calling `open()`. Only actual open operations increment the `open_calls`
459///    counter.
460///
461/// 3. **Reuse**: Subsequent calls with the same key return the cached connection
462///    without creating a new one or reopening it if it's still alive.
463///
464/// 4. **Closing**: Connections can be closed individually via `close_connection()` or
465///    all at once via `close_all_connections()`. Closed connections are removed from
466///    the pool and the `close_calls` counter is incremented.
467///
468/// # Thread Safety
469///
470/// The manager is fully thread-safe and designed for concurrent access:
471///
472/// - **Lock-Free Pool**: `DashMap` provides concurrent access to the connection pool
473///   without requiring a global lock. Different threads can access different connections
474///   simultaneously.
475///
476/// - **Per-Connection Locking**: Each connection is wrapped in `Mutex`, allowing
477///   fine-grained locking. Only the thread actively using a connection holds its lock.
478///
479/// - **Factory Configuration**: The factory uses `RwLock` to allow multiple concurrent
480///   reads (connection creation) while serializing writes (factory updates).
481///
482/// - **Lock Ordering**: Methods acquire locks in a consistent order (factory → connection)
483///   and release them promptly to prevent deadlocks.
484///
485/// # Factory Pattern
486///
487/// The connection factory is a function that takes a `ConnectionKey` and returns an
488/// optional connection. This design allows:
489///
490/// - **Plugin-Based Architecture**: Different connection plugin names (SSH, NETCONF, HTTP)
491///   can be registered dynamically via plugins.
492///
493/// - **Lazy Loading**: Connections are only created when needed, reducing startup time
494///   and resource usage.
495///
496/// - **Graceful Degradation**: If no plugin is registered for a connection plugin name, the
497///   factory returns `None` and the manager propagates this to the caller.
498///
499/// # Usage Counters
500///
501/// The manager tracks three types of operations per connection plugin name:
502///
503/// - `create_calls`: Number of times a new connection was created
504/// - `open_calls`: Number of times `open()` was called on a connection
505/// - `close_calls`: Number of times a connection was closed
506///
507/// These counters are useful for:
508/// - Monitoring connection pool efficiency
509/// - Debugging connection leaks or excessive creation
510/// - Testing connection lifecycle behavior
511///
512/// # Examples
513///
514/// ## Basic Setup with Factory
515///
516/// ```
517/// use async_trait::async_trait;
518/// use std::sync::Arc;
519/// use tokio::sync::Mutex;
520/// use genja_core::inventory::{Connection, ConnectionKey, ConnectionManager};
521///
522/// #[derive(Debug)]
523/// struct SshConnection {
524///     alive: bool,
525/// }
526///
527/// #[async_trait]
528/// impl Connection for SshConnection {
529///     fn create(&self, _key: &ConnectionKey) -> Box<dyn Connection> {
530///         Box::new(SshConnection { alive: false })
531///     }
532///
533///     fn is_alive(&self) -> bool {
534///         self.alive
535///     }
536///
537///     async fn open(&mut self, _params: &genja_core::inventory::ResolvedConnectionParams)
538///         -> Result<(), String> {
539///         self.alive = true;
540///         Ok(())
541///     }
542///
543///     fn close(&mut self) -> ConnectionKey {
544///         self.alive = false;
545///         ConnectionKey::new("router1", "ssh")
546///     }
547/// }
548///
549/// // Create a factory that returns SSH connections
550/// let factory = Arc::new(|key: &ConnectionKey| {
551///     if key.plugin_name == "ssh" {
552///         Some(Arc::new(Mutex::new(SshConnection { alive: false })) as Arc<Mutex<dyn Connection>>)
553///     } else {
554///         None
555///     }
556/// });
557///
558/// let manager = ConnectionManager::with_connection_factory(factory);
559/// ```
560///
561/// ## Connection Reuse
562///
563/// ```
564/// # use async_trait::async_trait;
565/// # use std::sync::Arc;
566/// # use tokio::sync::Mutex;
567/// # use genja_core::inventory::{Connection, ConnectionKey, ConnectionManager};
568/// # #[derive(Debug)]
569/// # struct SshConnection { alive: bool }
570/// # #[async_trait]
571/// # impl Connection for SshConnection {
572/// #     fn create(&self, _key: &ConnectionKey) -> Box<dyn Connection> {
573/// #         Box::new(SshConnection { alive: false })
574/// #     }
575/// #     fn is_alive(&self) -> bool { self.alive }
576/// #     async fn open(&mut self, _params: &genja_core::inventory::ResolvedConnectionParams)
577/// #         -> Result<(), String> { self.alive = true; Ok(()) }
578/// #     fn close(&mut self) -> ConnectionKey {
579/// #         self.alive = false;
580/// #         ConnectionKey::new("router1", "ssh")
581/// #     }
582/// # }
583/// # let factory = Arc::new(|_key: &ConnectionKey| {
584/// #     Some(Arc::new(Mutex::new(SshConnection { alive: false })) as Arc<Mutex<dyn Connection>>)
585/// # });
586/// let manager = ConnectionManager::with_connection_factory(factory);
587/// let key = ConnectionKey::new("router1", "ssh");
588///
589/// // First access creates the connection
590/// let conn1 = manager.get_or_create(key.clone())?.unwrap();
591///
592/// // Second access returns the same connection
593/// let conn2 = manager.get_or_create(key)?.unwrap();
594///
595/// assert!(Arc::ptr_eq(&conn1, &conn2));
596/// # Ok::<(), String>(())
597/// ```
598///
599/// ## Monitoring Connection Usage
600///
601/// ```
602/// # use async_trait::async_trait;
603/// # use std::sync::Arc;
604/// # use tokio::runtime::Builder;
605/// # use tokio::sync::Mutex;
606/// # use genja_core::inventory::{Connection, ConnectionKey, ConnectionManager, ResolvedConnectionParams};
607/// # #[derive(Debug)]
608/// # struct SshConnection { alive: bool }
609/// # #[async_trait]
610/// # impl Connection for SshConnection {
611/// #     fn create(&self, _key: &ConnectionKey) -> Box<dyn Connection> {
612/// #         Box::new(SshConnection { alive: false })
613/// #     }
614/// #     fn is_alive(&self) -> bool { self.alive }
615/// #     async fn open(&mut self, _params: &ResolvedConnectionParams) -> Result<(), String> {
616/// #         self.alive = true; Ok(())
617/// #     }
618/// #     fn close(&mut self) -> ConnectionKey {
619/// #         self.alive = false;
620/// #         ConnectionKey::new("router1", "ssh")
621/// #     }
622/// # }
623/// # let factory = Arc::new(|_key: &ConnectionKey| {
624/// #     Some(Arc::new(Mutex::new(SshConnection { alive: false })) as Arc<Mutex<dyn Connection>>)
625/// # });
626/// let manager = ConnectionManager::with_connection_factory(factory);
627/// let key = ConnectionKey::new("router1", "ssh");
628/// let params = ResolvedConnectionParams {
629///     hostname: "10.0.0.1".to_string(),
630///     port: Some(22),
631///     username: Some("admin".to_string()),
632///     password: None,
633///     platform: None,
634///     extras: None,
635/// };
636///
637/// let runtime = Builder::new_current_thread().enable_all().build().unwrap();
638/// runtime.block_on(async { manager.open_connection(&key, &params).await })?;
639///
640/// // Check counters
641/// let counters = manager.connection_counters_for("ssh").unwrap();
642/// assert_eq!(counters.create_calls, 1);
643/// assert_eq!(counters.open_calls, 1);
644/// # Ok::<(), String>(())
645/// ```
646///
647/// ## Cleanup
648///
649/// ```
650/// # use async_trait::async_trait;
651/// # use std::sync::Arc;
652/// # use tokio::sync::Mutex;
653/// # use genja_core::inventory::{Connection, ConnectionKey, ConnectionManager};
654/// # #[derive(Debug)]
655/// # struct SshConnection { alive: bool }
656/// # #[async_trait]
657/// # impl Connection for SshConnection {
658/// #     fn create(&self, _key: &ConnectionKey) -> Box<dyn Connection> {
659/// #         Box::new(SshConnection { alive: false })
660/// #     }
661/// #     fn is_alive(&self) -> bool { self.alive }
662/// #     async fn open(&mut self, _params: &genja_core::inventory::ResolvedConnectionParams)
663/// #         -> Result<(), String> { self.alive = true; Ok(()) }
664/// #     fn close(&mut self) -> ConnectionKey {
665/// #         self.alive = false;
666/// #         ConnectionKey::new("router1", "ssh")
667/// #     }
668/// # }
669/// # let factory = Arc::new(|_key: &ConnectionKey| {
670/// #     Some(Arc::new(Mutex::new(SshConnection { alive: false })) as Arc<Mutex<dyn Connection>>)
671/// # });
672/// let manager = ConnectionManager::with_connection_factory(factory);
673/// let key1 = ConnectionKey::new("router1", "ssh");
674/// let key2 = ConnectionKey::new("router2", "ssh");
675///
676/// manager.get_or_create(key1.clone())?;
677/// manager.get_or_create(key2.clone())?;
678///
679/// // Close specific connection
680/// manager.close_connection(&key1);
681///
682/// // Close all remaining connections
683/// manager.close_all_connections();
684///
685/// let counters = manager.connection_counters_for("ssh").unwrap();
686/// assert_eq!(counters.close_calls, 2);
687/// # Ok::<(), String>(())
688/// ```
689pub struct ConnectionManager {
690    connections_map: DashMap<ConnectionKey, Arc<Mutex<dyn Connection>>>,
691    connection_factory: RwLock<Option<Arc<ConnectionFactory>>>,
692    counters: Arc<DashMap<String, ConnectionCounters>>,
693}
694
695impl fmt::Debug for ConnectionManager {
696    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
697        f.debug_struct("ConnectionManager")
698            .field("connections_map_len", &self.connections_map.len())
699            .field(
700                "has_connection_factory",
701                &self
702                    .connection_factory
703                    .read()
704                    .map(|factory| factory.is_some())
705                    .unwrap_or(false),
706            )
707            .finish()
708    }
709}
710
711impl ConnectionManager {
712    pub fn with_connection_factory(factory: Arc<ConnectionFactory>) -> Self {
713        Self {
714            connections_map: DashMap::new(),
715            connection_factory: RwLock::new(Some(factory)),
716            counters: Arc::new(DashMap::new()),
717        }
718    }
719
720    fn increment_create(&self, connection_type: &str) {
721        let mut entry = self
722            .counters
723            .entry(connection_type.to_string())
724            .or_default();
725        entry.create_calls += 1;
726    }
727
728    fn increment_open(&self, connection_type: &str) {
729        let mut entry = self
730            .counters
731            .entry(connection_type.to_string())
732            .or_default();
733        entry.open_calls += 1;
734    }
735
736    fn increment_close(&self, connection_type: &str) {
737        let mut entry = self
738            .counters
739            .entry(connection_type.to_string())
740            .or_default();
741        entry.close_calls += 1;
742    }
743
744    pub fn connection_counters_for(&self, connection_type: &str) -> Option<ConnectionCounters> {
745        self.counters.get(connection_type).map(|entry| *entry)
746    }
747
748    pub fn connection_counters_snapshot(&self) -> HashMap<String, ConnectionCounters> {
749        self.counters
750            .iter()
751            .map(|entry| (entry.key().clone(), *entry.value()))
752            .collect()
753    }
754
755    pub fn get(&self, key: &ConnectionKey) -> Option<Arc<Mutex<dyn Connection>>> {
756        self.connections_map
757            .get(key)
758            .map(|entry| entry.value().clone())
759    }
760
761    pub fn insert(&self, key: ConnectionKey, connection: Arc<Mutex<dyn Connection>>) {
762        self.connections_map.insert(key, connection);
763    }
764
765    /// Retrieves an existing connection or creates a new one using the configured factory.
766    ///
767    /// This method provides thread-safe, lazy initialization of connections. It first checks
768    /// for an existing connection in the map, and if missing, it uses the connection factory
769    /// to create one and inserts it.
770    ///
771    /// # Concurrency and Race Behavior
772    ///
773    /// - Creation uses `DashMap::entry`, so only one connection is created per unique key,
774    ///   even under concurrent access.
775    /// - The factory is called while holding the entry lock for that key’s shard. This avoids
776    ///   race conditions but means a slow factory can temporarily block other operations on the
777    ///   same shard.
778    /// - If the factory returns `None`, no entry is inserted and subsequent calls may retry.
779    ///
780    /// # Parameters
781    ///
782    /// * `key` - A `ConnectionKey` identifying the connection by hostname and connection plugin name.
783    ///
784    /// # Returns
785    ///
786    /// - `Ok(Some(connection))` if a connection exists or was created
787    /// - `Ok(None)` if the factory returns `None` (e.g., no plugin registered)
788    /// - `Err(...)` if the factory lock is poisoned or not configured
789    ///
790    /// # Errors
791    ///
792    /// - `"connection factory not set"` if no factory is configured
793    /// - `"connection factory lock poisoned"` if the factory lock is poisoned
794    ///
795    /// # Examples
796    ///
797    /// ```
798    /// use async_trait::async_trait;
799    /// use std::sync::Arc;
800    /// use tokio::sync::Mutex;
801    /// use genja_core::inventory::{Connection, ConnectionKey, ConnectionManager};
802    ///
803    /// #[derive(Debug)]
804    /// struct DummyConnection;
805    ///
806    /// #[async_trait]
807    /// impl Connection for DummyConnection {
808    ///     fn create(&self, _key: &ConnectionKey) -> Box<dyn Connection> {
809    ///         Box::new(DummyConnection)
810    ///     }
811    ///     fn is_alive(&self) -> bool { true }
812    ///     async fn open(&mut self, _params: &genja_core::inventory::ResolvedConnectionParams)
813    ///         -> Result<(), String> { Ok(()) }
814    ///     fn close(&mut self) -> ConnectionKey {
815    ///         ConnectionKey::new("router1", "ssh")
816    ///     }
817    /// }
818    ///
819    /// let factory = Arc::new(|_key: &ConnectionKey| {
820    ///     Some(Arc::new(Mutex::new(DummyConnection)) as Arc<Mutex<dyn Connection>>)
821    /// });
822    /// let manager = ConnectionManager::with_connection_factory(factory);
823    ///
824    /// let key = ConnectionKey::new("router1", "ssh");
825    /// let connection = manager.get_or_create(key)?;
826    /// assert!(connection.is_some());
827    /// # Ok::<(), String>(())
828    /// ```
829    pub fn get_or_create(
830        &self,
831        key: ConnectionKey,
832    ) -> Result<Option<Arc<Mutex<dyn Connection>>>, String> {
833        let factory = {
834            let guard = self
835                .connection_factory
836                .read()
837                .map_err(|_| "connection factory lock poisoned".to_string())?;
838            guard
839                .clone()
840                .ok_or_else(|| "connection factory not set".to_string())?
841        };
842
843        match self.connections_map.entry(key) {
844            dashmap::mapref::entry::Entry::Occupied(entry) => Ok(Some(entry.get().clone())),
845            dashmap::mapref::entry::Entry::Vacant(entry) => {
846                let key_for_factory = entry.key().clone();
847                let connection_type = key_for_factory.plugin_name.clone();
848                let Some(connection) = factory(&key_for_factory) else {
849                    return Ok(None);
850                };
851                self.increment_create(&connection_type);
852                entry.insert(connection.clone());
853                Ok(Some(connection))
854            }
855        }
856    }
857
858    pub fn set_connection_factory(&self, factory: Arc<ConnectionFactory>) {
859        if let Ok(mut slot) = self.connection_factory.write() {
860            *slot = Some(factory);
861        }
862    }
863
864    /// Close the connection associated with the given key and remove
865    /// it from `connections_map`.
866    pub fn close_connection(&self, key: &ConnectionKey) {
867        if let Some((_, connection)) = self.connections_map.remove(key) {
868            let mut connection = connection.blocking_lock();
869            connection.close();
870            self.increment_close(&key.plugin_name);
871        }
872    }
873
874    /// Close all connections in `connections_map` and then clear the map.
875    pub fn close_all_connections(&self) {
876        self.connections_map.iter().for_each(|entry| {
877            let mut connection = entry.value().blocking_lock();
878            connection.close();
879            self.increment_close(&entry.key().plugin_name);
880        });
881        self.connections_map.clear();
882    }
883
884    /// Opens a connection for the specified key, creating it if necessary.
885    ///
886    /// This method provides a high-level interface for establishing connections to remote hosts.
887    /// It combines connection retrieval/creation with automatic opening, ensuring the connection
888    /// is ready for use before returning. The method handles the full lifecycle:
889    ///
890    /// 1. **Retrieve or Create**: Calls `get_or_create()` to obtain a connection from the map
891    ///    or create a new one using the configured factory
892    /// 2. **Check Alive Status**: Acquires the connection's mutex and checks if it's already open
893    /// 3. **Open if Needed**: If the connection is not alive, calls `open()` with the provided
894    ///    parameters and increments the open counter
895    /// 4. **Return Ready Connection**: Returns the connection wrapped in `Arc<Mutex<_>>` for
896    ///    thread-safe access
897    ///
898    /// # Parameters
899    ///
900    /// * `key` - A `ConnectionKey` identifying the connection by hostname and connection plugin name.
901    ///   This key is used to look up or create the connection in the manager's map.
902    /// * `params` - A `ResolvedConnectionParams` containing the connection parameters such as
903    ///   hostname, port, username, password, and platform. These parameters are passed to the
904    ///   connection's `open()` method if the connection needs to be established.
905    ///
906    /// # Thread Safety and Locking
907    ///
908    /// The method uses a two-phase locking strategy to prevent deadlocks:
909    ///
910    /// 1. **Factory Lock**: `get_or_create()` briefly acquires the factory's `RwLock` to clone
911    ///    the `Arc<ConnectionFactory>`, then releases it before calling the factory function.
912    ///    This prevents holding the factory lock during connection creation.
913    ///
914    /// 2. **Connection Lock**: After obtaining the connection, the method acquires its `Mutex`
915    ///    in a scoped block (`{ ... }`). The lock is automatically released when the scope ends,
916    ///    before returning the connection. This allows the caller to acquire the lock again
917    ///    without deadlock.
918    ///
919    /// **Why the scoped lock?**
920    /// ```text
921    /// Without scope:                    With scope:
922    /// ---------------                   -----------
923    /// let mut guard = conn.lock();      {
924    /// guard.open(params)?;                  let mut guard = conn.lock();
925    /// // guard still held                   guard.open(params)?;
926    /// Ok(Some(conn))                    } // guard dropped here
927    /// // Caller tries conn.lock()       Ok(Some(conn))
928    /// // DEADLOCK! 💥                   // Caller can lock successfully ✓
929    /// ```
930    ///
931    /// # Connection Lifecycle
932    ///
933    /// The method respects the connection's alive state:
934    /// - If `is_alive()` returns `true`, the connection is already open and no action is taken
935    /// - If `is_alive()` returns `false`, `open()` is called to establish the connection
936    /// - The `open_calls` counter is incremented only when `open()` is actually called
937    ///
938    /// This prevents unnecessary reconnection attempts and tracks actual connection operations.
939    ///
940    /// # Returns
941    ///
942    /// Returns `Ok(Some(Arc<Mutex<dyn Connection>>))` if:
943    /// - The connection was successfully retrieved or created, AND
944    /// - The connection was already alive OR was successfully opened
945    ///
946    /// Returns `Ok(None)` if:
947    /// - The factory function returned `None` (e.g., no plugin registered for this connection plugin name)
948    ///
949    /// Returns `Err(String)` if:
950    /// - The connection factory is not configured: `"connection factory not set"`
951    /// - The factory lock is poisoned: `"connection factory lock poisoned"`
952    /// - The connection lock is poisoned: `"connection lock poisoned"`
953    /// - The connection's `open()` method returns an error (error message from the connection)
954    ///
955    /// # Examples
956    ///
957    /// ## Basic Usage
958    ///
959    /// ```
960    /// use async_trait::async_trait;
961    /// use std::sync::Arc;
962    /// use tokio::runtime::Builder;
963    /// use tokio::sync::Mutex;
964    /// use genja_core::inventory::{
965    ///     Connection, ConnectionKey, ConnectionManager, ResolvedConnectionParams
966    /// };
967    ///
968    /// #[derive(Debug)]
969    /// struct SshConnection {
970    ///     alive: bool,
971    /// }
972    ///
973    /// #[async_trait]
974    /// impl Connection for SshConnection {
975    ///     fn create(&self, _key: &ConnectionKey) -> Box<dyn Connection> {
976    ///         Box::new(SshConnection { alive: false })
977    ///     }
978    ///
979    ///     fn is_alive(&self) -> bool {
980    ///         self.alive
981    ///     }
982    ///
983    ///     async fn open(&mut self, _params: &ResolvedConnectionParams) -> Result<(), String> {
984    ///         self.alive = true;
985    ///         Ok(())
986    ///     }
987    ///
988    ///     fn close(&mut self) -> ConnectionKey {
989    ///         self.alive = false;
990    ///         ConnectionKey::new("router1", "ssh")
991    ///     }
992    /// }
993    ///
994    /// let factory = Arc::new(|_key: &ConnectionKey| {
995    ///     Some(Arc::new(Mutex::new(SshConnection { alive: false })) as Arc<Mutex<dyn Connection>>)
996    /// });
997    /// let manager = ConnectionManager::with_connection_factory(factory);
998    ///
999    /// let key = ConnectionKey::new("router1", "ssh");
1000    /// let params = ResolvedConnectionParams {
1001    ///     hostname: "10.0.0.1".to_string(),
1002    ///     port: Some(22),
1003    ///     username: Some("admin".to_string()),
1004    ///     password: None,
1005    ///     platform: None,
1006    ///     extras: None,
1007    /// };
1008    ///
1009    /// // First call creates and opens the connection
1010    /// let runtime = Builder::new_current_thread().enable_all().build().unwrap();
1011    /// let connection = runtime.block_on(async { manager.open_connection(&key, &params).await })?;
1012    /// assert!(connection.is_some());
1013    ///
1014    /// // Second call reuses the existing connection without reopening
1015    /// let same_connection = runtime.block_on(async { manager.open_connection(&key, &params).await })?;
1016    /// assert!(Arc::ptr_eq(&connection.unwrap(), &same_connection.unwrap()));
1017    /// # Ok::<(), String>(())
1018    /// ```
1019    ///
1020    /// ## Handling Missing Plugins
1021    ///
1022    /// ```
1023    /// use std::sync::Arc;
1024    /// use tokio::runtime::Builder;
1025    /// use genja_core::inventory::{ConnectionKey, ConnectionManager, ResolvedConnectionParams};
1026    ///
1027    /// // Factory returns None for unknown connection plugin names
1028    /// let factory = Arc::new(|key: &ConnectionKey| {
1029    ///     if key.plugin_name == "ssh" {
1030    ///         // ... return SSH connection
1031    ///         None // simplified for example
1032    ///     } else {
1033    ///         None // No plugin for this type
1034    ///     }
1035    /// });
1036    /// let manager = ConnectionManager::with_connection_factory(factory);
1037    ///
1038    /// let key = ConnectionKey::new("router1", "telnet");
1039    /// let params = ResolvedConnectionParams {
1040    ///     hostname: "10.0.0.1".to_string(),
1041    ///     port: None,
1042    ///     username: None,
1043    ///     password: None,
1044    ///     platform: None,
1045    ///     extras: None,
1046    /// };
1047    ///
1048    /// let runtime = Builder::new_current_thread().enable_all().build().unwrap();
1049    /// let result = runtime.block_on(async { manager.open_connection(&key, &params).await })?;
1050    /// assert!(result.is_none()); // No plugin available
1051    /// # Ok::<(), String>(())
1052    /// ```
1053    ///
1054    /// ## Thread-Safe Concurrent Access
1055    ///
1056    /// ```
1057    /// use async_trait::async_trait;
1058    /// use std::sync::Arc;
1059    /// use std::thread;
1060    /// use tokio::runtime::Builder;
1061    /// use tokio::sync::Mutex;
1062    /// use genja_core::inventory::{
1063    ///     Connection, ConnectionKey, ConnectionManager, ResolvedConnectionParams
1064    /// };
1065    ///
1066    /// # #[derive(Debug)]
1067    /// # struct SshConnection { alive: bool }
1068    /// # #[async_trait]
1069    /// # impl Connection for SshConnection {
1070    /// #     fn create(&self, _key: &ConnectionKey) -> Box<dyn Connection> {
1071    /// #         Box::new(SshConnection { alive: false })
1072    /// #     }
1073    /// #     fn is_alive(&self) -> bool { self.alive }
1074    /// #     async fn open(&mut self, _params: &ResolvedConnectionParams) -> Result<(), String> {
1075    /// #         self.alive = true;
1076    /// #         Ok(())
1077    /// #     }
1078    /// #     fn close(&mut self) -> ConnectionKey {
1079    /// #         self.alive = false;
1080    /// #         ConnectionKey::new("router1", "ssh")
1081    /// #     }
1082    /// # }
1083    /// let factory = Arc::new(|_key: &ConnectionKey| {
1084    ///     Some(Arc::new(Mutex::new(SshConnection { alive: false })) as Arc<Mutex<dyn Connection>>)
1085    /// });
1086    /// let manager = Arc::new(ConnectionManager::with_connection_factory(factory));
1087    /// let runtime = Arc::new(Builder::new_current_thread().enable_all().build().unwrap());
1088    ///
1089    /// let key = ConnectionKey::new("router1", "ssh");
1090    /// let params = Arc::new(ResolvedConnectionParams {
1091    ///     hostname: "10.0.0.1".to_string(),
1092    ///     port: None,
1093    ///     username: None,
1094    ///     password: None,
1095    ///     platform: None,
1096    ///     extras: None,
1097    /// });
1098    ///
1099    /// // Multiple threads can safely open the same connection
1100    /// let handles: Vec<_> = (0..3)
1101    ///     .map(|_| {
1102    ///         let manager = Arc::clone(&manager);
1103    ///         let runtime = Arc::clone(&runtime);
1104    ///         let key = key.clone();
1105    ///         let params = Arc::clone(&params);
1106    ///         thread::spawn(move || {
1107    ///             runtime.block_on(async { manager.open_connection(&key, &params).await })
1108    ///         })
1109    ///     })
1110    ///     .collect();
1111    ///
1112    /// for handle in handles {
1113    ///     let result = handle.join().unwrap();
1114    ///     assert!(result.is_ok());
1115    /// }
1116    /// ```
1117    pub async fn open_connection(
1118        &self,
1119        key: &ConnectionKey,
1120        params: &ResolvedConnectionParams,
1121    ) -> Result<Option<Arc<Mutex<dyn Connection>>>, String> {
1122        let Some(connection) = self.get_or_create(key.clone())? else {
1123            return Ok(None);
1124        };
1125
1126        {
1127            let mut guard = connection.lock().await;
1128            if !guard.is_alive() {
1129                guard.open(params).await?;
1130                self.increment_open(&key.plugin_name);
1131            }
1132        }
1133        Ok(Some(connection))
1134    }
1135}
1136
1137impl Default for ConnectionManager {
1138    fn default() -> Self {
1139        Self {
1140            connections_map: DashMap::new(),
1141            connection_factory: RwLock::new(None),
1142            counters: Arc::new(DashMap::new()),
1143        }
1144    }
1145}