Struct ConnectionManager

pub struct ConnectionManager { /* private fields */ }

Thread-safe manager for connection lifecycle and pooling.

ConnectionManager provides centralized management of connections to remote hosts, handling connection creation, caching, opening, and closing. It uses a factory pattern to create connections dynamically based on connection plugin name, and maintains a pool of active connections for reuse across multiple operations.

The manager is designed for concurrent access. The connection pool and counters live in a DashMap, a sharded concurrent map that locks individual shards rather than the whole map, and the factory sits behind an RwLock to minimize contention.

§Architecture

The manager consists of four main components:

  1. Connection Pool (connections_map): A DashMap storing active connections keyed by ConnectionKey (hostname + connection plugin name). Connections are wrapped in Arc<Mutex<_>> for thread-safe sharing and interior mutability.

  2. Connection Factory (connection_factory): An optional factory function that creates new connections on demand. The factory is wrapped in RwLock<Option<Arc<_>>> to allow runtime configuration while supporting concurrent reads.

  3. Usage Counters (counters): A DashMap tracking create, open, and close operations per connection plugin name. Useful for monitoring, debugging, and testing.

  4. Caching Strategy: Connections are created lazily on first access and cached for subsequent use. The same connection instance is reused until explicitly closed.
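The components listed above can be pictured as a plain struct. The following is only a sketch under stated assumptions: Key, Conn, SharedConn, Factory, Counters, and ManagerSketch are hypothetical stand-ins (the real fields are private), and Mutex<HashMap<..>> replaces DashMap so that the sketch compiles with std alone.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};

// Hypothetical stand-ins; the real fields of ConnectionManager are private.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Key {
    pub hostname: String,
    pub plugin_name: String,
}

pub trait Conn: Send {
    fn is_alive(&self) -> bool;
}

pub type SharedConn = Arc<Mutex<dyn Conn>>;
pub type Factory = dyn Fn(&Key) -> Option<SharedConn> + Send + Sync;

#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct Counters {
    pub create_calls: u64,
    pub open_calls: u64,
    pub close_calls: u64,
}

#[derive(Default)]
pub struct ManagerSketch {
    // 1. Pool: Mutex<HashMap> stands in for DashMap so only std is needed.
    connections_map: Mutex<HashMap<Key, SharedConn>>,
    // 2. Factory: optional, replaceable at runtime, cheap to read.
    connection_factory: RwLock<Option<Arc<Factory>>>,
    // 3. Counters: one entry per connection plugin name.
    counters: Mutex<HashMap<String, Counters>>,
}

impl ManagerSketch {
    pub fn set_connection_factory(&self, factory: Arc<Factory>) {
        *self.connection_factory.write().unwrap() = Some(factory);
    }

    pub fn has_factory(&self) -> bool {
        self.connection_factory.read().unwrap().is_some()
    }

    pub fn pooled(&self) -> usize {
        self.connections_map.lock().unwrap().len()
    }

    pub fn tracked_plugins(&self) -> usize {
        self.counters.lock().unwrap().len()
    }
}
```

A freshly built sketch has an empty pool, no counters, and no factory until one is set, which mirrors the lazy caching strategy described in item 4.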

§Connection Lifecycle

  1. Creation: When get_or_create() is called with a new key, the factory is invoked to create a connection. The connection is inserted into the pool and the create_calls counter is incremented.

  2. Opening: The open_connection() method checks if a connection is alive before calling open(). Only actual open operations increment the open_calls counter.

  3. Reuse: Subsequent calls with the same key return the cached connection. No new connection is created, and a connection that is still alive is not reopened.

  4. Closing: Connections can be closed individually via close_connection() or all at once via close_all_connections(). Closed connections are removed from the pool and the close_calls counter is incremented.
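The four lifecycle steps above can be traced with a small single-threaded model. This is a sketch, not the crate's implementation: LifecycleModel and its methods are hypothetical, the pool is keyed by hostname only, there is no locking, and setting a flag stands in for the asynchronous open().

```rust
use std::collections::HashMap;

// Hypothetical, single-threaded model of the lifecycle: no locking, no async.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct Counters {
    pub create_calls: u32,
    pub open_calls: u32,
    pub close_calls: u32,
}

#[derive(Debug, Default)]
pub struct Conn {
    alive: bool,
}

#[derive(Debug, Default)]
pub struct LifecycleModel {
    pool: HashMap<String, Conn>, // keyed by hostname only, for brevity
    pub counters: Counters,
}

impl LifecycleModel {
    /// Steps 1 to 3: create lazily, open only if not alive, otherwise reuse.
    pub fn open_connection(&mut self, host: &str) {
        if !self.pool.contains_key(host) {
            self.pool.insert(host.to_string(), Conn::default());
            self.counters.create_calls += 1;
        }
        let conn = self.pool.get_mut(host).expect("inserted above");
        if !conn.alive {
            conn.alive = true; // stands in for `open()`
            self.counters.open_calls += 1;
        }
    }

    /// Step 4: close the connection and remove it from the pool.
    pub fn close_connection(&mut self, host: &str) {
        if let Some(mut conn) = self.pool.remove(host) {
            conn.alive = false;
            self.counters.close_calls += 1;
        }
    }
}

pub fn lifecycle_demo() -> Counters {
    let mut model = LifecycleModel::default();
    model.open_connection("router1"); // create + open
    model.open_connection("router1"); // reuse: no create, no reopen
    model.close_connection("router1"); // close + remove
    model.open_connection("router1"); // key is new again: create + open
    model.counters
}
```

In this model, lifecycle_demo() ends with two creates, two opens, and one close, because closing removes the entry and the next access starts over.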

§Thread Safety

The manager is fully thread-safe and designed for concurrent access:

  • Sharded Pool: DashMap locks individual shards rather than the whole map, so the pool needs no global lock. Different threads can access different connections simultaneously.

  • Per-Connection Locking: Each connection is wrapped in Mutex, allowing fine-grained locking. Only the thread actively using a connection holds its lock.

  • Factory Configuration: The factory uses RwLock to allow multiple concurrent reads (connection creation) while serializing writes (factory updates).

  • Lock Ordering: Methods acquire locks in a consistent order (factory → connection) and release them promptly to prevent deadlocks.
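Per-connection locking, as described in the list above, can be illustrated with std threads. The sketch below is an assumption-laden stand-in: Conn, SharedConn, and run_in_parallel are hypothetical names, and std::sync::Mutex is used in place of the async mutex that appears in the examples on this page.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical connection that just counts the commands it has handled.
#[derive(Default)]
pub struct Conn {
    pub commands_run: u32,
}

pub type SharedConn = Arc<Mutex<Conn>>;

/// Spawns one thread per host; each thread locks only its own connection.
/// Returns the number of commands recorded per host.
pub fn run_in_parallel(hosts: &[&str], per_thread: u32) -> HashMap<String, u32> {
    let pool: HashMap<String, SharedConn> = hosts
        .iter()
        .map(|h| (h.to_string(), Arc::new(Mutex::new(Conn::default()))))
        .collect();

    let handles: Vec<_> = pool
        .values()
        .map(|conn| {
            let conn = Arc::clone(conn);
            thread::spawn(move || {
                for _ in 0..per_thread {
                    // The guard is a temporary, so the lock is released
                    // at the end of this statement.
                    conn.lock().unwrap().commands_run += 1;
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    pool.iter()
        .map(|(host, conn)| (host.clone(), conn.lock().unwrap().commands_run))
        .collect()
}
```

Because every connection has its own Mutex, work on router1 never waits for work on router2; contention arises only when two threads want the same connection.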

§Factory Pattern

The connection factory is a function that takes a ConnectionKey and returns an optional connection. This design allows:

  • Plugin-Based Architecture: Different connection plugin names (SSH, NETCONF, HTTP) can be registered dynamically via plugins.

  • Lazy Loading: Connections are only created when needed, reducing startup time and resource usage.

  • Graceful Degradation: If no plugin is registered for a connection plugin name, the factory returns None and the manager propagates this to the caller.
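One way to picture the plugin dispatch described above is a registry of constructors wrapped in a closure. This is a sketch only: Key, Conn, Ssh, Netconf, Constructor, and build_factory are hypothetical, the hostname field is assumed, and the factory returns a Box rather than the Arc<Mutex<_>> used by the real manager, to keep the example short.

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical stand-ins for the crate's key and connection types.
pub struct Key {
    pub hostname: String,
    pub plugin_name: String,
}

pub trait Conn {
    fn describe(&self) -> String;
}

struct Ssh {
    host: String,
}
impl Conn for Ssh {
    fn describe(&self) -> String {
        format!("ssh://{}", self.host)
    }
}

struct Netconf {
    host: String,
}
impl Conn for Netconf {
    fn describe(&self) -> String {
        format!("netconf://{}", self.host)
    }
}

type Constructor = fn(&Key) -> Box<dyn Conn>;
pub type Factory = dyn Fn(&Key) -> Option<Box<dyn Conn>> + Send + Sync;

fn make_ssh(key: &Key) -> Box<dyn Conn> {
    Box::new(Ssh { host: key.hostname.clone() })
}

fn make_netconf(key: &Key) -> Box<dyn Conn> {
    Box::new(Netconf { host: key.hostname.clone() })
}

/// Builds a factory that dispatches on `plugin_name` and returns `None`
/// for names that have no registered constructor.
pub fn build_factory() -> Arc<Factory> {
    let mut registry: HashMap<&'static str, Constructor> = HashMap::new();
    registry.insert("ssh", make_ssh);
    registry.insert("netconf", make_netconf);
    Arc::new(move |key: &Key| {
        registry
            .get(key.plugin_name.as_str())
            .map(|ctor| ctor(key))
    })
}
```

Nothing is constructed until the factory is called, and an unregistered name such as "telnet" simply yields None, which the caller can treat as "no plugin available".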

§Usage Counters

The manager tracks three types of operations per connection plugin name:

  • create_calls: Number of times a new connection was created
  • open_calls: Number of times open() was called on a connection
  • close_calls: Number of times a connection was closed

These counters are useful for:

  • Monitoring connection pool efficiency
  • Debugging connection leaks or excessive creation
  • Testing connection lifecycle behavior
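As an illustration of the leak-debugging use mentioned above, the counters from a snapshot could be compared per plugin name. The field names below follow the list above; the Counters struct itself and both helper functions are hypothetical and not part of the crate.

```rust
use std::collections::HashMap;

// Field names follow the counters listed above; the struct and helpers
// are hypothetical.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct Counters {
    pub create_calls: u64,
    pub open_calls: u64,
    pub close_calls: u64,
}

/// Connections created but not yet closed. A value that only ever grows
/// hints at a leak.
pub fn outstanding(c: &Counters) -> u64 {
    c.create_calls.saturating_sub(c.close_calls)
}

/// Plugin names whose snapshot shows more creations than closes, sorted.
pub fn plugins_with_outstanding(snapshot: &HashMap<String, Counters>) -> Vec<String> {
    let mut names: Vec<String> = snapshot
        .iter()
        .filter(|(_, c)| outstanding(c) > 0)
        .map(|(name, _)| name.clone())
        .collect();
    names.sort();
    names
}
```

A map shaped like the one returned by connection_counters_snapshot() could be passed to plugins_with_outstanding() at shutdown to check that every created connection was also closed.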

§Examples

§Basic Setup with Factory

use async_trait::async_trait;
use std::sync::Arc;
use tokio::sync::Mutex;
use genja_core::inventory::{Connection, ConnectionKey, ConnectionManager};

#[derive(Debug)]
struct SshConnection {
    alive: bool,
}

#[async_trait]
impl Connection for SshConnection {
    fn create(&self, _key: &ConnectionKey) -> Box<dyn Connection> {
        Box::new(SshConnection { alive: false })
    }

    fn is_alive(&self) -> bool {
        self.alive
    }

    async fn open(&mut self, _params: &genja_core::inventory::ResolvedConnectionParams)
        -> Result<(), String> {
        self.alive = true;
        Ok(())
    }

    fn close(&mut self) -> ConnectionKey {
        self.alive = false;
        ConnectionKey::new("router1", "ssh")
    }
}

// Create a factory that returns SSH connections
let factory = Arc::new(|key: &ConnectionKey| {
    if key.plugin_name == "ssh" {
        Some(Arc::new(Mutex::new(SshConnection { alive: false })) as Arc<Mutex<dyn Connection>>)
    } else {
        None
    }
});

let manager = ConnectionManager::with_connection_factory(factory);

§Connection Reuse

let manager = ConnectionManager::with_connection_factory(factory);
let key = ConnectionKey::new("router1", "ssh");

// First access creates the connection
let conn1 = manager.get_or_create(key.clone())?.unwrap();

// Second access returns the same connection
let conn2 = manager.get_or_create(key)?.unwrap();

assert!(Arc::ptr_eq(&conn1, &conn2));

§Monitoring Connection Usage

use tokio::runtime::Builder;
use genja_core::inventory::ResolvedConnectionParams;

let manager = ConnectionManager::with_connection_factory(factory);
let key = ConnectionKey::new("router1", "ssh");
let params = ResolvedConnectionParams {
    hostname: "10.0.0.1".to_string(),
    port: Some(22),
    username: Some("admin".to_string()),
    password: None,
    platform: None,
    extras: None,
};

let runtime = Builder::new_current_thread().enable_all().build().unwrap();
runtime.block_on(async { manager.open_connection(&key, &params).await })?;

// Check counters
let counters = manager.connection_counters_for("ssh").unwrap();
assert_eq!(counters.create_calls, 1);
assert_eq!(counters.open_calls, 1);

§Cleanup

let manager = ConnectionManager::with_connection_factory(factory);
let key1 = ConnectionKey::new("router1", "ssh");
let key2 = ConnectionKey::new("router2", "ssh");

manager.get_or_create(key1.clone())?;
manager.get_or_create(key2.clone())?;

// Close specific connection
manager.close_connection(&key1);

// Close all remaining connections
manager.close_all_connections();

let counters = manager.connection_counters_for("ssh").unwrap();
assert_eq!(counters.close_calls, 2);

Implementations§

impl ConnectionManager

pub fn with_connection_factory(factory: Arc<ConnectionFactory>) -> Self

pub fn connection_counters_for(&self, connection_type: &str) -> Option<ConnectionCounters>

pub fn connection_counters_snapshot(&self) -> HashMap<String, ConnectionCounters>

pub fn get(&self, key: &ConnectionKey) -> Option<Arc<Mutex<dyn Connection>>>

pub fn insert(&self, key: ConnectionKey, connection: Arc<Mutex<dyn Connection>>)

pub fn get_or_create(&self, key: ConnectionKey) -> Result<Option<Arc<Mutex<dyn Connection>>>, String>

Retrieves an existing connection or creates a new one using the configured factory.

This method provides thread-safe, lazy initialization of connections. It first checks for an existing connection in the map, and if missing, it uses the connection factory to create one and inserts it.

§Concurrency and Race Behavior
  • Creation uses DashMap::entry, so only one connection is created per unique key, even under concurrent access.
  • The factory is called while holding the entry lock for that key’s shard. This avoids race conditions but means a slow factory can temporarily block other operations on the same shard.
  • If the factory returns None, no entry is inserted and subsequent calls may retry.
§Parameters
  • key - A ConnectionKey identifying the connection by hostname and connection plugin name.
§Returns
  • Ok(Some(connection)) if a connection exists or was created
  • Ok(None) if the factory returns None (e.g., no plugin registered)
  • Err(...) if no factory is configured or the factory lock is poisoned
§Errors
  • "connection factory not set" if no factory is configured
  • "connection factory lock poisoned" if the factory lock is poisoned
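The get-or-create behavior described above can be sketched with the std HashMap entry API. This is not the crate's code: Pool, Conn, and Factory are hypothetical, keys are plain strings, and a Mutex<HashMap<..>> stands in for DashMap (so the whole map is locked here, where DashMap would lock only one shard). The two error strings are the ones listed above.

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};

pub type Conn = Arc<Mutex<String>>; // hypothetical stand-in for a connection
pub type Factory = dyn Fn(&str) -> Option<Conn> + Send + Sync;

#[derive(Default)]
pub struct Pool {
    map: Mutex<HashMap<String, Conn>>, // std stand-in for DashMap
    factory: RwLock<Option<Arc<Factory>>>,
}

impl Pool {
    pub fn set_factory(&self, factory: Arc<Factory>) {
        *self.factory.write().unwrap() = Some(factory);
    }

    pub fn get_or_create(&self, key: &str) -> Result<Option<Conn>, String> {
        let mut map = self.map.lock().unwrap();

        // An existing entry is returned without consulting the factory.
        let slot = match map.entry(key.to_string()) {
            Entry::Occupied(existing) => return Ok(Some(Arc::clone(existing.get()))),
            Entry::Vacant(slot) => slot,
        };

        // Clone the factory handle; the read guard is a temporary and is
        // released at the end of this statement.
        let factory = self
            .factory
            .read()
            .map_err(|_| "connection factory lock poisoned".to_string())?
            .clone()
            .ok_or_else(|| "connection factory not set".to_string())?;

        // The factory runs while the entry is held, so at most one
        // connection is created per key. `None` inserts nothing.
        Ok(factory(key).map(|conn| {
            slot.insert(Arc::clone(&conn));
            conn
        }))
    }
}
```

Holding the entry while the factory runs is what guarantees a single creation per key; the cost, as noted above, is that a slow factory delays other callers that need the same lock.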
§Examples
use async_trait::async_trait;
use std::sync::Arc;
use tokio::sync::Mutex;
use genja_core::inventory::{Connection, ConnectionKey, ConnectionManager};

#[derive(Debug)]
struct DummyConnection;

#[async_trait]
impl Connection for DummyConnection {
    fn create(&self, _key: &ConnectionKey) -> Box<dyn Connection> {
        Box::new(DummyConnection)
    }
    fn is_alive(&self) -> bool { true }
    async fn open(&mut self, _params: &genja_core::inventory::ResolvedConnectionParams)
        -> Result<(), String> { Ok(()) }
    fn close(&mut self) -> ConnectionKey {
        ConnectionKey::new("router1", "ssh")
    }
}

let factory = Arc::new(|_key: &ConnectionKey| {
    Some(Arc::new(Mutex::new(DummyConnection)) as Arc<Mutex<dyn Connection>>)
});
let manager = ConnectionManager::with_connection_factory(factory);

let key = ConnectionKey::new("router1", "ssh");
let connection = manager.get_or_create(key)?;
assert!(connection.is_some());

pub fn set_connection_factory(&self, factory: Arc<ConnectionFactory>)

pub fn close_connection(&self, key: &ConnectionKey)

Closes the connection associated with the given key and removes it from connections_map.

pub fn close_all_connections(&self)

Closes all connections in connections_map, then clears the map.
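The two close operations can be sketched as "remove, then close". The code below is a std-only approximation under stated assumptions: Pool and Conn are hypothetical, a Mutex<HashMap<..>> stands in for connections_map, a single counter replaces the per-plugin counters, and clearing a flag stands in for close().

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

#[derive(Debug, Default)]
pub struct Conn {
    pub alive: bool,
}

#[derive(Default)]
pub struct Pool {
    map: Mutex<HashMap<String, Arc<Mutex<Conn>>>>, // stand-in for connections_map
    close_calls: Mutex<u64>,                       // single counter, for brevity
}

impl Pool {
    pub fn insert(&self, host: &str, conn: Arc<Mutex<Conn>>) {
        self.map.lock().unwrap().insert(host.to_string(), conn);
    }

    pub fn close_connection(&self, host: &str) {
        // Remove first so the map lock is not held while the connection closes.
        let removed = self.map.lock().unwrap().remove(host);
        if let Some(conn) = removed {
            conn.lock().unwrap().alive = false; // stands in for `close()`
            *self.close_calls.lock().unwrap() += 1;
        }
    }

    pub fn close_all_connections(&self) {
        // Take the whole map in one step, leaving an empty map behind.
        let drained = std::mem::take(&mut *self.map.lock().unwrap());
        for conn in drained.into_values() {
            conn.lock().unwrap().alive = false;
            *self.close_calls.lock().unwrap() += 1;
        }
    }

    pub fn len(&self) -> usize {
        self.map.lock().unwrap().len()
    }

    pub fn close_calls(&self) -> u64 {
        *self.close_calls.lock().unwrap()
    }
}
```

Note that a caller still holding an Arc to a closed connection keeps a valid handle; in this sketch it simply observes alive == false.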

pub async fn open_connection(&self, key: &ConnectionKey, params: &ResolvedConnectionParams) -> Result<Option<Arc<Mutex<dyn Connection>>>, String>

Opens a connection for the specified key, creating it if necessary.

This method provides a high-level interface for establishing connections to remote hosts. It combines connection retrieval/creation with automatic opening, ensuring the connection is ready for use before returning. The method handles the full lifecycle:

  1. Retrieve or Create: Calls get_or_create() to obtain a connection from the map or create a new one using the configured factory
  2. Check Alive Status: Acquires the connection’s mutex and checks if it’s already open
  3. Open if Needed: If the connection is not alive, calls open() with the provided parameters and increments the open counter
  4. Return Ready Connection: Returns the connection wrapped in Arc<Mutex<_>> for thread-safe access
§Parameters
  • key - A ConnectionKey identifying the connection by hostname and connection plugin name. This key is used to look up or create the connection in the manager’s map.
  • params - A ResolvedConnectionParams containing the connection parameters such as hostname, port, username, password, and platform. These parameters are passed to the connection’s open() method if the connection needs to be established.
§Thread Safety and Locking

The method uses a two-phase locking strategy to prevent deadlocks:

  1. Factory Lock: get_or_create() briefly acquires the factory’s RwLock to clone the Arc<ConnectionFactory>, then releases it before calling the factory function. This prevents holding the factory lock during connection creation.

  2. Connection Lock: After obtaining the connection, the method acquires its Mutex in a scoped block ({ ... }). The lock is automatically released when the scope ends, before returning the connection. This allows the caller to acquire the lock again without deadlock.

Why the scoped lock?

Without scope:                       With scope:
---------------                      -----------
let mut guard = conn.lock().await;   {
guard.open(params).await?;               let mut guard = conn.lock().await;
// guard still held                      guard.open(params).await?;
Ok(Some(conn))                       } // guard dropped here
// Caller tries conn.lock()          Ok(Some(conn))
// DEADLOCK! 💥                      // Caller can lock successfully ✓
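The effect of guard scope can be observed directly with std::sync::Mutex, whose try_lock() fails immediately instead of waiting. The sketch below is illustrative only: Conn, open_scoped, and lock_availability are hypothetical, and the std mutex stands in for the async mutex used in the examples on this page, where a second lock attempt would wait rather than return an error.

```rust
use std::sync::{Arc, Mutex};

#[derive(Default)]
pub struct Conn {
    pub alive: bool,
}

/// Opens the connection inside a block so the guard is dropped before the
/// handle is handed back to the caller.
pub fn open_scoped(conn: Arc<Mutex<Conn>>) -> Arc<Mutex<Conn>> {
    {
        let mut guard = conn.lock().unwrap();
        guard.alive = true; // stands in for `open(params)`
    } // guard dropped here, lock released
    conn
}

/// Reports whether the lock can be taken (a) while a guard is alive and
/// (b) after that guard has gone out of scope.
pub fn lock_availability(conn: &Arc<Mutex<Conn>>) -> (bool, bool) {
    let while_held = {
        let _guard = conn.lock().unwrap();
        conn.try_lock().is_ok()
    };
    let after_scope = conn.try_lock().is_ok();
    (while_held, after_scope)
}
```

lock_availability() returns (false, true): the lock is unavailable for as long as a guard exists and becomes available as soon as the enclosing block ends.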
§Connection Lifecycle

The method respects the connection’s alive state:

  • If is_alive() returns true, the connection is already open and no action is taken
  • If is_alive() returns false, open() is called to establish the connection
  • The open_calls counter is incremented only when open() is actually called

This prevents unnecessary reconnection attempts and tracks actual connection operations.

§Returns

Returns Ok(Some(Arc<Mutex<dyn Connection>>)) if:

  • The connection was successfully retrieved or created, AND
  • The connection was already alive OR was successfully opened

Returns Ok(None) if:

  • The factory function returned None (e.g., no plugin registered for this connection plugin name)

Returns Err(String) if:

  • The connection factory is not configured: "connection factory not set"
  • The factory lock is poisoned: "connection factory lock poisoned"
  • The connection lock is poisoned: "connection lock poisoned"
  • The connection’s open() method returns an error (error message from the connection)
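On the caller's side, the three outcomes above map naturally onto a match. The helper below is a hypothetical sketch: Outcome and classify are not part of the crate, and the generic Handle parameter stands in for Arc<Mutex<dyn Connection>>.

```rust
// Hypothetical caller-side helper; `Handle` stands in for
// Arc<Mutex<dyn Connection>>.
#[derive(Debug, PartialEq, Eq)]
pub enum Outcome {
    /// A connection was returned and is open.
    Ready,
    /// The factory had no plugin for this connection plugin name.
    NoPlugin,
    /// Configuration, locking, or open() failure, with its message.
    Failed(String),
}

pub fn classify<Handle>(result: Result<Option<Handle>, String>) -> Outcome {
    match result {
        Ok(Some(_connection)) => Outcome::Ready,
        Ok(None) => Outcome::NoPlugin,
        Err(message) => Outcome::Failed(message),
    }
}
```

Keeping Ok(None) separate from Err lets a caller skip hosts without a plugin while still surfacing genuine failures.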
§Examples
§Basic Usage
use async_trait::async_trait;
use std::sync::Arc;
use tokio::runtime::Builder;
use tokio::sync::Mutex;
use genja_core::inventory::{
    Connection, ConnectionKey, ConnectionManager, ResolvedConnectionParams
};

#[derive(Debug)]
struct SshConnection {
    alive: bool,
}

#[async_trait]
impl Connection for SshConnection {
    fn create(&self, _key: &ConnectionKey) -> Box<dyn Connection> {
        Box::new(SshConnection { alive: false })
    }

    fn is_alive(&self) -> bool {
        self.alive
    }

    async fn open(&mut self, _params: &ResolvedConnectionParams) -> Result<(), String> {
        self.alive = true;
        Ok(())
    }

    fn close(&mut self) -> ConnectionKey {
        self.alive = false;
        ConnectionKey::new("router1", "ssh")
    }
}

let factory = Arc::new(|_key: &ConnectionKey| {
    Some(Arc::new(Mutex::new(SshConnection { alive: false })) as Arc<Mutex<dyn Connection>>)
});
let manager = ConnectionManager::with_connection_factory(factory);

let key = ConnectionKey::new("router1", "ssh");
let params = ResolvedConnectionParams {
    hostname: "10.0.0.1".to_string(),
    port: Some(22),
    username: Some("admin".to_string()),
    password: None,
    platform: None,
    extras: None,
};

// First call creates and opens the connection
let runtime = Builder::new_current_thread().enable_all().build().unwrap();
let connection = runtime.block_on(async { manager.open_connection(&key, &params).await })?;
assert!(connection.is_some());

// Second call reuses the existing connection without reopening
let same_connection = runtime.block_on(async { manager.open_connection(&key, &params).await })?;
assert!(Arc::ptr_eq(&connection.unwrap(), &same_connection.unwrap()));
§Handling Missing Plugins
use std::sync::Arc;
use tokio::runtime::Builder;
use genja_core::inventory::{ConnectionKey, ConnectionManager, ResolvedConnectionParams};

// Factory returns None for unknown connection plugin names
let factory = Arc::new(|key: &ConnectionKey| {
    if key.plugin_name == "ssh" {
        // ... return SSH connection
        None // simplified for example
    } else {
        None // No plugin for this type
    }
});
let manager = ConnectionManager::with_connection_factory(factory);

let key = ConnectionKey::new("router1", "telnet");
let params = ResolvedConnectionParams {
    hostname: "10.0.0.1".to_string(),
    port: None,
    username: None,
    password: None,
    platform: None,
    extras: None,
};

let runtime = Builder::new_current_thread().enable_all().build().unwrap();
let result = runtime.block_on(async { manager.open_connection(&key, &params).await })?;
assert!(result.is_none()); // No plugin available
§Thread-Safe Concurrent Access
use async_trait::async_trait;
use std::sync::Arc;
use std::thread;
use tokio::runtime::Builder;
use tokio::sync::Mutex;
use genja_core::inventory::{
    Connection, ConnectionKey, ConnectionManager, ResolvedConnectionParams
};

// `SshConnection` is the type defined in the Basic Usage example above.
let factory = Arc::new(|_key: &ConnectionKey| {
    Some(Arc::new(Mutex::new(SshConnection { alive: false })) as Arc<Mutex<dyn Connection>>)
});
let manager = Arc::new(ConnectionManager::with_connection_factory(factory));
let runtime = Arc::new(Builder::new_current_thread().enable_all().build().unwrap());

let key = ConnectionKey::new("router1", "ssh");
let params = Arc::new(ResolvedConnectionParams {
    hostname: "10.0.0.1".to_string(),
    port: None,
    username: None,
    password: None,
    platform: None,
    extras: None,
});

// Multiple threads can safely open the same connection
let handles: Vec<_> = (0..3)
    .map(|_| {
        let manager = Arc::clone(&manager);
        let runtime = Arc::clone(&runtime);
        let key = key.clone();
        let params = Arc::clone(&params);
        thread::spawn(move || {
            runtime.block_on(async { manager.open_connection(&key, &params).await })
        })
    })
    .collect();

for handle in handles {
    let result = handle.join().unwrap();
    assert!(result.is_ok());
}

Trait Implementations§

impl Debug for ConnectionManager

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

impl Default for ConnectionManager

fn default() -> Self

Returns the “default value” for a type.

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.