pub struct ConnectionManager { /* private fields */ }Expand description
Thread-safe manager for connection lifecycle and pooling.
ConnectionManager provides centralized management of connections to remote hosts,
handling connection creation, caching, opening, and closing. It uses a factory pattern
to create connections dynamically based on connection plugin name, and maintains a pool of
active connections for reuse across multiple operations.
The manager is designed for concurrent access and uses concurrent, sharded maps
(DashMap) for the connection pool and counters, avoiding a single global lock,
with an RwLock for the factory to minimize contention.
§Architecture
The manager consists of four main components:
- Connection Pool (connections_map): A DashMap storing active connections keyed by ConnectionKey (hostname + connection plugin name). Connections are wrapped in Arc<Mutex<_>> for thread-safe sharing and interior mutability.
- Connection Factory (connection_factory): An optional factory function that creates new connections on demand. The factory is wrapped in RwLock<Option<Arc<_>>> to allow runtime configuration while supporting concurrent reads.
- Usage Counters (counters): A DashMap tracking create, open, and close operations per connection plugin name. Useful for monitoring, debugging, and testing.
- Caching Strategy: Connections are created lazily on first access and cached for subsequent use. The same connection instance is reused until explicitly closed.
§Connection Lifecycle
- Creation: When get_or_create() is called with a new key, the factory is invoked to create a connection. The connection is inserted into the pool and the create_calls counter is incremented.
- Opening: The open_connection() method checks if a connection is alive before calling open(). Only actual open operations increment the open_calls counter.
- Reuse: Subsequent calls with the same key return the cached connection without creating a new one or reopening it if it’s still alive.
- Closing: Connections can be closed individually via close_connection() or all at once via close_all_connections(). Closed connections are removed from the pool and the close_calls counter is incremented.
§Thread Safety
The manager is fully thread-safe and designed for concurrent access:
- Lock-Free Pool: DashMap provides concurrent access to the connection pool without requiring a global lock. Different threads can access different connections simultaneously.
- Per-Connection Locking: Each connection is wrapped in Mutex, allowing fine-grained locking. Only the thread actively using a connection holds its lock.
- Factory Configuration: The factory uses RwLock to allow multiple concurrent reads (connection creation) while serializing writes (factory updates).
- Lock Ordering: Methods acquire locks in a consistent order (factory → connection) and release them promptly to prevent deadlocks.
§Factory Pattern
The connection factory is a function that takes a ConnectionKey and returns an
optional connection. This design allows:
- Plugin-Based Architecture: Different connection plugin names (SSH, NETCONF, HTTP) can be registered dynamically via plugins.
- Lazy Loading: Connections are only created when needed, reducing startup time and resource usage.
- Graceful Degradation: If no plugin is registered for a connection plugin name, the factory returns None and the manager propagates this to the caller.
§Usage Counters
The manager tracks three types of operations per connection plugin name:
- create_calls: Number of times a new connection was created
- open_calls: Number of times open() was called on a connection
- close_calls: Number of times a connection was closed
These counters are useful for:
- Monitoring connection pool efficiency
- Debugging connection leaks or excessive creation
- Testing connection lifecycle behavior
§Examples
§Basic Setup with Factory
use async_trait::async_trait;
use std::sync::Arc;
use tokio::sync::Mutex;
use genja_core::inventory::{Connection, ConnectionKey, ConnectionManager};
#[derive(Debug)]
struct SshConnection {
alive: bool,
}
#[async_trait]
impl Connection for SshConnection {
fn create(&self, _key: &ConnectionKey) -> Box<dyn Connection> {
Box::new(SshConnection { alive: false })
}
fn is_alive(&self) -> bool {
self.alive
}
async fn open(&mut self, _params: &genja_core::inventory::ResolvedConnectionParams)
-> Result<(), String> {
self.alive = true;
Ok(())
}
fn close(&mut self) -> ConnectionKey {
self.alive = false;
ConnectionKey::new("router1", "ssh")
}
}
// Create a factory that returns SSH connections
let factory = Arc::new(|key: &ConnectionKey| {
if key.plugin_name == "ssh" {
Some(Arc::new(Mutex::new(SshConnection { alive: false })) as Arc<Mutex<dyn Connection>>)
} else {
None
}
});
let manager = ConnectionManager::with_connection_factory(factory);
§Connection Reuse
let manager = ConnectionManager::with_connection_factory(factory);
let key = ConnectionKey::new("router1", "ssh");
// First access creates the connection
let conn1 = manager.get_or_create(key.clone())?.unwrap();
// Second access returns the same connection
let conn2 = manager.get_or_create(key)?.unwrap();
assert!(Arc::ptr_eq(&conn1, &conn2));
§Monitoring Connection Usage
let manager = ConnectionManager::with_connection_factory(factory);
let key = ConnectionKey::new("router1", "ssh");
let params = ResolvedConnectionParams {
hostname: "10.0.0.1".to_string(),
port: Some(22),
username: Some("admin".to_string()),
password: None,
platform: None,
extras: None,
};
let runtime = Builder::new_current_thread().enable_all().build().unwrap();
runtime.block_on(async { manager.open_connection(&key, &params).await })?;
// Check counters
let counters = manager.connection_counters_for("ssh").unwrap();
assert_eq!(counters.create_calls, 1);
assert_eq!(counters.open_calls, 1);
§Cleanup
let manager = ConnectionManager::with_connection_factory(factory);
let key1 = ConnectionKey::new("router1", "ssh");
let key2 = ConnectionKey::new("router2", "ssh");
manager.get_or_create(key1.clone())?;
manager.get_or_create(key2.clone())?;
// Close specific connection
manager.close_connection(&key1);
// Close all remaining connections
manager.close_all_connections();
let counters = manager.connection_counters_for("ssh").unwrap();
assert_eq!(counters.close_calls, 2);
Implementations§
Source§impl ConnectionManager
impl ConnectionManager
pub fn with_connection_factory(factory: Arc<ConnectionFactory>) -> Self
pub fn connection_counters_for( &self, connection_type: &str, ) -> Option<ConnectionCounters>
pub fn connection_counters_snapshot( &self, ) -> HashMap<String, ConnectionCounters>
pub fn get(&self, key: &ConnectionKey) -> Option<Arc<Mutex<dyn Connection>>>
pub fn insert(&self, key: ConnectionKey, connection: Arc<Mutex<dyn Connection>>)
Source
pub fn get_or_create(
&self,
key: ConnectionKey,
) -> Result<Option<Arc<Mutex<dyn Connection>>>, String>
pub fn get_or_create( &self, key: ConnectionKey, ) -> Result<Option<Arc<Mutex<dyn Connection>>>, String>
Retrieves an existing connection or creates a new one using the configured factory.
This method provides thread-safe, lazy initialization of connections. It first checks for an existing connection in the map, and if missing, it uses the connection factory to create one and inserts it.
§Concurrency and Race Behavior
- Creation uses DashMap::entry, so only one connection is created per unique key, even under concurrent access.
- The factory is called while holding the entry lock for that key’s shard. This avoids race conditions but means a slow factory can temporarily block other operations on the same shard.
- If the factory returns None, no entry is inserted and subsequent calls may retry.
§Parameters
- key - A ConnectionKey identifying the connection by hostname and connection plugin name.
§Returns
- Ok(Some(connection)) if a connection exists or was created
- Ok(None) if the factory returns None (e.g., no plugin registered)
- Err(...) if the factory is not configured or the factory lock is poisoned
§Errors
- "connection factory not set" if no factory is configured
- "connection factory lock poisoned" if the factory lock is poisoned
§Examples
use async_trait::async_trait;
use std::sync::Arc;
use tokio::sync::Mutex;
use genja_core::inventory::{Connection, ConnectionKey, ConnectionManager};
#[derive(Debug)]
struct DummyConnection;
#[async_trait]
impl Connection for DummyConnection {
fn create(&self, _key: &ConnectionKey) -> Box<dyn Connection> {
Box::new(DummyConnection)
}
fn is_alive(&self) -> bool { true }
async fn open(&mut self, _params: &genja_core::inventory::ResolvedConnectionParams)
-> Result<(), String> { Ok(()) }
fn close(&mut self) -> ConnectionKey {
ConnectionKey::new("router1", "ssh")
}
}
let factory = Arc::new(|_key: &ConnectionKey| {
Some(Arc::new(Mutex::new(DummyConnection)) as Arc<Mutex<dyn Connection>>)
});
let manager = ConnectionManager::with_connection_factory(factory);
let key = ConnectionKey::new("router1", "ssh");
let connection = manager.get_or_create(key)?;
assert!(connection.is_some());
pub fn set_connection_factory(&self, factory: Arc<ConnectionFactory>)
Source
pub fn close_connection(&self, key: &ConnectionKey)
pub fn close_connection(&self, key: &ConnectionKey)
Close the connection associated with the given key and remove
it from connections_map.
Source
pub fn close_all_connections(&self)
pub fn close_all_connections(&self)
Close all connections in connections_map and then clear the map.
Source
pub async fn open_connection(
&self,
key: &ConnectionKey,
params: &ResolvedConnectionParams,
) -> Result<Option<Arc<Mutex<dyn Connection>>>, String>
pub async fn open_connection( &self, key: &ConnectionKey, params: &ResolvedConnectionParams, ) -> Result<Option<Arc<Mutex<dyn Connection>>>, String>
Opens a connection for the specified key, creating it if necessary.
This method provides a high-level interface for establishing connections to remote hosts. It combines connection retrieval/creation with automatic opening, ensuring the connection is ready for use before returning. The method handles the full lifecycle:
- Retrieve or Create: Calls get_or_create() to obtain a connection from the map or create a new one using the configured factory
- Check Alive Status: Acquires the connection’s mutex and checks if it’s already open
- Open if Needed: If the connection is not alive, calls open() with the provided parameters and increments the open counter
- Return Ready Connection: Returns the connection wrapped in Arc<Mutex<_>> for thread-safe access
§Parameters
- key - A ConnectionKey identifying the connection by hostname and connection plugin name. This key is used to look up or create the connection in the manager’s map.
- params - A ResolvedConnectionParams containing the connection parameters such as hostname, port, username, password, and platform. These parameters are passed to the connection’s open() method if the connection needs to be established.
§Thread Safety and Locking
The method uses a two-phase locking strategy to prevent deadlocks:
- Factory Lock: get_or_create() briefly acquires the factory’s RwLock to clone the Arc<ConnectionFactory>, then releases it before calling the factory function. This prevents holding the factory lock during connection creation.
- Connection Lock: After obtaining the connection, the method acquires its Mutex in a scoped block ({ ... }). The lock is automatically released when the scope ends, before returning the connection. This allows the caller to acquire the lock again without deadlock.
Why the scoped lock?
Without scope:                      With scope:
---------------                     -----------
let mut guard = conn.lock();        {
guard.open(params)?;                    let mut guard = conn.lock();
// guard still held                     guard.open(params)?;
Ok(Some(conn))                      } // guard dropped here
// Caller tries conn.lock()         Ok(Some(conn))
// DEADLOCK! 💥                     // Caller can lock successfully ✓
§Connection Lifecycle
The method respects the connection’s alive state:
- If is_alive() returns true, the connection is already open and no action is taken
- If is_alive() returns false, open() is called to establish the connection
- The open_calls counter is incremented only when open() is actually called
This prevents unnecessary reconnection attempts and tracks actual connection operations.
§Returns
Returns Ok(Some(Arc<Mutex<dyn Connection>>)) if:
- The connection was successfully retrieved or created, AND
- The connection was already alive OR was successfully opened
Returns Ok(None) if:
- The factory function returned None (e.g., no plugin registered for this connection plugin name)
Returns Err(String) if:
- The connection factory is not configured: "connection factory not set"
- The factory lock is poisoned: "connection factory lock poisoned"
- The connection lock is poisoned: "connection lock poisoned"
- The connection’s open() method returns an error (error message from the connection)
§Examples
§Basic Usage
use async_trait::async_trait;
use std::sync::Arc;
use tokio::runtime::Builder;
use tokio::sync::Mutex;
use genja_core::inventory::{
Connection, ConnectionKey, ConnectionManager, ResolvedConnectionParams
};
#[derive(Debug)]
struct SshConnection {
alive: bool,
}
#[async_trait]
impl Connection for SshConnection {
fn create(&self, _key: &ConnectionKey) -> Box<dyn Connection> {
Box::new(SshConnection { alive: false })
}
fn is_alive(&self) -> bool {
self.alive
}
async fn open(&mut self, _params: &ResolvedConnectionParams) -> Result<(), String> {
self.alive = true;
Ok(())
}
fn close(&mut self) -> ConnectionKey {
self.alive = false;
ConnectionKey::new("router1", "ssh")
}
}
let factory = Arc::new(|_key: &ConnectionKey| {
Some(Arc::new(Mutex::new(SshConnection { alive: false })) as Arc<Mutex<dyn Connection>>)
});
let manager = ConnectionManager::with_connection_factory(factory);
let key = ConnectionKey::new("router1", "ssh");
let params = ResolvedConnectionParams {
hostname: "10.0.0.1".to_string(),
port: Some(22),
username: Some("admin".to_string()),
password: None,
platform: None,
extras: None,
};
// First call creates and opens the connection
let runtime = Builder::new_current_thread().enable_all().build().unwrap();
let connection = runtime.block_on(async { manager.open_connection(&key, &params).await })?;
assert!(connection.is_some());
// Second call reuses the existing connection without reopening
let same_connection = runtime.block_on(async { manager.open_connection(&key, &params).await })?;
assert!(Arc::ptr_eq(&connection.unwrap(), &same_connection.unwrap()));
§Handling Missing Plugins
use std::sync::Arc;
use tokio::runtime::Builder;
use genja_core::inventory::{ConnectionKey, ConnectionManager, ResolvedConnectionParams};
// Factory returns None for unknown connection plugin names
let factory = Arc::new(|key: &ConnectionKey| {
if key.plugin_name == "ssh" {
// ... return SSH connection
None // simplified for example
} else {
None // No plugin for this type
}
});
let manager = ConnectionManager::with_connection_factory(factory);
let key = ConnectionKey::new("router1", "telnet");
let params = ResolvedConnectionParams {
hostname: "10.0.0.1".to_string(),
port: None,
username: None,
password: None,
platform: None,
extras: None,
};
let runtime = Builder::new_current_thread().enable_all().build().unwrap();
let result = runtime.block_on(async { manager.open_connection(&key, &params).await })?;
assert!(result.is_none()); // No plugin available
§Thread-Safe Concurrent Access
use async_trait::async_trait;
use std::sync::Arc;
use std::thread;
use tokio::runtime::Builder;
use tokio::sync::Mutex;
use genja_core::inventory::{
Connection, ConnectionKey, ConnectionManager, ResolvedConnectionParams
};
let factory = Arc::new(|_key: &ConnectionKey| {
Some(Arc::new(Mutex::new(SshConnection { alive: false })) as Arc<Mutex<dyn Connection>>)
});
let manager = Arc::new(ConnectionManager::with_connection_factory(factory));
let runtime = Arc::new(Builder::new_current_thread().enable_all().build().unwrap());
let key = ConnectionKey::new("router1", "ssh");
let params = Arc::new(ResolvedConnectionParams {
hostname: "10.0.0.1".to_string(),
port: None,
username: None,
password: None,
platform: None,
extras: None,
});
// Multiple threads can safely open the same connection
let handles: Vec<_> = (0..3)
.map(|_| {
let manager = Arc::clone(&manager);
let runtime = Arc::clone(&runtime);
let key = key.clone();
let params = Arc::clone(&params);
thread::spawn(move || {
runtime.block_on(async { manager.open_connection(&key, &params).await })
})
})
.collect();
for handle in handles {
let result = handle.join().unwrap();
assert!(result.is_ok());
}