active_call/media/track/
websocket.rs1use super::{Track, TrackConfig, TrackPacketSender, track_codec::TrackCodec};
2use crate::{
3 event::{EventSender, SessionEvent},
4 media::AudioFrame,
5 media::Samples,
6 media::TrackId,
7 media::processor::ProcessorChain,
8};
9use anyhow::Result;
10use async_trait::async_trait;
11use audio_codec::bytes_to_samples;
12use bytes::Bytes;
13use std::{sync::Mutex, time::Duration};
14use tokio::select;
15use tokio_util::sync::CancellationToken;
16use tracing::{info, warn};
17
18pub type WebsocketBytesSender = tokio::sync::mpsc::UnboundedSender<Bytes>;
19pub type WebsocketBytesReceiver = tokio::sync::mpsc::UnboundedReceiver<Bytes>;
20
21pub struct WebsocketTrack {
22 track_id: TrackId,
23 config: TrackConfig,
24 cancel_token: CancellationToken,
25 processor_chain: ProcessorChain,
26 rx: Mutex<Option<WebsocketBytesReceiver>>,
27 encoder: TrackCodec,
28 payload_type: u8,
29 event_sender: EventSender,
30 ssrc: u32,
31}
32
33impl WebsocketTrack {
34 pub fn new(
35 cancel_token: CancellationToken,
36 track_id: TrackId,
37 track_config: TrackConfig,
38 event_sender: EventSender,
39 audio_receiver: WebsocketBytesReceiver,
40 codec: Option<String>,
41 ssrc: u32,
42 ) -> Self {
43 let processor_chain = ProcessorChain::new(track_config.samplerate);
44 let payload_type = match codec.unwrap_or("pcm".to_string()).to_lowercase().as_str() {
45 "pcmu" => 0,
46 "pcma" => 8,
47 "g722" => 9,
48 _ => u8::MAX, };
50 Self {
51 track_id,
52 config: track_config,
53 cancel_token,
54 processor_chain,
55 rx: Mutex::new(Some(audio_receiver)),
56 encoder: TrackCodec::new(),
57 payload_type,
58 event_sender,
59 ssrc,
60 }
61 }
62}
63
64#[async_trait]
65impl Track for WebsocketTrack {
66 fn ssrc(&self) -> u32 {
67 self.ssrc
68 }
69 fn id(&self) -> &TrackId {
70 &self.track_id
71 }
72 fn config(&self) -> &TrackConfig {
73 &self.config
74 }
75 fn processor_chain(&mut self) -> &mut ProcessorChain {
76 &mut self.processor_chain
77 }
78
79 async fn handshake(&mut self, _offer: String, _timeout: Option<Duration>) -> Result<String> {
80 Ok("".to_string())
81 }
82 async fn update_remote_description(&mut self, _answer: &String) -> Result<()> {
83 Ok(())
84 }
85
86 async fn start(
87 &self,
88 event_sender: EventSender,
89 packet_sender: TrackPacketSender,
90 ) -> Result<()> {
91 let track_id = self.track_id.clone();
92 let token = self.cancel_token.clone();
93 let mut audio_from_ws = match self.rx.lock().unwrap().take() {
94 Some(rx) => rx,
95 None => {
96 warn!(track_id, "no audio from ws");
97 return Ok(());
98 }
99 };
100 let sample_rate = self.config.samplerate;
101 let payload_type = self.payload_type;
102 let start_time = crate::media::get_timestamp();
103 let ssrc = self.ssrc;
104 tokio::spawn(async move {
105 let track_id_clone = track_id.clone();
106 let audio_from_ws_loop = async move {
107 let mut sequence_number = 0;
108 while let Some(bytes) = audio_from_ws.recv().await {
109 sequence_number += 1;
110
111 let samples = match payload_type {
112 u8::MAX => Samples::PCM {
113 samples: bytes_to_samples(&bytes.to_vec()),
114 },
115 _ => Samples::RTP {
116 sequence_number,
117 payload_type,
118 payload: bytes.to_vec(),
119 },
120 };
121
122 let packet = AudioFrame {
123 track_id: track_id_clone.clone(),
124 samples,
125 timestamp: crate::media::get_timestamp(),
126 sample_rate,
127 };
128 match packet_sender.send(packet) {
129 Ok(_) => (),
130 Err(e) => {
131 warn!("error sending packet: {}", e);
132 break;
133 }
134 }
135 }
136 };
137
138 select! {
139 _ = token.cancelled() => {
140 info!("RTC process cancelled");
141 },
142 _ = audio_from_ws_loop => {
143 info!("audio_from_ws_loop");
144 }
145 };
146
147 event_sender
148 .send(SessionEvent::TrackEnd {
149 track_id,
150 timestamp: crate::media::get_timestamp(),
151 duration: crate::media::get_timestamp() - start_time,
152 ssrc,
153 play_id: None,
154 })
155 .ok();
156 });
157 Ok(())
158 }
159
160 async fn stop(&self) -> Result<()> {
161 self.cancel_token.cancel();
162 Ok(())
163 }
164
165 async fn send_packet(&self, packet: &AudioFrame) -> Result<()> {
166 let (_, payload) = self.encoder.encode(self.payload_type, packet.clone());
167 if payload.is_empty() {
168 return Ok(());
169 }
170 self.event_sender
171 .send(SessionEvent::Binary {
172 track_id: self.track_id.clone(),
173 timestamp: crate::media::get_timestamp(),
174 data: payload,
175 })
176 .map(|_| ())
177 .map_err(|_| anyhow::anyhow!("error sending binary event"))
178 }
179}