Skip to main content

rustenium_core/
connection.rs

1use std::{collections::HashMap, net::TcpListener};
2use std::sync::Arc;
3use rustenium_bidi_definitions::base::EventResponse;
4use rustenium_cdp_definitions::base;
5use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
6use tokio::sync::{oneshot, Mutex};
7
8use crate::listeners::{
9    CommandResponseListener, CommandResponseState, EventListener, Listener,
10    CdpCommandResponseListener, CdpCommandResponseState, CdpEventListener, CdpListener,
11};
12use crate::transport::ConnectionTransport;
13
/// Asks the operating system for a TCP port that is currently unused on the loopback interface.
///
/// A throwaway listener is bound to `127.0.0.1:0` (port `0` lets the OS choose), the
/// assigned port is read back, and the listener is released before this function returns.
///
/// The port is not reserved after the call: it was free when probed, but another
/// process may bind it before the caller does.
///
/// # Errors
///
/// Returns the underlying I/O error if the bind fails or the local address cannot be read.
pub fn find_free_port() -> std::io::Result<u16> {
    TcpListener::bind("127.0.0.1:0")?
        .local_addr()
        .map(|addr| addr.port())
}
19
20// ── BiDi Connection ──────────────────────────────────────────────────────────
21
22pub struct BidiConnection<T: ConnectionTransport> {
23    transport: T,
24    pub commands_response_subscriptions: Arc<Mutex<HashMap<u64, oneshot::Sender<CommandResponseState>>>>,
25    event_listener: EventListener,
26}
27
28impl<T: ConnectionTransport> BidiConnection<T> {
29    pub fn new(connection_transport: T) -> Self {
30        Self {
31            transport: connection_transport,
32            commands_response_subscriptions: Arc::new(Mutex::new(HashMap::new())),
33            event_listener: EventListener::new(),
34        }
35    }
36
37    pub async fn register_event_listener_channel(&mut self, channel: UnboundedSender<EventResponse>) {
38        self.event_listener.listeners.lock().await.push(channel);
39    }
40
41    pub fn start_listeners(&self) {
42        let (listener_tx, listener_rx) = unbounded_channel::<String>();
43        let (command_response_tx, command_response_rx) = unbounded_channel::<CommandResponseState>();
44        let (event_tx, event_rx) = unbounded_channel::<EventResponse>();
45
46        self.transport.listen(listener_tx);
47
48        let listener = Listener::new(listener_rx, command_response_tx, event_tx);
49        listener.start();
50
51        let commands_response_listener = CommandResponseListener::new(command_response_rx, self.commands_response_subscriptions.clone());
52        commands_response_listener.start();
53        self.event_listener.start(event_rx);
54    }
55
56    pub async fn send(&mut self, data: String) {
57        self.transport.send(data).await;
58    }
59
60    pub async fn close(&self) {
61        self.transport.close().await;
62    }
63}
64
65// ── CDP Connection ───────────────────────────────────────────────────────────
66
67pub struct CdpConnection<T: ConnectionTransport> {
68    transport: T,
69    pub commands_response_subscriptions: Arc<Mutex<HashMap<u16, oneshot::Sender<CdpCommandResponseState>>>>,
70    event_listener: CdpEventListener,
71}
72
73impl<T: ConnectionTransport> CdpConnection<T> {
74    pub fn new(connection_transport: T) -> Self {
75        Self {
76            transport: connection_transport,
77            commands_response_subscriptions: Arc::new(Mutex::new(HashMap::new())),
78            event_listener: CdpEventListener::new(),
79        }
80    }
81
82    pub async fn register_event_listener_channel(&mut self, channel: UnboundedSender<base::EventResponse>) {
83        self.event_listener.listeners.lock().await.push(channel);
84    }
85
86    pub fn start_listeners(&self) {
87        let (listener_tx, listener_rx) = unbounded_channel::<String>();
88        let (command_response_tx, command_response_rx) = unbounded_channel::<CdpCommandResponseState>();
89        let (event_tx, event_rx) = unbounded_channel::<base::EventResponse>();
90
91        self.transport.listen(listener_tx);
92
93        let listener = CdpListener::new(listener_rx, command_response_tx, event_tx);
94        listener.start();
95
96        let commands_response_listener = CdpCommandResponseListener::new(command_response_rx, self.commands_response_subscriptions.clone());
97        commands_response_listener.start();
98        self.event_listener.start(event_rx);
99    }
100
101    pub async fn send(&mut self, data: String) {
102        self.transport.send(data).await;
103    }
104
105    pub async fn close(&self) {
106        self.transport.close().await;
107    }
108}