1use std::{sync::{Arc}};
7
8use futures_util::{SinkExt, StreamExt, stream::{SplitSink, SplitStream}};
9use serde_json::json;
10use tokio::{net::TcpStream, sync::Mutex};
11use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async, tungstenite::Message};
12use rtp_rs::*;
13
14#[derive(Clone)]
15
16pub struct VoiceClient {
17 pub token: String,
19 socket: Option<Socket>,
21 #[allow(dead_code)]
22 api_url: String,
23 ata_socket: Arc<Option<Socket>>
24
25}
26
27
28#[derive(Clone)]
29struct Socket {
30 socket_writer: Arc<Mutex<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>,
31 socket_reader: Arc<Mutex<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>>,
32 udp_socket: Arc<Mutex<tokio::net::UdpSocket>>,
33}
34
35
36
37impl VoiceClient {
38 pub async fn new(token: String, api_url: Option<String>) -> Self {
39
40 let api_url = match api_url {
41 Some(a) => a,
42 None => "https://vortex.revolt.chat/".to_owned()
43 };
44
45
46 Self {
47 token,
48 socket: None,
49 api_url,
50 ata_socket: Arc::new(None)
51 }
52 }
53
54
55 pub async fn init(&mut self, channel_id: &str) {
56 let websocket = Socket::new().await;
57 self.socket = Some(websocket);
58 let conn = self.socket.clone().unwrap().connect(&self.token, channel_id).await;
59 self.ata_socket = Arc::new(Some(conn));
60 println!("Connected!");
61 }
62
63
64 pub async fn play_source(&mut self, _source: &str) {
65
66
67 }
68
69}
70
71impl Socket {
72 pub async fn new() -> Socket {
73 let (ws_stream, _) = connect_async("wss://vortex.revolt.chat").await.unwrap();
74 let (writer, reader) = ws_stream.split();
75 let udp_socket = tokio::net::UdpSocket::bind("0.0.0.0:0").await.unwrap();
76
77 Socket {
78 socket_writer: Arc::from(Mutex::new(writer)),
79 socket_reader: Arc::from(Mutex::new(reader)),
80 udp_socket: Arc::from(Mutex::new(udp_socket))
81 }
82 }
83
84 pub async fn connect(self, token: &String, channel_id: &str) -> Socket {
88 self.socket_writer.lock().await.send(Message::Text(json!({
89 "id": 0,
90 "data": {
91 "roomId": channel_id,
92 "token": token,
93 },
94 "type": "Authenticate"
95 }).to_string())).await.unwrap();
96
97 self.socket_writer.lock().await.send(Message::Text(json!({
98 "id": 1,
99 "type": "RoomInfo"
100 }).to_string())).await.unwrap();
101
102 self.socket_writer.lock().await.send(Message::Text(json!(
103 {
104 "id":25,
105 "type":"InitializeTransports",
106 "data":{"mode":"CombinedRTP",
107 "rtpCapabilities":{
108 "codecs":[{"mimeType":"audio/opus","kind":"audio","preferredPayloadType":100,"clockRate":48000,"channels":2,"parameters":{"minptime":10,"useinbandfec":1},"rtcpFeedback":[{"type":"transport-cc","parameter":""}]}],
109 "headerExtensions":[
110 {"kind":"audio","uri":"urn:ietf:params:rtp-hdrext:sdes:mid","preferredId":1,"preferredEncrypt":false,"direction":"sendrecv"},
111 {"kind":"video","uri":"urn:ietf:params:rtp-hdrext:sdes:mid","preferredId":1,"preferredEncrypt":false,"direction":"sendrecv"},
112 {"kind":"audio","uri":"http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time","preferredId":4,"preferredEncrypt":false,"direction":"sendrecv"},
113 {"kind":"video","uri":"http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time","preferredId":4,"preferredEncrypt":false,"direction":"sendrecv"},
114 {"kind":"video","uri":"http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01","preferredId":5,"preferredEncrypt":false,"direction":"sendrecv"},
115 {"kind":"audio","uri":"urn:ietf:params:rtp-hdrext:ssrc-audio-level","preferredId":10,"preferredEncrypt":false,"direction":"sendrecv"},
116 {"kind":"video","uri":"urn:3gpp:video-orientation","preferredId":11,"preferredEncrypt":false,"direction":"sendrecv"},
117 {"kind":"video","uri":"urn:ietf:params:rtp-hdrext:toffset","preferredId":12,"preferredEncrypt":false,"direction":"sendrecv"}]}}}
118 ).to_string())).await.unwrap();
119
120
121 self.socket_writer.lock().await.send(Message::Text(json!(
122 {
123 "id":30,
124 "type":"StartProduce",
125 "data":{
126 "type":"audio","rtpParameters":{
127 "codecs":[
128 {"mimeType":"audio/opus","payloadType":111,"clockRate":48000,"channels":2,"parameters":{"minptime":10,"useinbandfec":1},
129 "rtcpFeedback":[{"type":"transport-cc","parameter":""}]}],
130 "headerExtensions":[
131 {"uri":"urn:ietf:params:rtp-hdrext:sdes:mid","id":4,"encrypt":false,"parameters":{}},
132 {"uri":"http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time","id":2,"encrypt":false,"parameters":{}},
133 {"uri":"http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01","id":3,"encrypt":false,"parameters":{}},
134 {"uri":"urn:ietf:params:rtp-hdrext:ssrc-audio-level","id":1,"encrypt":false,"parameters":{}}],
135 "encodings":[{"ssrc":3082236920i64,"dtx":false}],
136 "rtcp":{"cname":"PxvC7Ug841mk/2iE","reducedSize":false},
137 "mid":"0"}}}
138).to_string())).await.unwrap();
139
140println!("3 pew pew");
141
142
143 let handler_reader = Arc::clone(&self.socket_reader);
144 let handler_writer = Arc::clone(&self.socket_writer);
145 let arc_token = Arc::clone(&Arc::new(token.to_owned()));
146
147 let self_clone = self.clone();
148 println!("2 pew pew");
149
150
151 tokio::spawn(async move {
152 crate::vortex_socket::Socket::handler(&self_clone, handler_reader, handler_writer, arc_token).await;
153 });
154
155 println!("pew pew");
156
157 self
158 }
159
160
161
162
163 pub async fn handler(&self, reader: Arc<Mutex<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>>,
164 _writer: Arc<Mutex<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>,
165 _token: Arc<String>
166 )
167 {
168 while let Some(message) = reader.lock().await.next().await {
169 match message {
170 Ok(message) => {
171
172 if message.is_text() {
173 let json: serde_json::Value = serde_json::from_str(&message.to_string()).unwrap();
174 let _json_clone = json.clone();
175
176 match json["type"].as_str() {
177 Some("Authenticate") => {
178
179 println!("Received Authenticated");
180
181 },
209 Some("InitializeTransports") => {
210 println!("[DEBUG] JSON PAYLOAD {:?}", json);
211
212 let ip = json["data"]["ip"].as_str().unwrap();
213 let port = json["data"]["port"].as_i64().unwrap();
214
215 self.udp_socket.lock().await.connect(
216 format!("{}:{}", ip, port)
217 ).await.unwrap();
218 println!("[VORTEX] UDP Socket Connected");
219 },
220
221 Some("StartProduce") => {
222 println!("[VORTEX] Start Produce");
224
225 tokio::time::sleep(std::time::Duration::from_secs(10)).await;
227 println!("[VORTEX] Sending Audio");
228
229 let ffmpeg = std::process::Command::new("ffmpeg")
230 .arg("-i")
231 .arg("/home/me/audio/meddl.webm")
232 .arg("-f")
233 .arg("s16le")
234 .arg("-ac")
235 .arg("2")
236 .arg("-ar")
237 .arg("48000")
238 .arg("-acodec")
239 .arg("pcm_f32le")
240 .arg("-")
241 .output()
242
243 .expect("[CRITICAL] Failed to execute ffmpeg");
244
245 let packet = ffmpeg.stdout;
248
249 const RTP_PACKET_SIZE: usize = 1200;
251 let mut packet_chunks = packet.chunks(RTP_PACKET_SIZE);
252
253
254 while let Some(payload) = packet_chunks.next() {
255
256 self.udp_socket.lock().await.send(&payload).await.unwrap();
257 }
258
259
260 }
269
270 Some(&_) => {
271 println!("[VORTEX] Received Message Type: {}", json["type"]);
272 },
273 None => {},
274 }
275 }
276
277 }
278 Err(e) => {
279 return eprintln!("{:?}", e);
280 }
281 }
282 }
283 }
284}