Skip to main content

genja_plugin_manager/
connection_factory.rs

1//! Connection factory implementation for plugin-based connections.
2//!
3//! This module provides the infrastructure to integrate plugin-based connections
4//! into the core inventory system's connection management. It bridges the gap
5//! between the `PluginConnection` trait used by plugins and the `Connection` trait
6//! expected by the inventory system.
7//!
8//! # Overview
9//!
10//! The module consists of two main components:
11//!
12//! 1. **[`PluginConnectionAdapter`]** - An adapter that wraps `PluginConnection`
13//!    implementations and provides the `Connection` interface. It tracks connection
14//!    lifecycle state and delegates operations to the underlying plugin.
15//!
16//! 2. **[`build_connection_factory`]** - A factory function that creates a
17//!    `ConnectionFactory` closure. This factory looks up plugins by connection type
18//!    and creates appropriate connection instances wrapped in adapters.
19//!
20//! # Architecture
21//!
22//! ```text
23//! ┌─────────────────────────────────────────────────────────────┐
24//! │                    Connection Manager                       │
25//! │                  (genja_core::inventory)                    │
26//! └─────────────────────────┬───────────────────────────────────┘
27//!                           │
28//!                           │ Uses ConnectionFactory
29//!                           ▼
30//! ┌─────────────────────────────────────────────────────────────┐
31//! │              build_connection_factory()                     │
32//! │         Returns: Arc<ConnectionFactory>                     │
33//! └─────────────────────────┬───────────────────────────────────┘
34//!                           │
35//!                           │ Queries for plugins
36//!                           ▼
37//! ┌─────────────────────────────────────────────────────────────┐
38//! │                    PluginManager                            │
39//! │              (Registered Connection Plugins)                │
40//! └─────────────────────────┬───────────────────────────────────┘
41//!                           │
42//!                           │ Returns plugin instance
43//!                           ▼
44//! ┌─────────────────────────────────────────────────────────────┐
45//! │              PluginConnectionAdapter                        │
46//! │         Wraps: Box<dyn PluginConnection>                    │
47//! │         Implements: Connection trait                        │
48//! └─────────────────────────┬───────────────────────────────────┘
49//!                           │
50//!                           │ Delegates to
51//!                           ▼
52//! ┌─────────────────────────────────────────────────────────────┐
53//! │              Plugin Implementation                          │
54//! │         (e.g., SSH, Telnet, NETCONF)                        │
55//! │         Implements: PluginConnection trait                  │
56//! └─────────────────────────────────────────────────────────────┘
57//! ```
58//!
59//! # Usage
60//!
61//! ## Basic Setup
62//!
63//! ```no_run
64//! use genja_plugin_manager::{PluginManager, connection_factory::build_connection_factory};
65//! use genja_core::inventory::{ConnectionManager, ConnectionKey};
66//! use std::sync::Arc;
67//!
68//! // 1. Create and configure plugin manager
69//! let mut plugin_manager = PluginManager::new();
70//! // Register connection plugins...
71//!
72//! // 2. Build connection factory
73//! let factory = build_connection_factory(Arc::new(plugin_manager));
74//!
75//! // 3. Set factory in connection manager
76//! let connection_manager = ConnectionManager::default();
77//! connection_manager.set_connection_factory(factory);
78//!
79//! // 4. Use connection manager to create connections
80//! let key = ConnectionKey::new("router1", "ssh");
81//! // let connection = connection_manager.get_or_create(key);
82//! ```
83//!
84//! ## Plugin Integration
85//!
86//! Connection plugins must implement the `PluginConnection` trait:
87//!
88//! ```no_run
89//! use async_trait::async_trait;
90//! use genja_plugin_manager::plugin_types::{Plugin, PluginConnection};
91//! use genja_core::inventory::{ConnectionKey, ResolvedConnectionParams};
92//!
93//! struct MyConnectionPlugin {
94//!     key: ConnectionKey,
95//! }
96//!
97//! impl Plugin for MyConnectionPlugin {
98//!     fn name(&self) -> String {
99//!         "my_connection".to_string()
100//!     }
101//! }
102//!
103//! #[async_trait]
104//! impl PluginConnection for MyConnectionPlugin {
105//!     fn create(&self, key: &ConnectionKey) -> Box<dyn PluginConnection> {
106//!         Box::new(MyConnectionPlugin { key: key.clone() })
107//!     }
108//!
109//!     async fn open(&mut self, params: &ResolvedConnectionParams) -> Result<(), String> {
110//!         // Establish connection
111//!         let _ = params;
112//!         Ok(())
113//!     }
114//!
115//!     fn close(&mut self) -> ConnectionKey {
116//!         // Clean up connection
117//!         self.key.clone()
118//!     }
119//!
120//!     fn is_alive(&self) -> bool {
121//!         // Check connection status
122//!         true
123//!     }
124//! }
125//! ```
126//!
127//! # Connection Lifecycle
128//!
129//! The adapter manages the connection lifecycle through the following states:
130//!
131//! 1. **Created** - Adapter is instantiated with `alive = false`
132//! 2. **Opening** - `open()` is called with connection parameters
133//! 3. **Open** - Connection established successfully, `alive = true`
134//! 4. **Closing** - `close()` is called to tear down connection
135//! 5. **Closed** - Connection terminated, `alive = false`
136//!
137//! ## State Transitions
138//!
139//! ```text
140//! ┌─────────┐
141//! │ Created │ (alive = false)
142//! └────┬────┘
143//!      │
144//!      │ open() called
145//!      ▼
146//! ┌─────────┐
147//! │ Opening │
148//! └────┬────┘
149//!      │
150//!      ├─ Success ──► ┌──────┐
151//!      │              │ Open │ (alive = true)
152//!      │              └───┬──┘
153//!      │                  │
154//!      │                  │ close() called
155//!      │                  ▼
156//!      │              ┌────────┐
157//!      └─ Failure ──► │ Closed │ (alive = false)
158//!                     └────────┘
159//! ```
160//!
161//! # Thread Safety
162//!
163//! All components in this module are designed for concurrent use:
164//!
165//! - The connection factory is wrapped in `Arc` and can be shared across threads
166//! - Each connection adapter is wrapped in `Arc<Mutex<_>>` for safe mutation
167//! - The `PluginManager` reference is shared via `Arc` in the factory closure
168//!
169//! # Error Handling
170//!
171//! The factory returns `Option<Arc<Mutex<dyn Connection>>>`:
172//!
173//! - `Some(connection)` - Plugin found and connection created successfully
174//! - `None` - Plugin not found or not a connection plugin
175//!
176//! Connection operations return `Result<(), String>`:
177//!
178//! - `Ok(())` - Operation succeeded
179//! - `Err(message)` - Operation failed with error description
180//!
181//! # Performance Considerations
182//!
183//! - **Connection Pooling**: The `ConnectionManager` handles connection reuse
184//! - **Lazy Creation**: Connections are created only when needed
185//! - **Plugin Lookup**: Plugin queries are O(1) hash map lookups
186//! - **Lock Contention**: Each connection has its own mutex to minimize contention
187//!
188//! # Examples
189//!
190//! ## Complete Integration Example
191//!
192//! ```no_run
193//! use genja_plugin_manager::{PluginManager, connection_factory::build_connection_factory};
194//! use genja_core::inventory::{
195//!     ConnectionManager, ConnectionKey, ResolvedConnectionParams,
196//!     BaseBuilderHost, Host, Hosts, Inventory,
197//! };
198//! use std::sync::Arc;
199//! use tokio::runtime::Builder;
200//!
201//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
202//! // Set up plugin manager with connection plugins
203//! let mut plugin_manager = PluginManager::new();
204//! // plugin_manager.load_plugins_from_directory("plugins")?;
205//!
206//! // Create connection factory
207//! let factory = build_connection_factory(Arc::new(plugin_manager));
208//!
209//! // Build inventory with hosts
210//! let mut hosts = Hosts::new();
211//! hosts.add_host("router1", Host::builder()
212//!     .hostname("10.0.0.1")
213//!     .port(22)
214//!     .username("admin")
215//!     .platform("cisco_ios")
216//!     .build());
217//!
218//! let inventory = Inventory::builder()
219//!     .hosts(hosts)
220//!     .build();
221//!
222//! // Set up connection manager
223//! let connection_manager = inventory.connections();
224//! connection_manager.set_connection_factory(factory);
225//!
226//! // Create and use connection
227//! let key = ConnectionKey::new("router1", "ssh");
228//!
229//! // Open connection
230//! let params = ResolvedConnectionParams {
231//!     hostname: "10.0.0.1".to_string(),
232//!     port: Some(22),
233//!     username: Some("admin".to_string()),
234//!     password: Some("secret".to_string()),
235//!     platform: Some("cisco_ios".to_string()),
236//!     extras: None,
237//! };
238//! let runtime = Builder::new_current_thread().enable_all().build()?;
239//! let connection = runtime
240//!     .block_on(async { connection_manager.open_connection(&key, &params).await })?
241//!     .expect("connection plugin not found");
242//!
243//! let conn = runtime.block_on(connection.lock());
244//!
245//! // Use connection...
246//! assert!(conn.is_alive());
247//!
248//! // Close connection
249//! drop(conn);
250//! connection_manager.close_connection(&key);
251//! # Ok(())
252//! # }
253//! ```
254//!
255//! # See Also
256//!
257//! - [`PluginManager`] - Manages plugin registration and lookup
258//! - [`PluginConnection`] - Plugin connection trait
259//! - [`Connection`] - Core connection trait
260//! - [`ConnectionFactory`] - Factory function type for creating connections
261
262use crate::PluginManager;
263use crate::plugin_types::{PluginConnection, Plugins};
264use async_trait::async_trait;
265use genja_core::inventory::{
266    Connection, ConnectionFactory, ConnectionKey, ResolvedConnectionParams,
267};
268use std::sync::Arc;
269use tokio::sync::Mutex;
270
271/// Adapter that bridges `PluginConnection` trait objects to the `Connection` trait.
272///
273/// This adapter wraps a `PluginConnection` implementation and provides the `Connection`
274/// interface expected by the inventory system. It maintains connection lifecycle state
275/// and delegates operations to the underlying plugin.
276///
277/// # Purpose
278///
279/// The adapter serves two main purposes:
280/// 1. **Trait Adaptation**: Converts `PluginConnection` trait objects into `Connection`
281///    trait objects, allowing plugins to integrate with the core connection management system.
282/// 2. **State Tracking**: Maintains the `alive` flag to track whether a connection has been
283///    successfully opened, providing quick status checks without querying the plugin.
284///
285/// # Lifecycle
286///
287/// The adapter tracks connection state through the `alive` flag:
288/// - Initially `false` when created
289/// - Set to `true` after successful `open()` call
290/// - Reset to `false` after `close()` call
291///
292/// # Thread Safety
293///
294/// This adapter is typically wrapped in `Arc<Mutex<_>>` by the connection factory,
295/// ensuring thread-safe access to the underlying plugin connection.
296///
297/// # Examples
298///
299/// ```no_run
300/// use genja_plugin_manager::connection_factory::PluginConnectionAdapter;
301/// use genja_core::inventory::{Connection, ConnectionKey, ResolvedConnectionParams};
302///
303/// // Typically created by the connection factory, not directly
304/// // let plugin_connection = ...; // Some PluginConnection implementation
305/// // let adapter = PluginConnectionAdapter::new(plugin_connection);
306/// ```
307#[derive(Debug)]
308#[doc(hidden)]
309pub struct PluginConnectionAdapter {
310    /// The underlying plugin connection implementation.
311    ///
312    /// This boxed trait object contains the actual connection logic provided
313    /// by the plugin. All connection operations are delegated to this inner plugin.
314    inner: Box<dyn PluginConnection>,
315
316    /// Tracks whether the connection is currently alive.
317    ///
318    /// This flag is set to `true` when `open()` succeeds and reset to `false`
319    /// when `close()` is called. It provides a quick way to check connection
320    /// status without querying the plugin directly.
321    alive: bool,
322}
323
324impl PluginConnectionAdapter {
325    /// Creates a new adapter wrapping the given plugin connection.
326    ///
327    /// The adapter is initialized with `alive` set to `false`, indicating
328    /// that no connection has been established yet.
329    ///
330    /// # Parameters
331    ///
332    /// * `inner` - The plugin connection implementation to wrap
333    ///
334    /// # Returns
335    ///
336    /// A new `PluginConnectionAdapter` instance ready to manage the connection lifecycle.
337    ///
338    /// # Examples
339    ///
340    /// ```no_run
341    /// use genja_plugin_manager::connection_factory::PluginConnectionAdapter;
342    ///
343    /// // let plugin_connection = ...; // Some PluginConnection implementation
344    /// // let adapter = PluginConnectionAdapter::new(plugin_connection);
345    /// // assert!(!adapter.is_alive());
346    /// ```
347    fn new(inner: Box<dyn PluginConnection>) -> Self {
348        Self {
349            inner,
350            alive: false,
351        }
352    }
353
354    #[doc(hidden)]
355    pub fn inner_plugin_connection(&self) -> &dyn PluginConnection {
356        self.inner.as_ref()
357    }
358}
359
360#[async_trait]
361impl Connection for PluginConnectionAdapter {
362    /// Creates a new connection instance for the specified key.
363    ///
364    /// Delegates to the underlying plugin's `create()` method and wraps the result
365    /// in a new adapter. This allows each host to have its own connection instance
366    /// while maintaining consistent lifecycle management.
367    ///
368    /// # Parameters
369    ///
370    /// * `key` - The connection key identifying the host and connection type
371    ///
372    /// # Returns
373    ///
374    /// A boxed `Connection` trait object containing the new connection instance.
375    ///
376    /// # Examples
377    ///
378    /// ```no_run
379    /// use genja_core::inventory::{Connection, ConnectionKey};
380    ///
381    /// // let adapter: Box<dyn Connection> = ...;
382    /// // let key = ConnectionKey {
383    /// //     hostname: "router1".to_string(),
384    /// //     plugin_name: "ssh".to_string(),
385    /// // };
386    /// // let new_connection = adapter.create(&key);
387    /// ```
388    fn create(&self, key: &ConnectionKey) -> Box<dyn Connection> {
389        let instance = self.inner.create(key);
390        Box::new(PluginConnectionAdapter::new(instance))
391    }
392
393    /// Checks if the connection is currently alive.
394    ///
395    /// Returns the value of the internal `alive` flag, which is set to `true`
396    /// after a successful `open()` call and reset to `false` after `close()`.
397    ///
398    /// # Returns
399    ///
400    /// `true` if the connection has been opened and not yet closed, `false` otherwise.
401    ///
402    /// # Note
403    ///
404    /// This method returns cached state and does not verify the actual connection
405    /// status with the underlying plugin. For real-time validation, plugins should
406    /// implement additional health check mechanisms.
407    ///
408    /// # Examples
409    ///
410    /// ```no_run
411    /// use genja_core::inventory::Connection;
412    ///
413    /// // let mut adapter: Box<dyn Connection> = ...;
414    /// // assert!(!adapter.is_alive());
415    /// // adapter.open(&params)?;
416    /// // assert!(adapter.is_alive());
417    /// ```
418    fn is_alive(&self) -> bool {
419        self.alive
420    }
421
422    /// Opens a connection using the provided parameters.
423    ///
424    /// Delegates the connection establishment to the underlying plugin. If the
425    /// plugin successfully opens the connection, the `alive` flag is set to `true`.
426    /// If the operation fails, the flag remains `false`.
427    ///
428    /// # Parameters
429    ///
430    /// * `params` - Resolved connection parameters including hostname, port,
431    ///              credentials, and platform-specific settings
432    ///
433    /// # Returns
434    ///
435    /// * `Ok(())` - Connection opened successfully
436    /// * `Err(String)` - Connection failed with error message
437    ///
438    /// # State Changes
439    ///
440    /// On success, sets `alive` to `true`. On failure, `alive` remains `false`.
441    ///
442    /// # Examples
443    ///
444    /// ```no_run
445    /// use genja_core::inventory::{Connection, ResolvedConnectionParams};
446    ///
447    /// // let mut adapter: Box<dyn Connection> = ...;
448    /// // let params = ResolvedConnectionParams {
449    /// //     hostname: "10.0.0.1".to_string(),
450    /// //     port: Some(22),
451    /// //     username: Some("admin".to_string()),
452    /// //     password: Some("secret".to_string()),
453    /// //     platform: Some("linux".to_string()),
454    /// //     extras: None,
455    /// // };
456    /// // adapter.open(&params)?;
457    /// ```
458    async fn open(&mut self, params: &ResolvedConnectionParams) -> Result<(), String> {
459        let result = self.inner.open(params).await;
460        if result.is_ok() {
461            self.alive = true;
462        }
463        result
464    }
465
466    async fn execute_command(&mut self, command: &str) -> Result<String, String> {
467        self.inner.execute_command(command).await
468    }
469
470    /// Closes the connection and returns its key.
471    ///
472    /// Delegates the connection teardown to the underlying plugin and resets
473    /// the `alive` flag to `false`. The connection key is returned for tracking
474    /// or cleanup purposes.
475    ///
476    /// # Returns
477    ///
478    /// The `ConnectionKey` identifying the closed connection.
479    ///
480    /// # State Changes
481    ///
482    /// Always sets `alive` to `false`, regardless of the plugin's close operation result.
483    ///
484    /// # Examples
485    ///
486    /// ```no_run
487    /// use genja_core::inventory::Connection;
488    ///
489    /// // let mut adapter: Box<dyn Connection> = ...;
490    /// // adapter.open(&params)?;
491    /// // let key = adapter.close();
492    /// // assert!(!adapter.is_alive());
493    /// ```
494    fn close(&mut self) -> ConnectionKey {
495        let key = self.inner.close();
496        self.alive = false;
497        key
498    }
499}
500
501/// Builds a connection factory that creates connections from registered plugins.
502///
503/// This function creates a `ConnectionFactory` closure that looks up plugins by
504/// connection type and creates appropriate connection instances. The factory
505/// integrates plugin-based connections into the inventory's connection management
506/// system.
507///
508/// # How It Works
509///
510/// 1. The factory receives a `ConnectionKey` specifying the connection type
511/// 2. It queries the `PluginManager` for a plugin matching that type
512/// 3. If a `Connection` plugin is found, it creates a new connection instance
513/// 4. The instance is wrapped in a `PluginConnectionAdapter` for trait compatibility
514/// 5. The adapter is wrapped in `Arc<Mutex<_>>` for thread-safe access
515///
516/// # Parameters
517///
518/// * `plugins` - Shared reference to the plugin manager containing registered plugins
519///
520/// # Returns
521///
522/// An `Arc<ConnectionFactory>` that can be used to create plugin-based connections.
523///
524/// # Plugin Requirements
525///
526/// The factory only works with plugins registered as `Plugins::Connection` variants.
527/// Other plugin types are ignored and will result in `None` being returned.
528///
529/// # Thread Safety
530///
531/// The returned factory is thread-safe and can be shared across multiple threads.
532/// Each created connection is wrapped in `Arc<Mutex<_>>` for safe concurrent access.
533///
534/// # Examples
535///
536/// ```no_run
537/// use genja_plugin_manager::{PluginManager, connection_factory::build_connection_factory};
538/// use genja_core::inventory::{ConnectionKey, ConnectionManager};
539/// use std::sync::Arc;
540///
541/// // Create plugin manager and register plugins
542/// let mut plugin_manager = PluginManager::new();
543/// // plugin_manager.register_plugin(...);
544///
545/// // Build connection factory
546/// let factory = build_connection_factory(Arc::new(plugin_manager));
547///
548/// // Use factory with connection manager
549/// let connection_manager = ConnectionManager::default();
550/// connection_manager.set_connection_factory(factory);
551///
552/// // Create connections through the manager
553/// let key = ConnectionKey::new("router1", "ssh");
554/// // let connection = connection_manager.get_or_create(key);
555/// ```
556///
557/// # See Also
558///
559/// * [`PluginConnectionAdapter`] - The adapter that wraps plugin connections
560/// * [`PluginManager`] - Manages registered plugins
561/// * [`ConnectionFactory`] - The factory type returned by this function
562pub fn build_connection_factory(plugins: Arc<PluginManager>) -> Arc<ConnectionFactory> {
563    Arc::new(move |key: &ConnectionKey| {
564        let plugin = plugins.get_plugin(&key.plugin_name)?;
565        match plugin {
566            Plugins::Connection(connection) => {
567                let instance = connection.create(key);
568                let adapter = PluginConnectionAdapter::new(instance);
569                Some(Arc::new(Mutex::new(adapter)) as Arc<Mutex<dyn Connection>>)
570            }
571            _ => None,
572        }
573    })
574}
575
576#[cfg(test)]
577mod tests {
578    use super::*;
579    use crate::plugin_types::{Plugin, PluginConnection, PluginRunner};
580    use genja_core::inventory::Connection;
581    use genja_core::inventory::ConnectionManager;
582    use genja_core::task::Tasks;
583    use std::future::Future;
584    use tokio::runtime::Builder;
585
586    fn run_async<F: Future>(future: F) -> F::Output {
587        Builder::new_current_thread()
588            .enable_all()
589            .build()
590            .expect("test runtime should build")
591            .block_on(future)
592    }
593
594    #[derive(Debug)]
595    struct TestConnection {
596        name: &'static str,
597        key: ConnectionKey,
598        alive: bool,
599    }
600
601    impl TestConnection {
602        fn new(name: &'static str, key: ConnectionKey) -> Self {
603            Self {
604                name,
605                key,
606                alive: false,
607            }
608        }
609    }
610
611    impl Plugin for TestConnection {
612        fn name(&self) -> String {
613            self.name.to_string()
614        }
615    }
616
617    #[async_trait]
618    impl PluginConnection for TestConnection {
619        fn create(&self, key: &ConnectionKey) -> Box<dyn PluginConnection> {
620            Box::new(Self::new(self.name, key.clone()))
621        }
622
623        async fn open(&mut self, _params: &ResolvedConnectionParams) -> Result<(), String> {
624            self.alive = true;
625            Ok(())
626        }
627
628        fn close(&mut self) -> ConnectionKey {
629            self.alive = false;
630            self.key.clone()
631        }
632
633        fn is_alive(&self) -> bool {
634            self.alive
635        }
636    }
637
638    #[derive(Debug)]
639    struct DummyRunner {
640        name: &'static str,
641    }
642
643    impl Plugin for DummyRunner {
644        fn name(&self) -> String {
645            self.name.to_string()
646        }
647    }
648
649    #[async_trait]
650    impl PluginRunner for DummyRunner {
651        async fn run_task(
652            &self,
653            _task: &genja_core::task::TaskDefinition,
654            _hosts: &genja_core::inventory::Hosts,
655            _connection_resolver: Option<
656                std::sync::Arc<dyn genja_core::task::TaskConnectionResolver>,
657            >,
658            _runner_config: &genja_core::settings::RunnerConfig,
659            _max_depth: usize,
660        ) -> Result<genja_core::task::TaskResults, genja_core::GenjaError> {
661            Ok(genja_core::task::TaskResults::new("runner"))
662        }
663
664        async fn run_tasks(
665            &self,
666            _tasks: &Tasks,
667            _hosts: &genja_core::inventory::Hosts,
668            _connection_resolver: Option<
669                std::sync::Arc<dyn genja_core::task::TaskConnectionResolver>,
670            >,
671            _runner_config: &genja_core::settings::RunnerConfig,
672            _max_depth: usize,
673        ) -> Result<Vec<genja_core::task::TaskResults>, genja_core::GenjaError> {
674            Ok(Vec::new())
675        }
676    }
677
678    fn default_params() -> ResolvedConnectionParams {
679        ResolvedConnectionParams {
680            hostname: "host1".to_string(),
681            port: Some(22),
682            username: Some("user".to_string()),
683            password: Some("pass".to_string()),
684            platform: Some("linux".to_string()),
685            extras: None,
686        }
687    }
688
689    #[test]
690    fn adapter_open_close_updates_alive_and_returns_key() {
691        let key = ConnectionKey::new("host1", "ssh");
692        let plugin = TestConnection::new("ssh", key.clone());
693
694        let mut adapter = PluginConnectionAdapter::new(Box::new(plugin));
695        assert!(!adapter.is_alive());
696
697        run_async(adapter.open(&default_params())).unwrap();
698        assert!(adapter.is_alive());
699
700        let closed_key = adapter.close();
701        assert_eq!(closed_key, key);
702        assert!(!adapter.is_alive());
703    }
704
705    #[test]
706    fn adapter_create_uses_plugin_create_and_starts_dead() {
707        let key = ConnectionKey::new("host1", "ssh");
708        let plugin = TestConnection::new("ssh", key.clone());
709        let adapter = PluginConnectionAdapter::new(Box::new(plugin));
710
711        let new_key = ConnectionKey::new("host2", "ssh");
712        let new_conn = adapter.create(&new_key);
713        assert!(!new_conn.is_alive());
714    }
715
716    #[test]
717    fn factory_returns_none_for_missing_or_non_connection_plugins() {
718        let manager = Arc::new(PluginManager::new());
719        let factory = build_connection_factory(Arc::clone(&manager));
720        let key = ConnectionKey::new("host1", "ssh");
721        assert!(factory(&key).is_none());
722
723        let mut manager = PluginManager::new();
724        manager.register_plugin(Plugins::Runner(Box::new(DummyRunner { name: "runner" })));
725        let factory = build_connection_factory(Arc::new(manager));
726        let key = ConnectionKey::new("host1", "runner");
727        assert!(factory(&key).is_none());
728    }
729
730    #[test]
731    fn factory_returns_adapter_for_connection_plugins() {
732        let key = ConnectionKey::new("host1", "ssh");
733        let plugin = TestConnection::new("ssh", key.clone());
734
735        let mut manager = PluginManager::new();
736        manager.register_plugin(Plugins::Connection(Box::new(plugin)));
737
738        let factory = build_connection_factory(Arc::new(manager));
739        let connection = factory(&key).expect("expected connection plugin");
740
741        {
742            let mut guard = run_async(connection.lock());
743            assert!(!guard.is_alive());
744            run_async(guard.open(&default_params())).unwrap();
745            assert!(guard.is_alive());
746            let closed_key = guard.close();
747            assert_eq!(closed_key, key);
748            assert!(!guard.is_alive());
749        }
750    }
751
752    #[test]
753    fn manager_counters_increment_on_open_and_close() {
754        let key = ConnectionKey::new("host1", "ssh");
755        let plugin = TestConnection::new("ssh", key.clone());
756
757        let mut manager = PluginManager::new();
758        manager.register_plugin(Plugins::Connection(Box::new(plugin)));
759
760        let factory = build_connection_factory(Arc::new(manager));
761        let connection_manager = ConnectionManager::with_connection_factory(factory);
762
763        let params = default_params();
764        let connection = run_async(connection_manager.open_connection(&key, &params))
765            .unwrap()
766            .unwrap();
767
768        let counters = connection_manager.connection_counters_for("ssh").unwrap();
769        assert_eq!(counters.create_calls, 1);
770        assert_eq!(counters.open_calls, 1);
771        assert_eq!(counters.close_calls, 0);
772
773        drop(connection);
774        connection_manager.close_connection(&key);
775        let counters = connection_manager.connection_counters_for("ssh").unwrap();
776        assert_eq!(counters.create_calls, 1);
777        assert_eq!(counters.open_calls, 1);
778        assert_eq!(counters.close_calls, 1);
779    }
780
781    #[test]
782    fn open_connection_twice_does_not_double_count_open() {
783        let key = ConnectionKey::new("host1", "ssh");
784        let plugin = TestConnection::new("ssh", key.clone());
785
786        let mut manager = PluginManager::new();
787        manager.register_plugin(Plugins::Connection(Box::new(plugin)));
788
789        let factory = build_connection_factory(Arc::new(manager));
790        let connection_manager = ConnectionManager::with_connection_factory(factory);
791
792        let params = default_params();
793        run_async(connection_manager.open_connection(&key, &params))
794            .unwrap()
795            .unwrap();
796
797        let counters_after_first = connection_manager.connection_counters_for("ssh").unwrap();
798        assert_eq!(counters_after_first.create_calls, 1);
799        assert_eq!(counters_after_first.open_calls, 1);
800
801        run_async(connection_manager.open_connection(&key, &params))
802            .unwrap()
803            .unwrap();
804
805        let counters_after_second = connection_manager.connection_counters_for("ssh").unwrap();
806        assert_eq!(counters_after_second.create_calls, 1);
807        assert_eq!(counters_after_second.open_calls, 1);
808    }
809
810    #[test]
811    fn open_connection_errors_when_factory_missing() {
812        let connection_manager = ConnectionManager::default();
813        let key = ConnectionKey::new("host1", "ssh");
814        let params = default_params();
815
816        let err = run_async(connection_manager.open_connection(&key, &params)).unwrap_err();
817        assert_eq!(err, "connection factory not set");
818    }
819
820    #[test]
821    fn connection_counters_snapshot_tracks_multiple_types() {
822        let key_ssh = ConnectionKey::new("host1", "ssh");
823        let key_telnet = ConnectionKey::new("host2", "telnet");
824
825        let plugin_ssh = TestConnection::new("ssh", key_ssh.clone());
826        let plugin_telnet = TestConnection::new("telnet", key_telnet.clone());
827
828        let mut manager = PluginManager::new();
829        manager.register_plugin(Plugins::Connection(Box::new(plugin_ssh)));
830        manager.register_plugin(Plugins::Connection(Box::new(plugin_telnet)));
831
832        let factory = build_connection_factory(Arc::new(manager));
833        let connection_manager = ConnectionManager::with_connection_factory(factory);
834
835        let params = default_params();
836        run_async(connection_manager.open_connection(&key_ssh, &params))
837            .unwrap()
838            .unwrap();
839        run_async(connection_manager.open_connection(&key_telnet, &params))
840            .unwrap()
841            .unwrap();
842
843        let snapshot = connection_manager.connection_counters_snapshot();
844        let ssh = snapshot.get("ssh").copied().unwrap();
845        let telnet = snapshot.get("telnet").copied().unwrap();
846
847        assert_eq!(ssh.create_calls, 1);
848        assert_eq!(ssh.open_calls, 1);
849        assert_eq!(ssh.close_calls, 0);
850
851        assert_eq!(telnet.create_calls, 1);
852        assert_eq!(telnet.open_calls, 1);
853        assert_eq!(telnet.close_calls, 0);
854    }
855
856    /// Tests that connections created by the factory are thread-safe and can handle concurrent access.
857    ///
858    /// This test verifies that the connection adapter wrapped in `Arc<Mutex<_>>` properly
859    /// synchronizes access from multiple threads. It spawns two threads that concurrently
860    /// attempt to open and close the same connection, ensuring that:
861    ///
862    /// 1. The mutex prevents data races on the connection state
863    /// 2. Multiple threads can safely acquire and release the lock
864    /// 3. Connection operations (open/close) are properly serialized
865    /// 4. The operation counters reflect all concurrent operations
866    ///
867    /// # Test Setup
868    ///
869    /// - Creates a test connection plugin with atomic counters to track operations
870    /// - Registers the plugin with a `PluginManager`
871    /// - Builds a connection factory and creates a connection instance
872    /// - Uses a barrier to synchronize thread execution for maximum contention
873    ///
874    /// # Test Execution
875    ///
876    /// - Thread A: Opens the connection
877    /// - Thread B: Closes then reopens the connection
878    /// - Main thread: Waits for both threads and performs final cleanup
879    ///
880    /// # Assertions
881    ///
882    /// - Verifies that at least 2 open operations occurred (one per thread)
883    /// - Verifies that at least 1 close operation occurred (from thread B)
884    ///
885    /// # Panics
886    ///
887    /// This test will panic if:
888    /// - The connection factory returns `None`
889    /// - Any thread fails to acquire the mutex lock
890    /// - Any connection operation fails
891    /// - The expected minimum operation counts are not met
892    #[test]
893    fn factory_connection_is_thread_safe() {
894        let key = ConnectionKey::new("host1", "ssh");
895        let plugin = TestConnection::new("ssh", key.clone());
896
897        let mut manager = PluginManager::new();
898        manager.register_plugin(Plugins::Connection(Box::new(plugin)));
899
900        let factory = build_connection_factory(Arc::new(manager));
901        let manager = Arc::new(ConnectionManager::with_connection_factory(factory));
902
903        let barrier = Arc::new(std::sync::Barrier::new(3));
904        let params = Arc::new(default_params());
905
906        let barrier_a = Arc::clone(&barrier);
907        let params_a = Arc::clone(&params);
908        let manager_a = Arc::clone(&manager);
909        let key_a = key.clone();
910        let thread_a = std::thread::spawn(move || {
911            barrier_a.wait();
912            run_async(manager_a.open_connection(&key_a, &params_a))
913                .unwrap()
914                .unwrap();
915        });
916
917        let barrier_b = Arc::clone(&barrier);
918        let params_b = Arc::clone(&params);
919        let manager_b = Arc::clone(&manager);
920        let key_b = key.clone();
921        let thread_b = std::thread::spawn(move || {
922            barrier_b.wait();
923            run_async(manager_b.open_connection(&key_b, &params_b))
924                .unwrap()
925                .unwrap();
926        });
927
928        barrier.wait();
929        thread_a.join().unwrap();
930        thread_b.join().unwrap();
931
932        manager.close_connection(&key);
933        let counters = manager.connection_counters_for("ssh").unwrap();
934
935        assert_eq!(counters.create_calls, 1);
936        assert_eq!(counters.open_calls, 1);
937        assert_eq!(counters.close_calls, 1);
938    }
939
940    #[test]
941    fn adapter_open_error_keeps_alive_false() {
942        #[derive(Debug)]
943        struct FailingConnection;
944
945        impl Plugin for FailingConnection {
946            fn name(&self) -> String {
947                "fail".to_string()
948            }
949        }
950
951        #[async_trait]
952        impl PluginConnection for FailingConnection {
953            fn create(&self, _key: &ConnectionKey) -> Box<dyn PluginConnection> {
954                Box::new(Self)
955            }
956
957            async fn open(&mut self, _params: &ResolvedConnectionParams) -> Result<(), String> {
958                Err("boom".to_string())
959            }
960
961            fn close(&mut self) -> ConnectionKey {
962                ConnectionKey::new("host1", "fail")
963            }
964
965            fn is_alive(&self) -> bool {
966                false
967            }
968        }
969
970        let mut adapter = PluginConnectionAdapter::new(Box::new(FailingConnection));
971        assert!(!adapter.is_alive());
972        let err = run_async(adapter.open(&default_params())).unwrap_err();
973        assert_eq!(err, "boom");
974        assert!(!adapter.is_alive());
975    }
976
977    #[test]
978    fn adapter_create_can_be_called_multiple_times() {
979        let key = ConnectionKey::new("host1", "ssh");
980        let plugin = TestConnection::new("ssh", key);
981        let adapter = PluginConnectionAdapter::new(Box::new(plugin));
982
983        let key_a = ConnectionKey::new("host-a", "ssh");
984        let key_b = ConnectionKey::new("host-b", "ssh");
985        let conn_a = adapter.create(&key_a);
986        let conn_b = adapter.create(&key_b);
987
988        assert!(!conn_a.is_alive());
989        assert!(!conn_b.is_alive());
990    }
991}