rustenium_core/
connection.rs1use std::{collections::HashMap, net::TcpListener};
2use std::sync::Arc;
3use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
4use rustenium_bidi_commands::{Event};
5use tokio::sync::{oneshot, Mutex};
6
7use crate::{listeners::CommandResponseState, transport::ConnectionTransport};
8use crate::listeners::{CommandResponseListener, EventListener, Listener};
9
/// Bundles a transport with the state used to dispatch what arrives on it.
///
/// `T` is the transport implementation; it is used by `send`, `close` and
/// `start_listeners`.
pub struct Connection<T: ConnectionTransport> {
    /// Underlying transport that messages are sent through and listened on.
    transport: T,
    /// Shared map from a `u64` key to the one-shot sender awaiting a
    /// `CommandResponseState`. A clone of this `Arc` is handed to the
    /// `CommandResponseListener` created in `start_listeners`.
    // NOTE(review): the key is presumably the command id — confirm against callers.
    pub commands_response_subscriptions: Arc<Mutex<HashMap<u64, oneshot::Sender<CommandResponseState>>>>,
    /// Holds the registered `UnboundedSender<Event>` channels and is started
    /// with the event receiver in `start_listeners`.
    event_listener: EventListener,
}
15
/// Asks the operating system for a TCP port that is currently unused on the
/// loopback interface.
///
/// A temporary listener is bound to `127.0.0.1:0` so the OS picks the port,
/// and the chosen port number is read back from the listener's local address.
/// The listener is dropped before returning, so the port is *not* reserved:
/// another process may claim it before the caller binds to it.
///
/// # Errors
///
/// Returns the `std::io::Error` produced by binding the listener or by
/// querying its local address.
pub fn find_free_port() -> std::io::Result<u16> {
    let probe = TcpListener::bind("127.0.0.1:0")?;
    probe.local_addr().map(|addr| addr.port())
}
21
22impl<T> Connection<T>
23where
24 T: ConnectionTransport,
25{
26 pub fn new(connection_transport: T) -> Self {
27 Self {
28 transport: connection_transport,
29 commands_response_subscriptions: Arc::new(Mutex::new(HashMap::new())),
30 event_listener: EventListener::new(),
31 }
32 }
33
34 pub async fn register_event_listener_channel(&mut self, channel: UnboundedSender<Event>) {
35 self.event_listener.listeners.lock().await.push(channel);
36 }
37
38 pub fn start_listeners(&self) -> () {
39 let (listener_tx, listener_rx) = unbounded_channel::<String>();
40 let (command_response_tx, command_response_rx) = unbounded_channel::<CommandResponseState>();
41 let (event_tx, event_rx) = unbounded_channel::<Event>();
42
43 self.transport.listen(listener_tx);
44
45 let listener = Listener::new(listener_rx, command_response_tx, event_tx);
46 listener.start();
47
48 let commands_response_listener = CommandResponseListener::new(command_response_rx, self.commands_response_subscriptions.clone());
49 commands_response_listener.start();
50 self.event_listener.start(event_rx);
51 }
52
53 pub async fn send(&mut self, data: String) {
54 self.transport.send(data).await;
55 }
56
57 pub fn close(&self) {
58 self.transport.close();
59 }
60}