genja_plugin_manager/connection_factory.rs
1//! Connection factory implementation for plugin-based connections.
2//!
3//! This module provides the infrastructure to integrate plugin-based connections
4//! into the core inventory system's connection management. It bridges the gap
5//! between the `PluginConnection` trait used by plugins and the `Connection` trait
6//! expected by the inventory system.
7//!
8//! # Overview
9//!
10//! The module consists of two main components:
11//!
12//! 1. **[`PluginConnectionAdapter`]** - An adapter that wraps `PluginConnection`
13//! implementations and provides the `Connection` interface. It tracks connection
14//! lifecycle state and delegates operations to the underlying plugin.
15//!
16//! 2. **[`build_connection_factory`]** - A factory function that creates a
17//! `ConnectionFactory` closure. This factory looks up plugins by connection type
18//! and creates appropriate connection instances wrapped in adapters.
19//!
20//! # Architecture
21//!
22//! ```text
23//! ┌─────────────────────────────────────────────────────────────┐
24//! │ Connection Manager │
25//! │ (genja_core::inventory) │
26//! └─────────────────────────┬───────────────────────────────────┘
27//! │
28//! │ Uses ConnectionFactory
29//! ▼
30//! ┌─────────────────────────────────────────────────────────────┐
31//! │ build_connection_factory() │
32//! │ Returns: Arc<ConnectionFactory> │
33//! └─────────────────────────┬───────────────────────────────────┘
34//! │
35//! │ Queries for plugins
36//! ▼
37//! ┌─────────────────────────────────────────────────────────────┐
38//! │ PluginManager │
39//! │ (Registered Connection Plugins) │
40//! └─────────────────────────┬───────────────────────────────────┘
41//! │
42//! │ Returns plugin instance
43//! ▼
44//! ┌─────────────────────────────────────────────────────────────┐
45//! │ PluginConnectionAdapter │
46//! │ Wraps: Box<dyn PluginConnection> │
47//! │ Implements: Connection trait │
48//! └─────────────────────────┬───────────────────────────────────┘
49//! │
50//! │ Delegates to
51//! ▼
52//! ┌─────────────────────────────────────────────────────────────┐
53//! │ Plugin Implementation │
54//! │ (e.g., SSH, Telnet, NETCONF) │
55//! │ Implements: PluginConnection trait │
56//! └─────────────────────────────────────────────────────────────┘
57//! ```
58//!
59//! # Usage
60//!
61//! ## Basic Setup
62//!
63//! ```no_run
64//! use genja_plugin_manager::{PluginManager, connection_factory::build_connection_factory};
65//! use genja_core::inventory::{ConnectionManager, ConnectionKey};
66//! use std::sync::Arc;
67//!
68//! // 1. Create and configure plugin manager
69//! let mut plugin_manager = PluginManager::new();
70//! // Register connection plugins...
71//!
72//! // 2. Build connection factory
73//! let factory = build_connection_factory(Arc::new(plugin_manager));
74//!
75//! // 3. Set factory in connection manager
76//! let connection_manager = ConnectionManager::default();
77//! connection_manager.set_connection_factory(factory);
78//!
79//! // 4. Use connection manager to create connections
80//! let key = ConnectionKey::new("router1", "ssh");
81//! // let connection = connection_manager.get_or_create(key);
82//! ```
83//!
84//! ## Plugin Integration
85//!
86//! Connection plugins must implement the `PluginConnection` trait:
87//!
88//! ```no_run
89//! use async_trait::async_trait;
90//! use genja_plugin_manager::plugin_types::{Plugin, PluginConnection};
91//! use genja_core::inventory::{ConnectionKey, ResolvedConnectionParams};
92//!
93//! struct MyConnectionPlugin {
94//! key: ConnectionKey,
95//! }
96//!
97//! impl Plugin for MyConnectionPlugin {
98//! fn name(&self) -> String {
99//! "my_connection".to_string()
100//! }
101//! }
102//!
103//! #[async_trait]
104//! impl PluginConnection for MyConnectionPlugin {
105//! fn create(&self, key: &ConnectionKey) -> Box<dyn PluginConnection> {
106//! Box::new(MyConnectionPlugin { key: key.clone() })
107//! }
108//!
109//! async fn open(&mut self, params: &ResolvedConnectionParams) -> Result<(), String> {
110//! // Establish connection
111//! let _ = params;
112//! Ok(())
113//! }
114//!
115//! fn close(&mut self) -> ConnectionKey {
116//! // Clean up connection
117//! self.key.clone()
118//! }
119//!
120//! fn is_alive(&self) -> bool {
121//! // Check connection status
122//! true
123//! }
124//! }
125//! ```
126//!
127//! # Connection Lifecycle
128//!
129//! The adapter manages the connection lifecycle through the following states:
130//!
131//! 1. **Created** - Adapter is instantiated with `alive = false`
132//! 2. **Opening** - `open()` is called with connection parameters
133//! 3. **Open** - Connection established successfully, `alive = true`
134//! 4. **Closing** - `close()` is called to tear down connection
135//! 5. **Closed** - Connection terminated, `alive = false`
136//!
137//! ## State Transitions
138//!
139//! ```text
140//! ┌─────────┐
141//! │ Created │ (alive = false)
142//! └────┬────┘
143//! │
144//! │ open() called
145//! ▼
146//! ┌─────────┐
147//! │ Opening │
148//! └────┬────┘
149//! │
150//! ├─ Success ──► ┌──────┐
151//! │ │ Open │ (alive = true)
152//! │ └───┬──┘
153//! │ │
154//! │ │ close() called
155//! │ ▼
156//! │ ┌────────┐
157//! └─ Failure ──► │ Closed │ (alive = false)
158//! └────────┘
159//! ```
160//!
161//! # Thread Safety
162//!
163//! All components in this module are designed for concurrent use:
164//!
165//! - The connection factory is wrapped in `Arc` and can be shared across threads
166//! - Each connection adapter is wrapped in `Arc<Mutex<_>>` for safe mutation
167//! - The `PluginManager` reference is shared via `Arc` in the factory closure
168//!
169//! # Error Handling
170//!
171//! The factory returns `Option<Arc<Mutex<dyn Connection>>>`:
172//!
173//! - `Some(connection)` - Plugin found and connection created successfully
174//! - `None` - Plugin not found or not a connection plugin
175//!
176//! Connection operations return `Result<(), String>`:
177//!
178//! - `Ok(())` - Operation succeeded
179//! - `Err(message)` - Operation failed with error description
180//!
181//! # Performance Considerations
182//!
183//! - **Connection Pooling**: The `ConnectionManager` handles connection reuse
184//! - **Lazy Creation**: Connections are created only when needed
185//! - **Plugin Lookup**: Plugin queries are O(1) hash map lookups
186//! - **Lock Contention**: Each connection has its own mutex to minimize contention
187//!
188//! # Examples
189//!
190//! ## Complete Integration Example
191//!
192//! ```no_run
193//! use genja_plugin_manager::{PluginManager, connection_factory::build_connection_factory};
194//! use genja_core::inventory::{
195//! ConnectionManager, ConnectionKey, ResolvedConnectionParams,
196//! BaseBuilderHost, Host, Hosts, Inventory,
197//! };
198//! use std::sync::Arc;
199//! use tokio::runtime::Builder;
200//!
201//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
202//! // Set up plugin manager with connection plugins
203//! let mut plugin_manager = PluginManager::new();
204//! // plugin_manager.load_plugins_from_directory("plugins")?;
205//!
206//! // Create connection factory
207//! let factory = build_connection_factory(Arc::new(plugin_manager));
208//!
209//! // Build inventory with hosts
210//! let mut hosts = Hosts::new();
211//! hosts.add_host("router1", Host::builder()
212//! .hostname("10.0.0.1")
213//! .port(22)
214//! .username("admin")
215//! .platform("cisco_ios")
216//! .build());
217//!
218//! let inventory = Inventory::builder()
219//! .hosts(hosts)
220//! .build();
221//!
222//! // Set up connection manager
223//! let connection_manager = inventory.connections();
224//! connection_manager.set_connection_factory(factory);
225//!
226//! // Create and use connection
227//! let key = ConnectionKey::new("router1", "ssh");
228//!
229//! // Open connection
230//! let params = ResolvedConnectionParams {
231//! hostname: "10.0.0.1".to_string(),
232//! port: Some(22),
233//! username: Some("admin".to_string()),
234//! password: Some("secret".to_string()),
235//! platform: Some("cisco_ios".to_string()),
236//! extras: None,
237//! };
238//! let runtime = Builder::new_current_thread().enable_all().build()?;
239//! let connection = runtime
240//! .block_on(async { connection_manager.open_connection(&key, ¶ms).await })?
241//! .expect("connection plugin not found");
242//!
243//! let conn = runtime.block_on(connection.lock());
244//!
245//! // Use connection...
246//! assert!(conn.is_alive());
247//!
248//! // Close connection
249//! drop(conn);
250//! connection_manager.close_connection(&key);
251//! # Ok(())
252//! # }
253//! ```
254//!
255//! # See Also
256//!
257//! - [`PluginManager`] - Manages plugin registration and lookup
258//! - [`PluginConnection`] - Plugin connection trait
259//! - [`Connection`] - Core connection trait
260//! - [`ConnectionFactory`] - Factory function type for creating connections
261
262use crate::PluginManager;
263use crate::plugin_types::{PluginConnection, Plugins};
264use async_trait::async_trait;
265use genja_core::inventory::{
266 Connection, ConnectionFactory, ConnectionKey, ResolvedConnectionParams,
267};
268use std::sync::Arc;
269use tokio::sync::Mutex;
270
271/// Adapter that bridges `PluginConnection` trait objects to the `Connection` trait.
272///
273/// This adapter wraps a `PluginConnection` implementation and provides the `Connection`
274/// interface expected by the inventory system. It maintains connection lifecycle state
275/// and delegates operations to the underlying plugin.
276///
277/// # Purpose
278///
279/// The adapter serves two main purposes:
280/// 1. **Trait Adaptation**: Converts `PluginConnection` trait objects into `Connection`
281/// trait objects, allowing plugins to integrate with the core connection management system.
282/// 2. **State Tracking**: Maintains the `alive` flag to track whether a connection has been
283/// successfully opened, providing quick status checks without querying the plugin.
284///
285/// # Lifecycle
286///
287/// The adapter tracks connection state through the `alive` flag:
288/// - Initially `false` when created
289/// - Set to `true` after successful `open()` call
290/// - Reset to `false` after `close()` call
291///
292/// # Thread Safety
293///
294/// This adapter is typically wrapped in `Arc<Mutex<_>>` by the connection factory,
295/// ensuring thread-safe access to the underlying plugin connection.
296///
297/// # Examples
298///
299/// ```no_run
300/// use genja_plugin_manager::connection_factory::PluginConnectionAdapter;
301/// use genja_core::inventory::{Connection, ConnectionKey, ResolvedConnectionParams};
302///
303/// // Typically created by the connection factory, not directly
304/// // let plugin_connection = ...; // Some PluginConnection implementation
305/// // let adapter = PluginConnectionAdapter::new(plugin_connection);
306/// ```
307#[derive(Debug)]
308#[doc(hidden)]
309pub struct PluginConnectionAdapter {
310 /// The underlying plugin connection implementation.
311 ///
312 /// This boxed trait object contains the actual connection logic provided
313 /// by the plugin. All connection operations are delegated to this inner plugin.
314 inner: Box<dyn PluginConnection>,
315
316 /// Tracks whether the connection is currently alive.
317 ///
318 /// This flag is set to `true` when `open()` succeeds and reset to `false`
319 /// when `close()` is called. It provides a quick way to check connection
320 /// status without querying the plugin directly.
321 alive: bool,
322}
323
324impl PluginConnectionAdapter {
325 /// Creates a new adapter wrapping the given plugin connection.
326 ///
327 /// The adapter is initialized with `alive` set to `false`, indicating
328 /// that no connection has been established yet.
329 ///
330 /// # Parameters
331 ///
332 /// * `inner` - The plugin connection implementation to wrap
333 ///
334 /// # Returns
335 ///
336 /// A new `PluginConnectionAdapter` instance ready to manage the connection lifecycle.
337 ///
338 /// # Examples
339 ///
340 /// ```no_run
341 /// use genja_plugin_manager::connection_factory::PluginConnectionAdapter;
342 ///
343 /// // let plugin_connection = ...; // Some PluginConnection implementation
344 /// // let adapter = PluginConnectionAdapter::new(plugin_connection);
345 /// // assert!(!adapter.is_alive());
346 /// ```
347 fn new(inner: Box<dyn PluginConnection>) -> Self {
348 Self {
349 inner,
350 alive: false,
351 }
352 }
353
354 #[doc(hidden)]
355 pub fn inner_plugin_connection(&self) -> &dyn PluginConnection {
356 self.inner.as_ref()
357 }
358}
359
360#[async_trait]
361impl Connection for PluginConnectionAdapter {
362 /// Creates a new connection instance for the specified key.
363 ///
364 /// Delegates to the underlying plugin's `create()` method and wraps the result
365 /// in a new adapter. This allows each host to have its own connection instance
366 /// while maintaining consistent lifecycle management.
367 ///
368 /// # Parameters
369 ///
370 /// * `key` - The connection key identifying the host and connection type
371 ///
372 /// # Returns
373 ///
374 /// A boxed `Connection` trait object containing the new connection instance.
375 ///
376 /// # Examples
377 ///
378 /// ```no_run
379 /// use genja_core::inventory::{Connection, ConnectionKey};
380 ///
381 /// // let adapter: Box<dyn Connection> = ...;
382 /// // let key = ConnectionKey {
383 /// // hostname: "router1".to_string(),
384 /// // plugin_name: "ssh".to_string(),
385 /// // };
386 /// // let new_connection = adapter.create(&key);
387 /// ```
388 fn create(&self, key: &ConnectionKey) -> Box<dyn Connection> {
389 let instance = self.inner.create(key);
390 Box::new(PluginConnectionAdapter::new(instance))
391 }
392
393 /// Checks if the connection is currently alive.
394 ///
395 /// Returns the value of the internal `alive` flag, which is set to `true`
396 /// after a successful `open()` call and reset to `false` after `close()`.
397 ///
398 /// # Returns
399 ///
400 /// `true` if the connection has been opened and not yet closed, `false` otherwise.
401 ///
402 /// # Note
403 ///
404 /// This method returns cached state and does not verify the actual connection
405 /// status with the underlying plugin. For real-time validation, plugins should
406 /// implement additional health check mechanisms.
407 ///
408 /// # Examples
409 ///
410 /// ```no_run
411 /// use genja_core::inventory::Connection;
412 ///
413 /// // let mut adapter: Box<dyn Connection> = ...;
414 /// // assert!(!adapter.is_alive());
415 /// // adapter.open(¶ms)?;
416 /// // assert!(adapter.is_alive());
417 /// ```
418 fn is_alive(&self) -> bool {
419 self.alive
420 }
421
422 /// Opens a connection using the provided parameters.
423 ///
424 /// Delegates the connection establishment to the underlying plugin. If the
425 /// plugin successfully opens the connection, the `alive` flag is set to `true`.
426 /// If the operation fails, the flag remains `false`.
427 ///
428 /// # Parameters
429 ///
430 /// * `params` - Resolved connection parameters including hostname, port,
431 /// credentials, and platform-specific settings
432 ///
433 /// # Returns
434 ///
435 /// * `Ok(())` - Connection opened successfully
436 /// * `Err(String)` - Connection failed with error message
437 ///
438 /// # State Changes
439 ///
440 /// On success, sets `alive` to `true`. On failure, `alive` remains `false`.
441 ///
442 /// # Examples
443 ///
444 /// ```no_run
445 /// use genja_core::inventory::{Connection, ResolvedConnectionParams};
446 ///
447 /// // let mut adapter: Box<dyn Connection> = ...;
448 /// // let params = ResolvedConnectionParams {
449 /// // hostname: "10.0.0.1".to_string(),
450 /// // port: Some(22),
451 /// // username: Some("admin".to_string()),
452 /// // password: Some("secret".to_string()),
453 /// // platform: Some("linux".to_string()),
454 /// // extras: None,
455 /// // };
456 /// // adapter.open(¶ms)?;
457 /// ```
458 async fn open(&mut self, params: &ResolvedConnectionParams) -> Result<(), String> {
459 let result = self.inner.open(params).await;
460 if result.is_ok() {
461 self.alive = true;
462 }
463 result
464 }
465
466 async fn execute_command(&mut self, command: &str) -> Result<String, String> {
467 self.inner.execute_command(command).await
468 }
469
470 /// Closes the connection and returns its key.
471 ///
472 /// Delegates the connection teardown to the underlying plugin and resets
473 /// the `alive` flag to `false`. The connection key is returned for tracking
474 /// or cleanup purposes.
475 ///
476 /// # Returns
477 ///
478 /// The `ConnectionKey` identifying the closed connection.
479 ///
480 /// # State Changes
481 ///
482 /// Always sets `alive` to `false`, regardless of the plugin's close operation result.
483 ///
484 /// # Examples
485 ///
486 /// ```no_run
487 /// use genja_core::inventory::Connection;
488 ///
489 /// // let mut adapter: Box<dyn Connection> = ...;
490 /// // adapter.open(¶ms)?;
491 /// // let key = adapter.close();
492 /// // assert!(!adapter.is_alive());
493 /// ```
494 fn close(&mut self) -> ConnectionKey {
495 let key = self.inner.close();
496 self.alive = false;
497 key
498 }
499}
500
501/// Builds a connection factory that creates connections from registered plugins.
502///
503/// This function creates a `ConnectionFactory` closure that looks up plugins by
504/// connection type and creates appropriate connection instances. The factory
505/// integrates plugin-based connections into the inventory's connection management
506/// system.
507///
508/// # How It Works
509///
510/// 1. The factory receives a `ConnectionKey` specifying the connection type
511/// 2. It queries the `PluginManager` for a plugin matching that type
512/// 3. If a `Connection` plugin is found, it creates a new connection instance
513/// 4. The instance is wrapped in a `PluginConnectionAdapter` for trait compatibility
514/// 5. The adapter is wrapped in `Arc<Mutex<_>>` for thread-safe access
515///
516/// # Parameters
517///
518/// * `plugins` - Shared reference to the plugin manager containing registered plugins
519///
520/// # Returns
521///
522/// An `Arc<ConnectionFactory>` that can be used to create plugin-based connections.
523///
524/// # Plugin Requirements
525///
526/// The factory only works with plugins registered as `Plugins::Connection` variants.
527/// Other plugin types are ignored and will result in `None` being returned.
528///
529/// # Thread Safety
530///
531/// The returned factory is thread-safe and can be shared across multiple threads.
532/// Each created connection is wrapped in `Arc<Mutex<_>>` for safe concurrent access.
533///
534/// # Examples
535///
536/// ```no_run
537/// use genja_plugin_manager::{PluginManager, connection_factory::build_connection_factory};
538/// use genja_core::inventory::{ConnectionKey, ConnectionManager};
539/// use std::sync::Arc;
540///
541/// // Create plugin manager and register plugins
542/// let mut plugin_manager = PluginManager::new();
543/// // plugin_manager.register_plugin(...);
544///
545/// // Build connection factory
546/// let factory = build_connection_factory(Arc::new(plugin_manager));
547///
548/// // Use factory with connection manager
549/// let connection_manager = ConnectionManager::default();
550/// connection_manager.set_connection_factory(factory);
551///
552/// // Create connections through the manager
553/// let key = ConnectionKey::new("router1", "ssh");
554/// // let connection = connection_manager.get_or_create(key);
555/// ```
556///
557/// # See Also
558///
559/// * [`PluginConnectionAdapter`] - The adapter that wraps plugin connections
560/// * [`PluginManager`] - Manages registered plugins
561/// * [`ConnectionFactory`] - The factory type returned by this function
562pub fn build_connection_factory(plugins: Arc<PluginManager>) -> Arc<ConnectionFactory> {
563 Arc::new(move |key: &ConnectionKey| {
564 let plugin = plugins.get_plugin(&key.plugin_name)?;
565 match plugin {
566 Plugins::Connection(connection) => {
567 let instance = connection.create(key);
568 let adapter = PluginConnectionAdapter::new(instance);
569 Some(Arc::new(Mutex::new(adapter)) as Arc<Mutex<dyn Connection>>)
570 }
571 _ => None,
572 }
573 })
574}
575
576#[cfg(test)]
577mod tests {
578 use super::*;
579 use crate::plugin_types::{Plugin, PluginConnection, PluginRunner};
580 use genja_core::inventory::Connection;
581 use genja_core::inventory::ConnectionManager;
582 use genja_core::task::Tasks;
583 use std::future::Future;
584 use tokio::runtime::Builder;
585
586 fn run_async<F: Future>(future: F) -> F::Output {
587 Builder::new_current_thread()
588 .enable_all()
589 .build()
590 .expect("test runtime should build")
591 .block_on(future)
592 }
593
594 #[derive(Debug)]
595 struct TestConnection {
596 name: &'static str,
597 key: ConnectionKey,
598 alive: bool,
599 }
600
601 impl TestConnection {
602 fn new(name: &'static str, key: ConnectionKey) -> Self {
603 Self {
604 name,
605 key,
606 alive: false,
607 }
608 }
609 }
610
611 impl Plugin for TestConnection {
612 fn name(&self) -> String {
613 self.name.to_string()
614 }
615 }
616
617 #[async_trait]
618 impl PluginConnection for TestConnection {
619 fn create(&self, key: &ConnectionKey) -> Box<dyn PluginConnection> {
620 Box::new(Self::new(self.name, key.clone()))
621 }
622
623 async fn open(&mut self, _params: &ResolvedConnectionParams) -> Result<(), String> {
624 self.alive = true;
625 Ok(())
626 }
627
628 fn close(&mut self) -> ConnectionKey {
629 self.alive = false;
630 self.key.clone()
631 }
632
633 fn is_alive(&self) -> bool {
634 self.alive
635 }
636 }
637
638 #[derive(Debug)]
639 struct DummyRunner {
640 name: &'static str,
641 }
642
643 impl Plugin for DummyRunner {
644 fn name(&self) -> String {
645 self.name.to_string()
646 }
647 }
648
649 #[async_trait]
650 impl PluginRunner for DummyRunner {
651 async fn run_task(
652 &self,
653 _task: &genja_core::task::TaskDefinition,
654 _hosts: &genja_core::inventory::Hosts,
655 _connection_resolver: Option<
656 std::sync::Arc<dyn genja_core::task::TaskConnectionResolver>,
657 >,
658 _runner_config: &genja_core::settings::RunnerConfig,
659 _max_depth: usize,
660 ) -> Result<genja_core::task::TaskResults, genja_core::GenjaError> {
661 Ok(genja_core::task::TaskResults::new("runner"))
662 }
663
664 async fn run_tasks(
665 &self,
666 _tasks: &Tasks,
667 _hosts: &genja_core::inventory::Hosts,
668 _connection_resolver: Option<
669 std::sync::Arc<dyn genja_core::task::TaskConnectionResolver>,
670 >,
671 _runner_config: &genja_core::settings::RunnerConfig,
672 _max_depth: usize,
673 ) -> Result<Vec<genja_core::task::TaskResults>, genja_core::GenjaError> {
674 Ok(Vec::new())
675 }
676 }
677
678 fn default_params() -> ResolvedConnectionParams {
679 ResolvedConnectionParams {
680 hostname: "host1".to_string(),
681 port: Some(22),
682 username: Some("user".to_string()),
683 password: Some("pass".to_string()),
684 platform: Some("linux".to_string()),
685 extras: None,
686 }
687 }
688
689 #[test]
690 fn adapter_open_close_updates_alive_and_returns_key() {
691 let key = ConnectionKey::new("host1", "ssh");
692 let plugin = TestConnection::new("ssh", key.clone());
693
694 let mut adapter = PluginConnectionAdapter::new(Box::new(plugin));
695 assert!(!adapter.is_alive());
696
697 run_async(adapter.open(&default_params())).unwrap();
698 assert!(adapter.is_alive());
699
700 let closed_key = adapter.close();
701 assert_eq!(closed_key, key);
702 assert!(!adapter.is_alive());
703 }
704
705 #[test]
706 fn adapter_create_uses_plugin_create_and_starts_dead() {
707 let key = ConnectionKey::new("host1", "ssh");
708 let plugin = TestConnection::new("ssh", key.clone());
709 let adapter = PluginConnectionAdapter::new(Box::new(plugin));
710
711 let new_key = ConnectionKey::new("host2", "ssh");
712 let new_conn = adapter.create(&new_key);
713 assert!(!new_conn.is_alive());
714 }
715
716 #[test]
717 fn factory_returns_none_for_missing_or_non_connection_plugins() {
718 let manager = Arc::new(PluginManager::new());
719 let factory = build_connection_factory(Arc::clone(&manager));
720 let key = ConnectionKey::new("host1", "ssh");
721 assert!(factory(&key).is_none());
722
723 let mut manager = PluginManager::new();
724 manager.register_plugin(Plugins::Runner(Box::new(DummyRunner { name: "runner" })));
725 let factory = build_connection_factory(Arc::new(manager));
726 let key = ConnectionKey::new("host1", "runner");
727 assert!(factory(&key).is_none());
728 }
729
730 #[test]
731 fn factory_returns_adapter_for_connection_plugins() {
732 let key = ConnectionKey::new("host1", "ssh");
733 let plugin = TestConnection::new("ssh", key.clone());
734
735 let mut manager = PluginManager::new();
736 manager.register_plugin(Plugins::Connection(Box::new(plugin)));
737
738 let factory = build_connection_factory(Arc::new(manager));
739 let connection = factory(&key).expect("expected connection plugin");
740
741 {
742 let mut guard = run_async(connection.lock());
743 assert!(!guard.is_alive());
744 run_async(guard.open(&default_params())).unwrap();
745 assert!(guard.is_alive());
746 let closed_key = guard.close();
747 assert_eq!(closed_key, key);
748 assert!(!guard.is_alive());
749 }
750 }
751
752 #[test]
753 fn manager_counters_increment_on_open_and_close() {
754 let key = ConnectionKey::new("host1", "ssh");
755 let plugin = TestConnection::new("ssh", key.clone());
756
757 let mut manager = PluginManager::new();
758 manager.register_plugin(Plugins::Connection(Box::new(plugin)));
759
760 let factory = build_connection_factory(Arc::new(manager));
761 let connection_manager = ConnectionManager::with_connection_factory(factory);
762
763 let params = default_params();
764 let connection = run_async(connection_manager.open_connection(&key, ¶ms))
765 .unwrap()
766 .unwrap();
767
768 let counters = connection_manager.connection_counters_for("ssh").unwrap();
769 assert_eq!(counters.create_calls, 1);
770 assert_eq!(counters.open_calls, 1);
771 assert_eq!(counters.close_calls, 0);
772
773 drop(connection);
774 connection_manager.close_connection(&key);
775 let counters = connection_manager.connection_counters_for("ssh").unwrap();
776 assert_eq!(counters.create_calls, 1);
777 assert_eq!(counters.open_calls, 1);
778 assert_eq!(counters.close_calls, 1);
779 }
780
781 #[test]
782 fn open_connection_twice_does_not_double_count_open() {
783 let key = ConnectionKey::new("host1", "ssh");
784 let plugin = TestConnection::new("ssh", key.clone());
785
786 let mut manager = PluginManager::new();
787 manager.register_plugin(Plugins::Connection(Box::new(plugin)));
788
789 let factory = build_connection_factory(Arc::new(manager));
790 let connection_manager = ConnectionManager::with_connection_factory(factory);
791
792 let params = default_params();
793 run_async(connection_manager.open_connection(&key, ¶ms))
794 .unwrap()
795 .unwrap();
796
797 let counters_after_first = connection_manager.connection_counters_for("ssh").unwrap();
798 assert_eq!(counters_after_first.create_calls, 1);
799 assert_eq!(counters_after_first.open_calls, 1);
800
801 run_async(connection_manager.open_connection(&key, ¶ms))
802 .unwrap()
803 .unwrap();
804
805 let counters_after_second = connection_manager.connection_counters_for("ssh").unwrap();
806 assert_eq!(counters_after_second.create_calls, 1);
807 assert_eq!(counters_after_second.open_calls, 1);
808 }
809
810 #[test]
811 fn open_connection_errors_when_factory_missing() {
812 let connection_manager = ConnectionManager::default();
813 let key = ConnectionKey::new("host1", "ssh");
814 let params = default_params();
815
816 let err = run_async(connection_manager.open_connection(&key, ¶ms)).unwrap_err();
817 assert_eq!(err, "connection factory not set");
818 }
819
820 #[test]
821 fn connection_counters_snapshot_tracks_multiple_types() {
822 let key_ssh = ConnectionKey::new("host1", "ssh");
823 let key_telnet = ConnectionKey::new("host2", "telnet");
824
825 let plugin_ssh = TestConnection::new("ssh", key_ssh.clone());
826 let plugin_telnet = TestConnection::new("telnet", key_telnet.clone());
827
828 let mut manager = PluginManager::new();
829 manager.register_plugin(Plugins::Connection(Box::new(plugin_ssh)));
830 manager.register_plugin(Plugins::Connection(Box::new(plugin_telnet)));
831
832 let factory = build_connection_factory(Arc::new(manager));
833 let connection_manager = ConnectionManager::with_connection_factory(factory);
834
835 let params = default_params();
836 run_async(connection_manager.open_connection(&key_ssh, ¶ms))
837 .unwrap()
838 .unwrap();
839 run_async(connection_manager.open_connection(&key_telnet, ¶ms))
840 .unwrap()
841 .unwrap();
842
843 let snapshot = connection_manager.connection_counters_snapshot();
844 let ssh = snapshot.get("ssh").copied().unwrap();
845 let telnet = snapshot.get("telnet").copied().unwrap();
846
847 assert_eq!(ssh.create_calls, 1);
848 assert_eq!(ssh.open_calls, 1);
849 assert_eq!(ssh.close_calls, 0);
850
851 assert_eq!(telnet.create_calls, 1);
852 assert_eq!(telnet.open_calls, 1);
853 assert_eq!(telnet.close_calls, 0);
854 }
855
856 /// Tests that connections created by the factory are thread-safe and can handle concurrent access.
857 ///
858 /// This test verifies that the connection adapter wrapped in `Arc<Mutex<_>>` properly
859 /// synchronizes access from multiple threads. It spawns two threads that concurrently
860 /// attempt to open and close the same connection, ensuring that:
861 ///
862 /// 1. The mutex prevents data races on the connection state
863 /// 2. Multiple threads can safely acquire and release the lock
864 /// 3. Connection operations (open/close) are properly serialized
865 /// 4. The operation counters reflect all concurrent operations
866 ///
867 /// # Test Setup
868 ///
869 /// - Creates a test connection plugin with atomic counters to track operations
870 /// - Registers the plugin with a `PluginManager`
871 /// - Builds a connection factory and creates a connection instance
872 /// - Uses a barrier to synchronize thread execution for maximum contention
873 ///
874 /// # Test Execution
875 ///
876 /// - Thread A: Opens the connection
877 /// - Thread B: Closes then reopens the connection
878 /// - Main thread: Waits for both threads and performs final cleanup
879 ///
880 /// # Assertions
881 ///
882 /// - Verifies that at least 2 open operations occurred (one per thread)
883 /// - Verifies that at least 1 close operation occurred (from thread B)
884 ///
885 /// # Panics
886 ///
887 /// This test will panic if:
888 /// - The connection factory returns `None`
889 /// - Any thread fails to acquire the mutex lock
890 /// - Any connection operation fails
891 /// - The expected minimum operation counts are not met
892 #[test]
893 fn factory_connection_is_thread_safe() {
894 let key = ConnectionKey::new("host1", "ssh");
895 let plugin = TestConnection::new("ssh", key.clone());
896
897 let mut manager = PluginManager::new();
898 manager.register_plugin(Plugins::Connection(Box::new(plugin)));
899
900 let factory = build_connection_factory(Arc::new(manager));
901 let manager = Arc::new(ConnectionManager::with_connection_factory(factory));
902
903 let barrier = Arc::new(std::sync::Barrier::new(3));
904 let params = Arc::new(default_params());
905
906 let barrier_a = Arc::clone(&barrier);
907 let params_a = Arc::clone(¶ms);
908 let manager_a = Arc::clone(&manager);
909 let key_a = key.clone();
910 let thread_a = std::thread::spawn(move || {
911 barrier_a.wait();
912 run_async(manager_a.open_connection(&key_a, ¶ms_a))
913 .unwrap()
914 .unwrap();
915 });
916
917 let barrier_b = Arc::clone(&barrier);
918 let params_b = Arc::clone(¶ms);
919 let manager_b = Arc::clone(&manager);
920 let key_b = key.clone();
921 let thread_b = std::thread::spawn(move || {
922 barrier_b.wait();
923 run_async(manager_b.open_connection(&key_b, ¶ms_b))
924 .unwrap()
925 .unwrap();
926 });
927
928 barrier.wait();
929 thread_a.join().unwrap();
930 thread_b.join().unwrap();
931
932 manager.close_connection(&key);
933 let counters = manager.connection_counters_for("ssh").unwrap();
934
935 assert_eq!(counters.create_calls, 1);
936 assert_eq!(counters.open_calls, 1);
937 assert_eq!(counters.close_calls, 1);
938 }
939
940 #[test]
941 fn adapter_open_error_keeps_alive_false() {
942 #[derive(Debug)]
943 struct FailingConnection;
944
945 impl Plugin for FailingConnection {
946 fn name(&self) -> String {
947 "fail".to_string()
948 }
949 }
950
951 #[async_trait]
952 impl PluginConnection for FailingConnection {
953 fn create(&self, _key: &ConnectionKey) -> Box<dyn PluginConnection> {
954 Box::new(Self)
955 }
956
957 async fn open(&mut self, _params: &ResolvedConnectionParams) -> Result<(), String> {
958 Err("boom".to_string())
959 }
960
961 fn close(&mut self) -> ConnectionKey {
962 ConnectionKey::new("host1", "fail")
963 }
964
965 fn is_alive(&self) -> bool {
966 false
967 }
968 }
969
970 let mut adapter = PluginConnectionAdapter::new(Box::new(FailingConnection));
971 assert!(!adapter.is_alive());
972 let err = run_async(adapter.open(&default_params())).unwrap_err();
973 assert_eq!(err, "boom");
974 assert!(!adapter.is_alive());
975 }
976
977 #[test]
978 fn adapter_create_can_be_called_multiple_times() {
979 let key = ConnectionKey::new("host1", "ssh");
980 let plugin = TestConnection::new("ssh", key);
981 let adapter = PluginConnectionAdapter::new(Box::new(plugin));
982
983 let key_a = ConnectionKey::new("host-a", "ssh");
984 let key_b = ConnectionKey::new("host-b", "ssh");
985 let conn_a = adapter.create(&key_a);
986 let conn_b = adapter.create(&key_b);
987
988 assert!(!conn_a.is_alive());
989 assert!(!conn_b.is_alive());
990 }
991}