rustenium_core/
connection.rs1use std::{collections::HashMap, net::TcpListener};
2use std::sync::Arc;
3use rustenium_bidi_definitions::base::EventResponse;
4use rustenium_cdp_definitions::base;
5use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
6use tokio::sync::{oneshot, Mutex};
7
8use crate::listeners::{
9 CommandResponseListener, CommandResponseState, EventListener, Listener,
10 CdpCommandResponseListener, CdpCommandResponseState, CdpEventListener, CdpListener,
11};
12use crate::transport::ConnectionTransport;
13
/// Asks the operating system for a currently unused TCP port on the loopback interface.
///
/// Binds a throwaway listener to `127.0.0.1:0` and reports the port that was
/// assigned to it. The listener is dropped before the function returns, so the
/// port is *not* reserved: a caller should bind it promptly and be prepared for
/// the bind to fail if something else grabbed the port in the meantime.
///
/// # Errors
///
/// Returns the underlying [`std::io::Error`] if the probe socket cannot be bound
/// or its local address cannot be queried.
pub fn find_free_port() -> std::io::Result<u16> {
    TcpListener::bind("127.0.0.1:0")?
        .local_addr()
        .map(|addr| addr.port())
}
19
20pub struct BidiConnection<T: ConnectionTransport> {
23 transport: T,
24 pub commands_response_subscriptions: Arc<Mutex<HashMap<u64, oneshot::Sender<CommandResponseState>>>>,
25 event_listener: EventListener,
26}
27
28impl<T: ConnectionTransport> BidiConnection<T> {
29 pub fn new(connection_transport: T) -> Self {
30 Self {
31 transport: connection_transport,
32 commands_response_subscriptions: Arc::new(Mutex::new(HashMap::new())),
33 event_listener: EventListener::new(),
34 }
35 }
36
37 pub async fn register_event_listener_channel(&mut self, channel: UnboundedSender<EventResponse>) {
38 self.event_listener.listeners.lock().await.push(channel);
39 }
40
41 pub fn start_listeners(&self) {
42 let (listener_tx, listener_rx) = unbounded_channel::<String>();
43 let (command_response_tx, command_response_rx) = unbounded_channel::<CommandResponseState>();
44 let (event_tx, event_rx) = unbounded_channel::<EventResponse>();
45
46 self.transport.listen(listener_tx);
47
48 let listener = Listener::new(listener_rx, command_response_tx, event_tx);
49 listener.start();
50
51 let commands_response_listener = CommandResponseListener::new(command_response_rx, self.commands_response_subscriptions.clone());
52 commands_response_listener.start();
53 self.event_listener.start(event_rx);
54 }
55
56 pub async fn send(&mut self, data: String) {
57 self.transport.send(data).await;
58 }
59
60 pub async fn close(&self) {
61 self.transport.close().await;
62 }
63}
64
65pub struct CdpConnection<T: ConnectionTransport> {
68 transport: T,
69 pub commands_response_subscriptions: Arc<Mutex<HashMap<u16, oneshot::Sender<CdpCommandResponseState>>>>,
70 event_listener: CdpEventListener,
71}
72
73impl<T: ConnectionTransport> CdpConnection<T> {
74 pub fn new(connection_transport: T) -> Self {
75 Self {
76 transport: connection_transport,
77 commands_response_subscriptions: Arc::new(Mutex::new(HashMap::new())),
78 event_listener: CdpEventListener::new(),
79 }
80 }
81
82 pub async fn register_event_listener_channel(&mut self, channel: UnboundedSender<base::EventResponse>) {
83 self.event_listener.listeners.lock().await.push(channel);
84 }
85
86 pub fn start_listeners(&self) {
87 let (listener_tx, listener_rx) = unbounded_channel::<String>();
88 let (command_response_tx, command_response_rx) = unbounded_channel::<CdpCommandResponseState>();
89 let (event_tx, event_rx) = unbounded_channel::<base::EventResponse>();
90
91 self.transport.listen(listener_tx);
92
93 let listener = CdpListener::new(listener_rx, command_response_tx, event_tx);
94 listener.start();
95
96 let commands_response_listener = CdpCommandResponseListener::new(command_response_rx, self.commands_response_subscriptions.clone());
97 commands_response_listener.start();
98 self.event_listener.start(event_rx);
99 }
100
101 pub async fn send(&mut self, data: String) {
102 self.transport.send(data).await;
103 }
104
105 pub async fn close(&self) {
106 self.transport.close().await;
107 }
108}