ataraxia_voice/
vortex_socket.rs

//! # A voice client implementation (Vortex voice server connection) for the Ataraxia Revolt library
2
3
4
5
6use std::{sync::{Arc}};
7
8use futures_util::{SinkExt, StreamExt, stream::{SplitSink, SplitStream}};
9use serde_json::json;
10use tokio::{net::TcpStream, sync::Mutex};
11use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async, tungstenite::Message};
12use rtp_rs::*;
13
14#[derive(Clone)]
15
16pub struct VoiceClient {
17    /// Your bot's Token
18    pub token: String,
19    /// The actual Socket Connection
20    socket: Option<Socket>,
21    #[allow(dead_code)]
22    api_url: String,
23    ata_socket: Arc<Option<Socket>>
24
25}
26
27
28#[derive(Clone)]
29struct Socket {
30    socket_writer: Arc<Mutex<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>,
31    socket_reader: Arc<Mutex<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>>,
32    udp_socket: Arc<Mutex<tokio::net::UdpSocket>>,
33}
34
35
36
37impl VoiceClient {
38    pub async fn new(token: String,  api_url: Option<String>) -> Self {
39
40        let api_url = match api_url {
41            Some(a) => a,
42            None => "https://vortex.revolt.chat/".to_owned()
43        };
44
45
46        Self {
47            token,
48            socket: None,
49            api_url,
50            ata_socket: Arc::new(None)
51        }
52    }
53
54
55    pub async fn init(&mut self, channel_id: &str) {
56        let websocket = Socket::new().await;
57        self.socket = Some(websocket);
58        let conn = self.socket.clone().unwrap().connect(&self.token, channel_id).await;
59        self.ata_socket = Arc::new(Some(conn));
60        println!("Connected!");
61    }
62
63
64    pub async fn play_source(&mut self, _source: &str) {
65
66       
67    }
68
69}
70
71impl Socket {
72    pub async fn new() -> Socket {
73        let (ws_stream, _) = connect_async("wss://vortex.revolt.chat").await.unwrap();
74        let (writer, reader) = ws_stream.split();
75        let udp_socket = tokio::net::UdpSocket::bind("0.0.0.0:0").await.unwrap();
76
77        Socket {
78            socket_writer: Arc::from(Mutex::new(writer)),
79            socket_reader: Arc::from(Mutex::new(reader)),
80            udp_socket: Arc::from(Mutex::new(udp_socket))
81        }
82    }
83
84    /// Authenticate to Voice Servers
85    /// Where `token` is your bots token
86    /// and `channel_id` is the channel id of the voice channel you are connecting to
87    pub async fn connect(self, token: &String, channel_id: &str) -> Socket {
88        self.socket_writer.lock().await.send(Message::Text(json!({
89            "id": 0,
90            "data": {
91                "roomId": channel_id,
92                "token": token,
93            },
94            "type": "Authenticate"
95        }).to_string())).await.unwrap();
96
97        self.socket_writer.lock().await.send(Message::Text(json!({
98            "id": 1,
99            "type": "RoomInfo"
100        }).to_string())).await.unwrap();
101
102        self.socket_writer.lock().await.send(Message::Text(json!(
103            {
104                "id":25,
105                "type":"InitializeTransports",
106                "data":{"mode":"CombinedRTP",
107                "rtpCapabilities":{
108                    "codecs":[{"mimeType":"audio/opus","kind":"audio","preferredPayloadType":100,"clockRate":48000,"channels":2,"parameters":{"minptime":10,"useinbandfec":1},"rtcpFeedback":[{"type":"transport-cc","parameter":""}]}],
109                    "headerExtensions":[
110                        {"kind":"audio","uri":"urn:ietf:params:rtp-hdrext:sdes:mid","preferredId":1,"preferredEncrypt":false,"direction":"sendrecv"},
111                        {"kind":"video","uri":"urn:ietf:params:rtp-hdrext:sdes:mid","preferredId":1,"preferredEncrypt":false,"direction":"sendrecv"},
112                        {"kind":"audio","uri":"http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time","preferredId":4,"preferredEncrypt":false,"direction":"sendrecv"},
113                        {"kind":"video","uri":"http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time","preferredId":4,"preferredEncrypt":false,"direction":"sendrecv"},
114                        {"kind":"video","uri":"http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01","preferredId":5,"preferredEncrypt":false,"direction":"sendrecv"},
115                        {"kind":"audio","uri":"urn:ietf:params:rtp-hdrext:ssrc-audio-level","preferredId":10,"preferredEncrypt":false,"direction":"sendrecv"},
116                        {"kind":"video","uri":"urn:3gpp:video-orientation","preferredId":11,"preferredEncrypt":false,"direction":"sendrecv"},
117                        {"kind":"video","uri":"urn:ietf:params:rtp-hdrext:toffset","preferredId":12,"preferredEncrypt":false,"direction":"sendrecv"}]}}}
118    ).to_string())).await.unwrap();
119    
120
121    self.socket_writer.lock().await.send(Message::Text(json!(
122        {
123            "id":30,
124            "type":"StartProduce",
125            "data":{
126                "type":"audio","rtpParameters":{
127                    "codecs":[
128                        {"mimeType":"audio/opus","payloadType":111,"clockRate":48000,"channels":2,"parameters":{"minptime":10,"useinbandfec":1},
129                        "rtcpFeedback":[{"type":"transport-cc","parameter":""}]}],
130                        "headerExtensions":[
131                            {"uri":"urn:ietf:params:rtp-hdrext:sdes:mid","id":4,"encrypt":false,"parameters":{}},
132                            {"uri":"http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time","id":2,"encrypt":false,"parameters":{}},
133                            {"uri":"http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01","id":3,"encrypt":false,"parameters":{}},
134                            {"uri":"urn:ietf:params:rtp-hdrext:ssrc-audio-level","id":1,"encrypt":false,"parameters":{}}],
135                            "encodings":[{"ssrc":3082236920i64,"dtx":false}],
136                            "rtcp":{"cname":"PxvC7Ug841mk/2iE","reducedSize":false},
137                            "mid":"0"}}}
138).to_string())).await.unwrap();
139
140println!("3 pew pew");
141
142
143        let handler_reader = Arc::clone(&self.socket_reader);
144        let handler_writer = Arc::clone(&self.socket_writer);
145        let arc_token = Arc::clone(&Arc::new(token.to_owned()));
146
147        let self_clone = self.clone();
148        println!("2 pew pew");
149
150
151        tokio::spawn(async move {
152            crate::vortex_socket::Socket::handler(&self_clone, handler_reader, handler_writer, arc_token).await;
153        });
154
155        println!("pew pew");
156
157        self
158    }
159
160
161
162
163    pub async fn handler(&self, reader: Arc<Mutex<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>>,
164        _writer: Arc<Mutex<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>,
165        _token: Arc<String>
166    )
167    {
168            while let Some(message) = reader.lock().await.next().await {
169                match message {
170                    Ok(message) => {
171
172                        if message.is_text() {
173                            let json: serde_json::Value = serde_json::from_str(&message.to_string()).unwrap();
174                            let _json_clone = json.clone();
175                            
176                            match json["type"].as_str() {                                
177                                Some("Authenticate") => {
178
179                                    println!("Received Authenticated");
180
181                                    // spawn heartbeat thread 
182                                    
183                                    //  let writer_clone = Arc::clone(&writer);
184                                    //  tokio::spawn(async move {
185                                    //      loop {
186                                    //          println!("[VORTEX] Sending Heartbeat...");
187                                    //          let ping_result = writer_clone.lock().await.send(Message::Text(serde_json::json!({
188                                    //              "type": "Ping",
189                                    //              "data": std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs()
190                                    //          }).to_string())).await;
191
192                                    //          match ping_result {
193                                    //              Ok(_) => {
194                                    //                  println!("[VORTEX] Heartbeat Sent");
195                                    //              },
196                                    //              Err(e) => {
197                                    //                  println!("[VORTEX] Heartbeat Failed: {}", e);
198                                    //                  // close socket
199                                    //                  break;
200                                    //                 writer_clone.lock().await.close();
201                                    //              }
202                                    //          }
203
204                                    //          // release lock and wait for next heartbeat
205                                    //          tokio::time::sleep(std::time::Duration::from_secs(10)).await;
206                                    //      }
207                                    //  });
208                                },
209                                Some("InitializeTransports") => {
210                                    println!("[DEBUG] JSON PAYLOAD {:?}", json);
211
212                                    let ip = json["data"]["ip"].as_str().unwrap();
213                                    let port = json["data"]["port"].as_i64().unwrap();
214
215                                    self.udp_socket.lock().await.connect(
216                                        format!("{}:{}", ip, port)
217                                    ).await.unwrap();
218                                    println!("[VORTEX] UDP Socket Connected");
219                                },
220
221                                Some("StartProduce") => {
222                                    // Send Audio here
223                                    println!("[VORTEX] Start Produce");
224
225                                    // sleep for a bit to let the client connect
226                                    tokio::time::sleep(std::time::Duration::from_secs(10)).await;
227                                    println!("[VORTEX] Sending Audio");
228
229                                    let ffmpeg = std::process::Command::new("ffmpeg")
230                                        .arg("-i")
231                                        .arg("/home/me/audio/meddl.webm")
232                                        .arg("-f")
233                                        .arg("s16le")
234                                        .arg("-ac")
235                                        .arg("2")
236                                        .arg("-ar")
237                                        .arg("48000")
238                                        .arg("-acodec")
239                                        .arg("pcm_f32le")
240                                        .arg("-")
241                                        .output()
242
243                                        .expect("[CRITICAL] Failed to execute ffmpeg");
244
245                                    // split 
246
247                                    let packet = ffmpeg.stdout;
248
249                                    // split packet into chunks of RTP_PACKET_SIZE
250                                    const RTP_PACKET_SIZE: usize = 1200;
251                                    let mut packet_chunks = packet.chunks(RTP_PACKET_SIZE);
252
253
254                                    while let Some(payload) = packet_chunks.next() {
255
256                                        self.udp_socket.lock().await.send(&payload).await.unwrap();
257                                    }
258                                        
259
260                                   /*  let result = RtpPacketBuilder::new()
261                                        .payload_type(10)
262                                        .payload(&packet_to_send)
263                                        .build();
264                                    if let Ok(packet) = result {
265                                        println!("Packet: {:?}", packet);
266                                        self.udp_socket.lock().await.send(&packet).await.unwrap();
267                                    } */
268                                }
269
270                                Some(&_) => {
271                                    println!("[VORTEX] Received Message Type: {}", json["type"]);
272                                },
273                                None => {},
274                            }
275                        }
276
277                    }
278                    Err(e) => {
279                        return eprintln!("{:?}", e);
280                    }
281                }
282            }
283    }
284}