1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
use std::env;
use gimbal_database::config::ConfigDatabase;
use moosicbox_auth::get_client_id_and_access_token;
use moosicbox_tunnel::TunnelRequest;
use moosicbox_tunnel_sender::{
TunnelMessage,
sender::{TunnelSender, TunnelSenderHandle},
};
use thiserror::Error;
use tokio::task::JoinHandle;
use url::Url;
use crate::{CANCELLATION_TOKEN, WS_SERVER_HANDLE};
/// Failure modes reported by [`setup_tunnel`].
#[derive(Debug, Error)]
pub enum SetupTunnelError {
    /// An underlying I/O error. Marked `transparent`, so the wrapped error's own
    /// message and source are forwarded rather than replaced.
    #[error(transparent)]
    IO(#[from] std::io::Error),
}
#[cfg_attr(feature = "profiling", profiling::function)]
#[allow(clippy::too_many_lines, clippy::module_name_repetitions)]
pub async fn setup_tunnel(
config_db: ConfigDatabase,
service_port: u16,
) -> Result<
(
Option<String>,
Option<JoinHandle<()>>,
Option<TunnelSenderHandle>,
),
SetupTunnelError,
> {
if let Ok(url) = env::var("WS_HOST") {
if url.is_empty() {
Ok((None, None, None))
} else {
log::debug!("Using WS_HOST: {url}");
let ws_url = url.clone();
let url = Url::parse(&url).expect("Invalid WS_HOST");
let hostname = url
.host_str()
.map(std::string::ToString::to_string)
.expect("Invalid WS_HOST");
let host = format!(
"{}://{hostname}{}",
if url.scheme() == "wss" {
"https"
} else {
"http"
},
url.port()
.map_or_else(String::new, |port| format!(":{port}"))
);
// FIXME: Handle retry
let (client_id, access_token) = {
get_client_id_and_access_token(&config_db, &host)
.await
.map_err(|e| {
std::io::Error::other(format!("Could not get access token: {e:?}"))
})?
};
let (mut tunnel, handle) = TunnelSender::new(
host.clone(),
ws_url,
client_id,
access_token,
config_db.clone(),
);
tunnel = tunnel.with_cancellation_token(CANCELLATION_TOKEN.clone());
Ok((
Some(host),
Some(moosicbox_task::spawn("server: tunnel", async move {
let mut rx = tunnel.start();
while let Some(m) = rx.recv().await {
match m {
TunnelMessage::Text(m) => {
log::debug!("Received text TunnelMessage {}", &m);
let tunnel = tunnel.clone();
moosicbox_task::spawn("server: tunnel message", async move {
match serde_json::from_str(&m).unwrap() {
TunnelRequest::Http(request) => {
if let Err(err) = tunnel
.tunnel_request(
service_port,
request.request_id,
request.method,
request.path,
request.query,
request.payload,
request.headers,
request.profile,
request.encoding,
)
.await
{
log::error!("Tunnel request failed: {err:?}");
}
}
TunnelRequest::Ws(request) => {
let sender = WS_SERVER_HANDLE
.read()
.await
.as_ref()
.ok_or("Failed to get ws server handle")?
.clone();
if let Err(err) = tunnel
.ws_request(
request.conn_id,
request.request_id,
request.body.clone(),
request.profile,
sender,
)
.await
{
log::error!(
"Failed to propagate ws request {} from conn_id {}: {err:?}",
request.request_id,
request.conn_id
);
}
}
TunnelRequest::Abort(request) => {
log::debug!("Aborting request {}", request.request_id);
tunnel.abort_request(request.request_id);
}
}
Ok::<_, String>(())
});
}
TunnelMessage::Binary(_) => {
unimplemented!("Binary TunnelMessage is not implemented")
}
TunnelMessage::Ping(_) | TunnelMessage::Pong(_) => {}
TunnelMessage::Close => {
log::info!("Tunnel connection was closed");
}
TunnelMessage::Frame(_) => {
unimplemented!("Frame TunnelMessage is not implemented")
}
}
}
log::debug!("Exiting tunnel message loop");
})),
Some(handle),
))
}
} else {
Ok((None, None, None))
}
}