active_call/media/track/
websocket.rs1use super::{Track, TrackConfig, TrackPacketSender, track_codec::TrackCodec};
2use crate::{
3 event::{EventSender, SessionEvent},
4 media::AudioFrame,
5 media::Samples,
6 media::TrackId,
7 media::processor::ProcessorChain,
8};
9use anyhow::Result;
10use async_trait::async_trait;
11use audio_codec::bytes_to_samples;
12use bytes::Bytes;
13use std::{sync::Mutex, time::Duration};
14use tokio::select;
15use tokio_util::sync::CancellationToken;
16use tracing::{info, warn};
17
18pub type WebsocketBytesSender = tokio::sync::mpsc::UnboundedSender<Bytes>;
19pub type WebsocketBytesReceiver = tokio::sync::mpsc::UnboundedReceiver<Bytes>;
20
21pub struct WebsocketTrack {
22 track_id: TrackId,
23 config: TrackConfig,
24 cancel_token: CancellationToken,
25 processor_chain: ProcessorChain,
26 rx: Mutex<Option<WebsocketBytesReceiver>>,
27 encoder: TrackCodec,
28 payload_type: u8,
29 event_sender: EventSender,
30 ssrc: u32,
31}
32
33impl WebsocketTrack {
34 pub fn new(
35 cancel_token: CancellationToken,
36 track_id: TrackId,
37 track_config: TrackConfig,
38 event_sender: EventSender,
39 audio_receiver: WebsocketBytesReceiver,
40 codec: Option<String>,
41 ssrc: u32,
42 ) -> Self {
43 let processor_chain = ProcessorChain::new(track_config.samplerate);
44 let payload_type = match codec.unwrap_or("pcm".to_string()).to_lowercase().as_str() {
45 "pcmu" => 0,
46 "pcma" => 8,
47 "g722" => 9,
48 _ => u8::MAX, };
50 Self {
51 track_id,
52 config: track_config,
53 cancel_token,
54 processor_chain,
55 rx: Mutex::new(Some(audio_receiver)),
56 encoder: TrackCodec::new(),
57 payload_type,
58 event_sender,
59 ssrc,
60 }
61 }
62}
63
64#[async_trait]
65impl Track for WebsocketTrack {
66 fn ssrc(&self) -> u32 {
67 self.ssrc
68 }
69 fn id(&self) -> &TrackId {
70 &self.track_id
71 }
72 fn config(&self) -> &TrackConfig {
73 &self.config
74 }
75 fn processor_chain(&mut self) -> &mut ProcessorChain {
76 &mut self.processor_chain
77 }
78
79 async fn handshake(&mut self, _offer: String, _timeout: Option<Duration>) -> Result<String> {
80 Ok("".to_string())
81 }
82 async fn update_remote_description(&mut self, _answer: &String) -> Result<()> {
83 Ok(())
84 }
85
86 async fn start(
87 &mut self,
88 event_sender: EventSender,
89 packet_sender: TrackPacketSender,
90 ) -> Result<()> {
91 let track_id = self.track_id.clone();
92 let token = self.cancel_token.clone();
93 let mut audio_from_ws = match self.rx.lock().unwrap().take() {
94 Some(rx) => rx,
95 None => {
96 warn!(track_id, "no audio from ws");
97 return Ok(());
98 }
99 };
100 let sample_rate = self.config.samplerate;
101 let channels = self.config.channels;
102 let payload_type = self.payload_type;
103 let start_time = crate::media::get_timestamp();
104 let ssrc = self.ssrc;
105 let mut processor_chain = self.processor_chain.clone();
106 crate::spawn(async move {
107 let track_id_clone = track_id.clone();
108 let audio_from_ws_loop = async move {
109 let mut sequence_number = 0;
110 while let Some(bytes) = audio_from_ws.recv().await {
111 sequence_number += 1;
112
113 let samples = match payload_type {
114 u8::MAX => Samples::PCM {
115 samples: bytes_to_samples(&bytes.to_vec()),
116 },
117 _ => Samples::RTP {
118 sequence_number,
119 payload_type,
120 payload: bytes.to_vec(),
121 },
122 };
123
124 let mut packet = AudioFrame {
125 track_id: track_id_clone.clone(),
126 samples,
127 timestamp: crate::media::get_timestamp(),
128 sample_rate,
129 channels,
130 ..Default::default()
131 };
132
133 if let Err(e) = processor_chain.process_frame(&mut packet) {
134 warn!("error processing frame: {}", e);
135 }
136
137 match packet_sender.send(packet) {
138 Ok(_) => (),
139 Err(e) => {
140 warn!("error sending packet: {}", e);
141 break;
142 }
143 }
144 }
145 };
146
147 select! {
148 _ = token.cancelled() => {
149 info!("RTC process cancelled");
150 },
151 _ = audio_from_ws_loop => {
152 info!("audio_from_ws_loop");
153 }
154 };
155
156 event_sender
157 .send(SessionEvent::TrackEnd {
158 track_id,
159 timestamp: crate::media::get_timestamp(),
160 duration: crate::media::get_timestamp() - start_time,
161 ssrc,
162 play_id: None,
163 })
164 .ok();
165 });
166 Ok(())
167 }
168
169 async fn stop(&self) -> Result<()> {
170 self.cancel_token.cancel();
171 Ok(())
172 }
173
174 async fn send_packet(&mut self, packet: &AudioFrame) -> Result<()> {
175 let packet = packet.clone();
176 let (_, payload) = self.encoder.encode(self.payload_type, packet);
180 if payload.is_empty() {
181 return Ok(());
182 }
183 self.event_sender
184 .send(SessionEvent::Binary {
185 track_id: self.track_id.clone(),
186 timestamp: crate::media::get_timestamp(),
187 data: payload,
188 })
189 .map(|_| ())
190 .map_err(|_| anyhow::anyhow!("error sending binary event"))
191 }
192}